Commit 214f2528 authored by Olivier R-D's avatar Olivier R-D

more pythonic method call, decorator and more tests

parent af261b74
......@@ -27,7 +27,7 @@ if __name__ == "__main__":
root = client.get_root_node()
print(root)
print(root.get_children())
print(root.get_name())
print(root.get_browse_name())
#var = client.get_node(ua.NodeId(1002, 2))
#print(var)
#print(var.get_value())
......@@ -35,6 +35,7 @@ if __name__ == "__main__":
state = root.get_child(["0:Objects", "0:Server"])
print(state)
myvar = root.get_child(["0:Objects", "2:NewObject", "2:MyVariable"])
obj = root.get_child(["0:Objects", "2:NewObject"])
print("yvar is: ", myvar)
handler = SubHandler()
sub = client.create_subscription(500, handler)
......@@ -42,6 +43,11 @@ if __name__ == "__main__":
time.sleep(0.1)
#sub.unsubscribe(handle)
#sub.delete()
#calling a method on server
res = obj.call_method("2:multiply", 3, "klk")
print("method result is: ", res)
embed()
finally:
client.disconnect()
import time
import logging
from opcua import ua
from opcua.server import Server
from opcua import ua, uamethod, Server
from IPython import embed
......@@ -15,16 +15,16 @@ class SubHandler(object):
def event(self, handle, event):
print("Python: New event", handle, event)
def func(parent, var):
print("Got var: ", var)
var.Value *= 2
print(var)
return [ua.Variant(True)]
def func2(parent):
#method to be exposed through server
def func(parent):
return [ua.Variant(78.90)]
#method to be exposed through server
# uses a decorator to automatically convert to and from variants
@uamethod
def multiply(parent, x, y):
print("OKOK", x, y)
return x*y
if __name__ == "__main__":
......@@ -56,6 +56,7 @@ if __name__ == "__main__":
myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
myprop = myobj.add_property(idx, "myproperty", "I am a property")
mymethod = myobj.add_method(idx, "mymethod", func2, [ua.VariantType.Int64], [ua.VariantType.Boolean])
multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])
# starting!
server.start()
......
......@@ -8,3 +8,22 @@ from opcua.node import Node
from opcua.subscription import Subscription
from opcua.client import Client
from opcua.server import Server
def uamethod(func):
"""
Method decorator to automatically convert
arguments and output to and from variants
"""
def wrapper(parent, *args):
result = func(parent, *[arg.Value for arg in args])
return to_variant(result)
return wrapper
def to_variant(*args):
uaargs = []
for arg in args:
uaargs.append(ua.Variant(arg))
return uaargs
......@@ -300,17 +300,25 @@ class Node(object):
return method
def call_method(self, methodid, arguments):
def call_method(self, methodid, *args):
"""
Call an OPC-UA method. methodid is browse name of child method or the
nodeid of method as a NodeId object
arguments is a list of variants which may be of different type
arguments are variants or python object convertible to variants.
which may be of different types
returns a list of variants which are output of the method
"""
if type(methodid) is str:
methodid = self.get_child(methodid).nodeid
elif type(methodid) is Node:
methodid = methodid.nodeid
arguments = []
for arg in args:
if not isinstance(arg, ua.Variant):
arg = ua.Variant(arg)
arguments.append(arg)
request = ua.CallMethodRequest()
request.ObjectId = self.nodeid
request.MethodId = methodid
......@@ -318,14 +326,13 @@ class Node(object):
methodstocall = [request]
results = self.server.call(methodstocall)
res = results[0]
arguments = res.InputArgumentResults
#if len(res.OutputArguments) == 0:
#return None
#elif len(res.OutputArguments) == 1:
#return res.OutputArguments[0]
#else:
res.StatusCode.check()
return res.OutputArguments
if len(res.OutputArguments) == 0:
return None
elif len(res.OutputArguments) == 1:
return res.OutputArguments[0].Value
else:
return [var.Value for var in res.OutputArguments]
def _vtype_to_argument(self, vtype):
arg = ua.Argument()
......
......@@ -26,15 +26,17 @@ class Server(object):
self.iserver = InternalServer()
self.bserver = None
#setup some expected values
self.register_namespace(self.server_uri)
sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
sa_node.set_value([self.server_uri])
def set_endpoint(self, url):
self.endpoint = urlparse(url)
def _setup_server_nodes(self):
#to be called just before starting server since it needs all parameters to be setup
self._set_endpoints()
self.register_namespace(self.server_uri)
sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
sa_node.set_value([self.server_uri])
def _set_endpoints(self):
idtoken = ua.UserTokenPolicy()
......
......@@ -16,6 +16,7 @@ from threading import Condition
from opcua import ua
from opcua import Client
from opcua import Server
from opcua import uamethod
port_num1 = 48410
port_num2 = 48430
......@@ -451,33 +452,48 @@ class CommonTests(object):
def test_method(self):
o = self.opc.get_objects_node()
m = o.get_child("2:ServerMethod")
result = o.call_method(m, [ua.Variant(2.1)])
self.assertEqual(result, [ua.Variant(4.2)])
result = o.call_method("2:ServerMethod", 2.1)
self.assertEqual(result, 4.2)
def test_method_array(self):
o = self.opc.get_objects_node()
m = o.get_child("2:ServerMethodArray")
result = o.call_method(m, [ua.Variant("sin"), ua.Variant(math.pi), ])
self.assertTrue(result[0].Value < 0.01)
result = o.call_method(m, "sin", ua.Variant(math.pi))
self.assertTrue(result < 0.01)
def test_method_array2(self):
o = self.opc.get_objects_node()
m = o.get_child("2:ServerMethodArray2")
result = o.call_method(m, [1.1, 3.4, 9])
self.assertEqual(result, [2.2, 6.8, 18])
def add_server_methods(srv):
def func(parent, variant):
variant.Value *= 2
return [variant]
@uamethod
def func(parent, value):
return value * 2
o = srv.get_objects_node()
v = o.add_method(ua.NodeId("ServerMethod", 2), ua.QualifiedName('ServerMethod', 2), func, [ua.VariantType.Int64], [ua.VariantType.Int64])
def func2(parent, methodname, variant):
print("method name is: ", methodname)
val = math.sin(variant.Value)
res = ua.Variant(val)
return [res]
@uamethod
def func2(parent, methodname, value):
return math.sin(value)
o = srv.get_objects_node()
v = o.add_method(ua.NodeId("ServerMethodArray", 2), ua.QualifiedName('ServerMethodArray', 2), func2, [ua.VariantType.String, ua.VariantType.Int64], [ua.VariantType.Int64])
@uamethod
def func3(parent, mylist):
return [i * 2 for i in mylist]
o = srv.get_objects_node()
v = o.add_method(ua.NodeId("ServerMethodArray2", 2), ua.QualifiedName('ServerMethodArray2', 2), func3, [ua.VariantType.Int64], [ua.VariantType.Int64])
class ServerProcess(Thread):
......@@ -589,8 +605,8 @@ class TestServer(unittest.TestCase, CommonTests):
return [variant]
o = self.opc.get_objects_node()
v = o.add_method(3, 'Method1', func, [ua.VariantType.Int64], [ua.VariantType.Int64])
result = o.call_method(v, [ua.Variant(2.1)])
self.assertEqual(result, [ua.Variant(4.2)])
result = o.call_method(v, ua.Variant(2.1))
self.assertEqual(result, 4.2)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment