Commit c19b0fd2 authored by Mathias Lüdtke's avatar Mathias Lüdtke Committed by oroulet

Don't instantiate nodes without a modelling rule (#511)

Don't instantiate nodes without a modelling rule
parent 482a3c49
......@@ -31,21 +31,14 @@ def instantiate(parent, node_type, nodeid=None, bname=None, dname=None, idx=0):
elif isinstance(bname, str):
bname = ua.QualifiedName.from_string(bname)
nodeids = _instantiate_node(parent.server, parent.nodeid, rdesc, nodeid, bname, dname=dname)
nodeids = _instantiate_node(parent.server, Node(parent.server, rdesc.NodeId), parent.nodeid, rdesc, nodeid, bname, dname=dname, toplevel=True)
return [Node(parent.server, nid) for nid in nodeids]
def _instantiate_node(server, parentid, rdesc, nodeid, bname, dname=None, recursive=True):
def _instantiate_node(server, node_type, parentid, rdesc, nodeid, bname, dname=None, recursive=True, toplevel=False):
"""
instantiate a node type under parent
"""
node_type = Node(server, rdesc.NodeId)
refs = node_type.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule)
# skip optional elements
if len(refs) == 1 and refs[0].nodeid == ua.NodeId(ua.ObjectIds.ModellingRule_Optional):
return []
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = bname
......@@ -84,14 +77,23 @@ def _instantiate_node(server, parentid, rdesc, nodeid, bname, dname=None, recurs
for c_rdesc in descs:
# skip items that already exists, prefer the 'lowest' one in object hierarchy
if not ua_utils.is_child_present(node, c_rdesc.BrowseName):
c_node_type = Node(server, c_rdesc.NodeId)
refs = c_node_type.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule)
# exclude nodes without ModellingRule at top-level
if toplevel and len(refs) == 0:
continue
# skip optional elements (server policy)
if len(refs) == 1 and refs[0].nodeid == ua.NodeId(ua.ObjectIds.ModellingRule_Optional):
logger.info("Will not instantiate optional node %s as part of %s", c_rdesc.BrowseName, addnode.BrowseName)
continue
# if root node being instantiated has a String NodeId, create the children with a String NodeId
if res.AddedNodeId.NodeIdType is ua.NodeIdType.String:
inst_nodeid = res.AddedNodeId.Identifier + "." + c_rdesc.BrowseName.Name
nodeids = _instantiate_node(server, res.AddedNodeId, c_rdesc, nodeid=ua.NodeId(identifier=inst_nodeid, namespaceidx=res.AddedNodeId.NamespaceIndex), bname=c_rdesc.BrowseName)
nodeids = _instantiate_node(server, c_node_type, res.AddedNodeId, c_rdesc, nodeid=ua.NodeId(identifier=inst_nodeid, namespaceidx=res.AddedNodeId.NamespaceIndex), bname=c_rdesc.BrowseName)
else:
nodeids = _instantiate_node(server, res.AddedNodeId, c_rdesc, nodeid=ua.NodeId(namespaceidx=res.AddedNodeId.NamespaceIndex), bname=c_rdesc.BrowseName)
nodeids = _instantiate_node(server, c_node_type, res.AddedNodeId, c_rdesc, nodeid=ua.NodeId(namespaceidx=res.AddedNodeId.NamespaceIndex), bname=c_rdesc.BrowseName)
added_nodes.extend(nodeids)
return added_nodes
......@@ -570,6 +570,15 @@ class Node(object):
results = opcua.common.manage_nodes.delete_nodes(self.server, [self], recursive, delete_references)
_check_results(results)
def _fill_delete_reference_item(self, rdesc, bidirectional = False):
ditem = ua.DeleteReferencesItem()
ditem.SourceNodeId = self.nodeid
ditem.TargetNodeId = rdesc.NodeId
ditem.ReferenceTypeId = rdesc.ReferenceTypeId
ditem.IsForward = rdesc.IsForward
ditem.DeleteBidirectional = bidirectional
return ditem
def delete_reference(self, target, reftype, forward=True, bidirectional=True):
"""
Delete given node's references from address space
......@@ -584,13 +593,7 @@ class Node(object):
else:
raise ua.UaStatusCodeError(ua.StatusCodes.BadNotFound)
ditem = ua.DeleteReferencesItem()
ditem.SourceNodeId = self.nodeid
ditem.TargetNodeId = rdesc.NodeId
ditem.ReferenceTypeId = rdesc.ReferenceTypeId
ditem.IsForward = rdesc.IsForward
ditem.DeleteBidirectional = bidirectional
ditem = self._fill_delete_reference_item(rdesc, bidirectional)
self.server.delete_references([ditem])[0].check()
def add_reference(self, target, reftype, forward=True, bidirectional=True):
......@@ -617,14 +620,33 @@ class Node(object):
results = self.server.add_references(params)
_check_results(results, len(params))
def _add_modelling_rule(self, parent, mandatory=True):
if mandatory is not None and parent.get_node_class() == ua.NodeClass.ObjectType:
rule=ua.ObjectIds.ModellingRule_Mandatory if mandatory else ua.ObjectIds.ModellingRule_Optional
self.add_reference(rule, ua.ObjectIds.HasModellingRule, True, False)
return self
def set_modelling_rule(self, mandatory):
parent = self.get_parent()
if parent is None:
return ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
if parent.get_node_class() != ua.NodeClass.ObjectType:
return ua.StatusCode(ua.StatusCodes.BadTypeMismatch)
# remove all existing modelling rule
rules = self.get_references(ua.ObjectIds.HasModellingRule)
self.server.delete_references(list(map(self._fill_delete_reference_item, rules)))
self._add_modelling_rule(parent, mandatory)
return ua.StatusCode()
def add_folder(self, nodeid, bname):
return opcua.common.manage_nodes.create_folder(self, nodeid, bname)
return opcua.common.manage_nodes.create_folder(self, nodeid, bname)._add_modelling_rule(self)
def add_object(self, nodeid, bname, objecttype=None):
return opcua.common.manage_nodes.create_object(self, nodeid, bname, objecttype)
return opcua.common.manage_nodes.create_object(self, nodeid, bname, objecttype)._add_modelling_rule(self)
def add_variable(self, nodeid, bname, val, varianttype=None, datatype=None):
return opcua.common.manage_nodes.create_variable(self, nodeid, bname, val, varianttype, datatype)
return opcua.common.manage_nodes.create_variable(self, nodeid, bname, val, varianttype, datatype)._add_modelling_rule(self)
def add_object_type(self, nodeid, bname):
return opcua.common.manage_nodes.create_object_type(self, nodeid, bname)
......@@ -636,10 +658,10 @@ class Node(object):
return opcua.common.manage_nodes.create_data_type(self, nodeid, bname, description=None)
def add_property(self, nodeid, bname, val, varianttype=None, datatype=None):
return opcua.common.manage_nodes.create_property(self, nodeid, bname, val, varianttype, datatype)
return opcua.common.manage_nodes.create_property(self, nodeid, bname, val, varianttype, datatype)._add_modelling_rule(self)
def add_method(self, *args):
return opcua.common.manage_nodes.create_method(self, *args)
return opcua.common.manage_nodes.create_method(self, *args)._add_modelling_rule(self)
def add_reference_type(self, nodeid, bname, symmetric=True, inversename=None):
return opcua.common.manage_nodes.create_reference_type(self, nodeid, bname, symmetric, inversename)
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment