Commit 04638677 authored by olivier R-D's avatar olivier R-D

create_object use instantiate() + related fix and cleanup

parent 4cf9389f
......@@ -22,9 +22,12 @@ def instantiate(parent, node_type, nodeid=None, bname=None, idx=0):
rdesc.BrowseName = qname
rdesc.DisplayName = dname
rdesc.NodeClass = nclass
rdesc.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HasComponent)
if parent.get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
rdesc.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
else:
rdesc.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
rdesc.TypeDefinition = node_type.nodeid
print("MYRDESC", rdesc)
if nodeid is None:
nodeid = ua.NodeId(namespaceidx=idx) # will trigger automatic node generation in namespace idx
if bname is None:
......@@ -40,8 +43,6 @@ def _instantiate_node(server, parentid, rdesc, nodeid, bname, recursive=True):
instantiate a node type under parent
"""
print("\n\nInstanciating: node %s in %s" % (nodeid, parentid))
print(rdesc)
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = bname
......@@ -60,20 +61,19 @@ def _instantiate_node(server, parentid, rdesc, nodeid, bname, recursive=True):
_read_and_copy_attrs(node_type, ua.VariableAttributes(), addnode)
else:
print("Node class not supported: ", rdesc.NodeClass)
print("Instantiate: Node class not supported: ", rdesc.NodeClass)
res = server.add_nodes([addnode])[0]
if recursive:
descs = node_type.get_children_descriptions(includesubtypes=False)
for c_rdesc in descs:
print(" Instanciating children", c_rdesc)
_instantiate_node(server, res.AddedNodeId, c_rdesc, nodeid=ua.NodeId(namespaceidx=res.AddedNodeId.NamespaceIndex), bname=c_rdesc.BrowseName)
return Node(server, addnode.RequestedNewNodeId)
return Node(server, res.AddedNodeId)
def _read_and_copy_attrs(node_type, struct, addnode):
names = [name for name in struct.__dict__.keys() if not name.startswith("_") and name not in ("BodyLength", "TypeId", "SpecifiedAttributes", "Encoding", "IsAbstract")]
names = [name for name in struct.__dict__.keys() if not name.startswith("_") and name not in ("BodyLength", "TypeId", "SpecifiedAttributes", "Encoding", "IsAbstract", "EventNotifier")]
attrs = [getattr(ua.AttributeIds, name) for name in names]
for name in names:
results = node_type.get_attributes(attrs)
......@@ -84,6 +84,6 @@ def _read_and_copy_attrs(node_type, struct, addnode):
else:
setattr(struct, name, results[idx].Value.Value)
else:
print("!!!!!!!!!!!!Error, for nodeid %s, attribute %s, statuscode is %s" % (node_type, name, results[idx].StatusCode))
print("Instantiate: while copying attributes from node type %s, attribute %s, statuscode is %s" % (node_type, name, results[idx].StatusCode))
addnode.NodeAttributes = struct
......@@ -3,6 +3,7 @@ High level functions to create nodes
"""
from opcua import ua
from opcua.common import node
from opcua.common.instantiate import instantiate
def _parse_nodeid_qname(*args):
......@@ -40,73 +41,102 @@ def create_folder(parent, nodeid, bname):
return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType))
def create_object(parent, nodeid, bname, objecttype=ua.ObjectIds.BaseObjectType):
def create_object(parent, nodeid, bname, objecttype=None):
"""
create a child node object
arguments are nodeid, browsename
or namespace index, name
arguments are nodeid, browsename, [objecttype]
or namespace index, name, [objecttype]
if objectype is given (a NodeId) then the type node is instantiated inclusive its child nodes
"""
nodeid, qname = _parse_nodeid_qname(nodeid, bname)
if isinstance(objecttype, int):
objecttype = ua.NodeId(objecttype)
elif isinstance(objecttype, ua.NodeId):
objecttype = objecttype
elif isinstance(objecttype, str):
objecttype = ua.NodeId.from_string(objecttype)
if objecttype is not None:
objecttype = node.Node(parent.server, objecttype)
return instantiate(parent, objecttype, nodeid, bname)
else:
raise TypeError("Could not recognise format of objecttype")
return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, objecttype))
return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType))
def create_property(parent, nodeid, bname, val, datatype=None):
def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
"""
create a child node property
args are nodeid, browsename, value, [variant type]
or idx, name, value, [variant type]
"""
nodeid, qname = _parse_nodeid_qname(nodeid, bname)
val, datatype = _to_variant_with_datatype(val, datatype)
var = ua.Variant(val, varianttype)
if datatype and not isinstance(datatype, ua.NodeId):
raise RuntimeError()
return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, datatype, isproperty=True))
return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=True))
def create_variable(parent, *args):
def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
"""
create a child node variable
args are nodeid, browsename, value, [variant type], [data type]
or idx, name, value, [variant type], [data type]
"""
nodeid, qname = _parse_nodeid_qname(*args[:2])
val, datatype = _to_variant_with_datatype(*args[2:])
nodeid, qname = _parse_nodeid_qname(nodeid, bname)
var = ua.Variant(val, varianttype)
if datatype and not isinstance(datatype, ua.NodeId):
raise RuntimeError()
return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, datatype, isproperty=False))
return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False))
def create_variable_type(parent, nodeid, bname, datatype):
"""
Create a new variable type
args are nodeid, browsename, datatype, [variant type], [data type]
or idx, name, value, [variant type], [data type]
args are nodeid, browsename and datatype
or idx, name and data type
"""
nodeid, qname = _parse_nodeid_qname(nodeid, bname)
val, datatype = _to_variant_with_datatype(datatype)
if datatype and not isinstance(datatype, ua.NodeId):
raise RuntimeError()
return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, datatype, isproperty=False))
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = qname
addnode.NodeClass = ua.NodeClass.Variable
addnode.ParentNodeId = parent.nodeid
addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubType)
attrs = ua.VariableTypeAttributes()
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.DataType = datatype
attrs.IsAbstract = False
attrs.WriteMask = 0
attrs.UserWriteMask = 0
attrs.Historizing = 0
attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
addnode.NodeAttributes = attrs
results = parent.server.add_nodes([addnode])
results[0].StatusCode.check()
return results[0].AddedNodeId
def create_data_type(parent, nodeid, bname):
def create_reference_type(parent, nodeid, bname):
"""
Create a new data type to be used in new variables, etc ..
arguments are nodeid, browsename
or namespace index, name
Create a new reference type
args are nodeid and browsename
or idx and name
"""
nodeid, qname = _parse_nodeid_qname(nodeid, bname)
return node.Node(parent.server, _create_data_type(parent.server, parent.nodeid, nodeid, qname))
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = qname
addnode.NodeClass = ua.NodeClass.Variable
addnode.ParentNodeId = parent.nodeid
addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubType)
attrs = ua.ReferenceTypeAttributes()
attrs.IsAbstract = False
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
addnode.NodeAttributes = attrs
results = parent.server.add_nodes([addnode])
results[0].StatusCode.check()
return results[0].AddedNodeId
def create_object_type(parent, nodeid, bname):
......@@ -146,7 +176,7 @@ def _create_object(server, parentnodeid, nodeid, qname, objecttype):
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = qname
addnode.ParentNodeId = parentnodeid
if node.Node(server, parentnodeid).get_type_definition() == ua.ObjectIds.FolderType:
if node.Node(server, parentnodeid).get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType):
addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
else:
addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
......@@ -187,20 +217,7 @@ def _create_object_type(server, parentnodeid, nodeid, qname):
return results[0].AddedNodeId
def _to_variant(val, vtype=None):
return _to_variant_with_datatype(val, vtype, datatype=None)[0]
def _to_variant_with_datatype(val, vtype=None, datatype=None):
if isinstance(val, ua.Variant):
if vtype:
datatype = vtype
return val, datatype
else:
return ua.Variant(val, vtype), datatype
def _create_variable(server, parentnodeid, nodeid, qname, val, datatype=None, isproperty=False):
def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False):
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = qname
......@@ -218,10 +235,10 @@ def _create_variable(server, parentnodeid, nodeid, qname, val, datatype=None, is
if datatype:
attrs.DataType = datatype
else:
attrs.DataType = _guess_uatype(val)
attrs.DataType = _guess_datatype(var)
attrs.Value = val
if isinstance(val, list) or isinstance(val, tuple):
attrs.Value = var
if isinstance(var, list) or isinstance(var, tuple):
attrs.ValueRank = ua.ValueRank.OneDimension
else:
attrs.ValueRank = ua.ValueRank.Scalar
......@@ -267,16 +284,26 @@ def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=N
return results[0].AddedNodeId
def _create_data_type(server, parentnodeid, nodeid, qname):
def create_data_type(server, parentnodeid, nodeid, qname, description=None):
"""
Create a new data type to be used in new variables, etc ..
arguments are nodeid, browsename
or namespace index, name
"""
nodeid, qname = _parse_nodeid_qname(nodeid, bname)
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = qname
addnode.NodeClass = ua.NodeClass.DataType
addnode.ParentNodeId = parentnodeid
addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubType)
#addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # Not type definition for types
#addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # No type definition for types
attrs = ua.DataTypeAttributes()
if description is None:
attrs.Description = ua.LocalizedText(qname.Name)
else:
attrs.Description = ua.LocalizedText(description)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.WriteMask = 0
attrs.UserWriteMask = 0
......@@ -321,11 +348,11 @@ def _vtype_to_argument(vtype):
arg = ua.Argument()
v = ua.Variant(None, vtype)
arg.DataType = _guess_uatype(v)
arg.DataType = _guess_datatype(v)
return arg
def _guess_uatype(variant):
def _guess_datatype(variant):
if variant.VariantType == ua.VariantType.ExtensionObject:
if variant.Value is None:
raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
......
......@@ -351,8 +351,8 @@ class Node(object):
"""
references = self.get_references(refs=ua.ObjectIds.HasTypeDefinition, direction=ua.BrowseDirection.Forward)
if len(references) == 0:
return ua.ObjectIds.BaseObjectType
return references[0].NodeId.Identifier
return None
return references[0].NodeId
def get_parent(self):
"""
......
......@@ -375,12 +375,12 @@ class Server(object):
datatype = None
if len(property) > 2:
datatype = property[2]
custom_t.add_property(idx, property[0], ua.Variant(None, property[1]), datatype)
custom_t.add_property(idx, property[0], None, property[1], datatype=datatype)
for variable in variables:
datatype = None
if len(variable) > 2:
datatype = variable[2]
custom_t.add_variable(idx, variable[0], ua.Variant(None, variable[1]), datatype)
custom_t.add_variable(idx, variable[0], None, variable[1], datatype=datatype)
for method in methods:
custom_t.add_method(idx, method[0], method[1], method[2], method[3])
......
......@@ -499,16 +499,16 @@ class CommonTests(object):
f = objects.add_folder(3, 'MyFolder_TypeTest')
o = f.add_object(3, 'MyObject1', ua.ObjectIds.BaseObjectType)
self.assertEqual(o.get_type_definition(), ua.ObjectIds.BaseObjectType)
self.assertEqual(o.get_type_definition().Identifier, ua.ObjectIds.BaseObjectType)
o = f.add_object(3, 'MyObject2', ua.NodeId(ua.ObjectIds.BaseObjectType, 0))
self.assertEqual(o.get_type_definition(), ua.ObjectIds.BaseObjectType)
self.assertEqual(o.get_type_definition().Identifier, ua.ObjectIds.BaseObjectType)
base_otype= self.opc.get_node(ua.ObjectIds.BaseObjectType)
custom_otype = base_otype.add_object_type(2, 'MyFooObjectType')
o = f.add_object(3, 'MyObject3', custom_otype.nodeid)
self.assertEqual(o.get_type_definition(), custom_otype.nodeid.Identifier)
self.assertEqual(o.get_type_definition().Identifier, custom_otype.nodeid.Identifier)
references = o.get_references(refs=ua.ObjectIds.HasTypeDefinition, direction=ua.BrowseDirection.Forward)
self.assertEqual(len(references), 1)
......@@ -522,7 +522,7 @@ class CommonTests(object):
nodes = o.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
self.assertTrue(objects in nodes)
self.assertEqual(o.get_parent(), objects)
self.assertEqual(o.get_type_definition(), ua.ObjectIds.BaseObjectType)
self.assertEqual(o.get_type_definition().Identifier, ua.ObjectIds.BaseObjectType)
o2 = o.add_object(3, 'MySecondObject')
nodes = o.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Forward, includesubtypes=False)
......@@ -530,7 +530,7 @@ class CommonTests(object):
nodes = o2.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
self.assertTrue(o in nodes)
self.assertEqual(o2.get_parent(), o)
self.assertEqual(o2.get_type_definition(), ua.ObjectIds.BaseObjectType)
self.assertEqual(o2.get_type_definition().Identifier, ua.ObjectIds.BaseObjectType)
v = o.add_variable(3, 'MyVariable', 6)
nodes = o.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Forward, includesubtypes=False)
......@@ -538,7 +538,7 @@ class CommonTests(object):
nodes = v.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
self.assertTrue(o in nodes)
self.assertEqual(v.get_parent(), o)
self.assertEqual(v.get_type_definition(), ua.ObjectIds.BaseDataVariableType)
self.assertEqual(v.get_type_definition().Identifier, ua.ObjectIds.BaseDataVariableType)
p = o.add_property(3, 'MyProperty', 2)
nodes = o.get_referenced_nodes(refs=ua.ObjectIds.HasProperty, direction=ua.BrowseDirection.Forward, includesubtypes=False)
......@@ -546,7 +546,7 @@ class CommonTests(object):
nodes = p.get_referenced_nodes(refs=ua.ObjectIds.HasProperty, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
self.assertTrue(o in nodes)
self.assertEqual(p.get_parent(), o)
self.assertEqual(p.get_type_definition(), ua.ObjectIds.PropertyType)
self.assertEqual(p.get_type_definition().Identifier, ua.ObjectIds.PropertyType)
def test_get_endpoints(self):
endpoints = self.opc.get_endpoints()
......
......@@ -162,7 +162,7 @@ class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
nodes = o.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
self.assertTrue(objects in nodes)
self.assertEqual(o.get_parent(), objects)
self.assertEqual(o.get_type_definition(), ua.ObjectIds.BaseObjectType)
self.assertEqual(o.get_type_definition().Identifier, ua.ObjectIds.BaseObjectType)
@uamethod
def callback(parent):
......@@ -288,17 +288,17 @@ class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
('MyEnumVar', ua.VariantType.Int32, ua.NodeId(ua.ObjectIds.ApplicationType))]
methods = [('MyMethod', func, [ua.VariantType.Int64], [ua.VariantType.Boolean])]
type = self.opc.create_custom_object_type(2, 'MyObjectType', ua.ObjectIds.BaseObjectType, properties, variables, methods)
node_type = self.opc.create_custom_object_type(2, 'MyObjectType', ua.ObjectIds.BaseObjectType, properties, variables, methods)
check_custom_type(self, type, ua.ObjectIds.BaseObjectType)
variables = type.get_variables()
self.assertTrue(type.get_child("2:VariableString") in variables)
self.assertEqual(type.get_child("2:VariableString").get_data_value().Value.VariantType, ua.VariantType.String)
self.assertTrue(type.get_child("2:MyEnumVar") in variables)
self.assertEqual(type.get_child("2:MyEnumVar").get_data_value().Value.VariantType, ua.VariantType.Int32)
self.assertEqual(type.get_child("2:MyEnumVar").get_data_type(), ua.NodeId(ua.ObjectIds.ApplicationType))
methods = type.get_methods()
self.assertTrue(type.get_child("2:MyMethod") in methods)
check_custom_type(self, node_type, ua.ObjectIds.BaseObjectType)
variables = node_type.get_variables()
self.assertTrue(node_type.get_child("2:VariableString") in variables)
self.assertEqual(node_type.get_child("2:VariableString").get_data_value().Value.VariantType, ua.VariantType.String)
self.assertTrue(node_type.get_child("2:MyEnumVar") in variables)
self.assertEqual(node_type.get_child("2:MyEnumVar").get_data_value().Value.VariantType, ua.VariantType.Int32)
self.assertEqual(node_type.get_child("2:MyEnumVar").get_data_type(), ua.NodeId(ua.ObjectIds.ApplicationType))
methods = node_type.get_methods()
self.assertTrue(node_type.get_child("2:MyMethod") in methods)
#def test_create_custom_refrence_type_ObjectId(self):
#type = self.opc.create_custom_reference_type(2, 'MyEvent', ua.ObjectIds.Base, [('PropertyNum', ua.VariantType.Int32), ('PropertyString', ua.VariantType.String)])
......@@ -365,11 +365,11 @@ class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
def test_add_variable_with_datatype(self):
o = self.opc.get_objects_node()
v1 = o.add_variable(3, 'VariableEnumType1', ua.ApplicationType.ClientAndServer, None, ua.NodeId(ua.ObjectIds.ApplicationType))
v1 = o.add_variable(3, 'VariableEnumType1', ua.ApplicationType.ClientAndServer, datatype=ua.NodeId(ua.ObjectIds.ApplicationType))
tp1 = v1.get_data_type()
self.assertEqual( ua.NodeId(ua.ObjectIds.ApplicationType), tp1)
v2 = o.add_variable(3, 'VariableEnumType2', ua.ApplicationType.ClientAndServer, None, ua.NodeId(ua.ObjectIds.ApplicationType) )
v2 = o.add_variable(3, 'VariableEnumType2', ua.ApplicationType.ClientAndServer, datatype=ua.NodeId(ua.ObjectIds.ApplicationType) )
tp2 = v2.get_data_type()
self.assertEqual( ua.NodeId(ua.ObjectIds.ApplicationType), tp2)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment