Commit 34be091e authored by olivier R-D's avatar olivier R-D

even more heavy restructuring

parent a11e0ab3
......@@ -46,6 +46,7 @@ Tested servers: freeopcua C++, freeopcua Python, prosys, kepware, beckoff
Client: what is not implemented yet
* localized text feature
* removing nodes
* adding some missing modify methods
......@@ -66,7 +67,8 @@ Server: what works:
Tested clients: freeopcua C++, freeopcua Python, uaexpert, prosys, quickopc
Server: what is not implemented
* complete securty model with users
* localized text feature
* complete securty model with users and password
* removing nodes
* adding some missing modify methods
......
......@@ -2,48 +2,19 @@
Pure Python OPC-UA library
"""
from opcua.client.binary_client import BinaryClient
#from opcua.common.create_nodes import create_folder
#from opcua.common.create_nodes import create_object
#from opcua.common.create_nodes import create_variable
#from opcua.common.create_nodes import create_property
#from opcua.common.methods import call_method
from opcua.common.node import Node
from opcua.common.node import create_object
from opcua.common.node import create_folder
from opcua.common.node import create_variable
from opcua.common.node import create_property
from opcua.common.node import create_method
from opcua.common.node import call_method
from opcua.common.methods import uamethod
from opcua.common.event import Event
from opcua.common.subscription import Subscription
from opcua.client.client import Client
from opcua.server.server import Server
from opcua.common.instanciate import instanciate_node
from opcua.ua import ObjectIds
from opcua.ua import AttributeIds
from opcua.ua import StatusCodes
# next we have some methods which should really be moved somewhere else
def uamethod(func):
"""
Method decorator to automatically convert
arguments and output to and from variants
"""
def wrapper(parent, *args):
if isinstance(parent, ua.NodeId):
result = func(parent, *[arg.Value for arg in args])
else:
self = parent
parent = args[0]
args = args[1:]
result = func(self, parent, *[arg.Value for arg in args])
return to_variant(result)
return wrapper
def to_variant(*args):
uaargs = []
for arg in args:
uaargs.append(ua.Variant(arg))
return uaargs
"""
High level functions to create nodes
"""
from opcua import ua
from opcua.common import node
def _parse_add_args(*args):
if isinstance(args[0], ua.NodeId):
return args[0], args[1]
elif isinstance(args[0], str):
return ua.NodeId.from_string(args[0]), ua.QualifiedName.from_string(args[1])
elif isinstance(args[0], int):
return ua.generate_nodeid(args[0]), ua.QualifiedName(args[1], args[0])
else:
raise TypeError("Add methods takes a nodeid and a qualifiedname as argument, received %s" % args)
def create_folder(parent, *args):
"""
create a child node folder
arguments are nodeid, browsename
or namespace index, name
"""
nodeid, qname = _parse_add_args(*args)
return node.Node(parent.server, _create_folder(parent.server, parent.nodeid, nodeid, qname))
def create_object(parent, *args):
"""
create a child node object
arguments are nodeid, browsename
or namespace index, name
"""
nodeid, qname = _parse_add_args(*args)
return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname))
def create_property(parent, *args):
"""
create a child node property
args are nodeid, browsename, value, [variant type]
or idx, name, value, [variant type]
"""
nodeid, qname = _parse_add_args(*args[:2])
val = _to_variant(*args[2:])
return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, isproperty=True))
def create_variable(parent, *args):
"""
create a child node variable
args are nodeid, browsename, value, [variant type]
or idx, name, value, [variant type]
"""
nodeid, qname = _parse_add_args(*args[:2])
val = _to_variant(*args[2:])
return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, val, isproperty=False))
def create_method(parent, *args):
"""
create a child method object
This is only possible on server side!!
args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
or idx, name, method_to_be_called, [input argument types], [output argument types]
if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
"""
nodeid, qname = _parse_add_args(*args[:2])
callback = args[2]
if len(args) > 3:
inputs = args[3]
if len(args) > 4:
outputs = args[4]
return _create_method(parent, nodeid, qname, callback, inputs, outputs)
def _create_folder(server, parentnodeid, nodeid, qname):
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = qname
addnode.NodeClass = ua.NodeClass.Object
addnode.ParentNodeId = parentnodeid
addnode.ReferenceTypeId = ua.NodeId.from_string("i=35")
addnode.TypeDefinition = ua.NodeId.from_string("i=61")
attrs = ua.ObjectAttributes()
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.WriteMask = ua.OpenFileMode.Read
attrs.UserWriteMask = ua.OpenFileMode.Read
attrs.EventNotifier = 0
addnode.NodeAttributes = attrs
results = server.add_nodes([addnode])
results[0].StatusCode.check()
return nodeid
def _create_object(server, parentnodeid, nodeid, qname):
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = qname
addnode.NodeClass = ua.NodeClass.Object
addnode.ParentNodeId = parentnodeid
addnode.ReferenceTypeId = ua.NodeId.from_string("i=35")
addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
attrs = ua.ObjectAttributes()
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.EventNotifier = 0
attrs.WriteMask = ua.OpenFileMode.Read
attrs.UserWriteMask = ua.OpenFileMode.Read
addnode.NodeAttributes = attrs
results = server.add_nodes([addnode])
results[0].StatusCode.check()
return nodeid
def _to_variant(val, vtype=None):
if isinstance(val, ua.Variant):
return val
else:
return ua.Variant(val, vtype)
def _create_variable(server, parentnodeid, nodeid, qname, val, isproperty=False):
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = qname
addnode.NodeClass = ua.NodeClass.Variable
addnode.ParentNodeId = parentnodeid
if isproperty:
addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty)
addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType)
else:
addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent)
addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType)
attrs = ua.VariableAttributes()
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.DataType = _guess_uatype(val)
attrs.Value = val
if isinstance(val, list) or isinstance(val, tuple):
attrs.ValueRank = ua.ValueRank.OneDimension
else:
attrs.ValueRank = ua.ValueRank.Scalar
#attrs.ArrayDimensions = None
attrs.WriteMask = ua.OpenFileMode.Read
attrs.UserWriteMask = ua.OpenFileMode.Read
attrs.Historizing = 0
addnode.NodeAttributes = attrs
results = server.add_nodes([addnode])
results[0].StatusCode.check()
return nodeid
def _create_method(parent, nodeid, qname, callback, inputs, outputs):
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = qname
addnode.NodeClass = ua.NodeClass.Method
addnode.ParentNodeId = parent.nodeid
addnode.ReferenceTypeId = ua.NodeId.from_string("i=47")
#node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
attrs = ua.MethodAttributes()
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.WriteMask = ua.OpenFileMode.Read
attrs.UserWriteMask = ua.OpenFileMode.Read
attrs.Executable = True
attrs.UserExecutable = True
addnode.NodeAttributes = attrs
results = parent.server.add_nodes([addnode])
results[0].StatusCode.check()
method = node.Node(parent.server, nodeid)
if inputs:
create_property(method, ua.generate_nodeid(qname.NamespaceIndex), ua.QualifiedName("InputArguments", 0), [_vtype_to_argument(vtype) for vtype in inputs])
if outputs:
create_property(method, ua.generate_nodeid(qname.NamespaceIndex), ua.QualifiedName("OutputArguments", 0), [_vtype_to_argument(vtype) for vtype in outputs])
parent.server.add_method_callback(method.nodeid, callback)
return nodeid
def _vtype_to_argument(vtype):
if isinstance(vtype, ua.Argument):
return vtype
arg = ua.Argument()
v = ua.Variant(None, vtype)
arg.DataType = _guess_uatype(v)
return arg
def _guess_uatype(variant):
if variant.VariantType == ua.VariantType.ExtensionObject:
if variant.Value is None:
raise ua.UAError("Cannot guess DataType from Null ExtensionObject")
if type(variant.Value) in (list, tuple):
if len(variant.Value) == 0:
raise ua.UAError("Cannot guess DataType from Null ExtensionObject")
extobj = variant.Value[0]
else:
extobj = variant.Value
classname = extobj.__class__.__name__
return ua.NodeId(getattr(ua.ObjectIds, classname))
else:
return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
"""
High level method related functions
"""
from opcua import ua
from opcua.common import node
def call_method(parent, methodid, *args):
"""
Call an OPC-UA method. methodid is browse name of child method or the
nodeid of method as a NodeId object
arguments are variants or python object convertible to variants.
which may be of different types
returns a list of variants which are output of the method
"""
if isinstance(methodid, str):
methodid = parent.get_child(methodid).nodeid
elif isinstance(methodid, node.Node):
methodid = methodid.nodeid
arguments = []
for arg in args:
if not isinstance(arg, ua.Variant):
arg = ua.Variant(arg)
arguments.append(arg)
result = _call_method(parent.server, parent.nodeid, methodid, arguments)
if len(result.OutputArguments) == 0:
return None
elif len(result.OutputArguments) == 1:
return result.OutputArguments[0].Value
else:
return [var.Value for var in result.OutputArguments]
def _call_method(server, parentnodeid, methodid, arguments):
request = ua.CallMethodRequest()
request.ObjectId = parentnodeid
request.MethodId = methodid
request.InputArguments = arguments
methodstocall = [request]
results = server.call(methodstocall)
res = results[0]
res.StatusCode.check()
return res
def uamethod(func):
"""
Method decorator to automatically convert
arguments and output to and from variants
"""
def wrapper(parent, *args):
if isinstance(parent, ua.NodeId):
result = func(parent, *[arg.Value for arg in args])
else:
self = parent
parent = args[0]
args = args[1:]
result = func(self, parent, *[arg.Value for arg in args])
return to_variant(result)
return wrapper
def to_variant(*args):
uaargs = []
for arg in args:
uaargs.append(ua.Variant(arg))
return uaargs
This diff is collapsed.
......@@ -18,8 +18,8 @@ from opcua import Client
from opcua import Server
from opcua import uamethod
from opcua import Event
from opcua import ObjectIds
from opcua import AttributeIds
from opcua.ua import ObjectIds
from opcua.ua import AttributeIds
from opcua.ua import extensionobject_from_binary
from opcua.ua import extensionobject_to_binary
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment