Commit af261b74 authored by Olivier R-D's avatar Olivier R-D

server: add method calls with tests, add republish

and small fixes
parent d4c1320d
......@@ -20,7 +20,6 @@ Client: what works:
Client: what is not implemented yet
* removing nodes
* subscribing to events
* subscribing to status change
* adding missing modify methods
* certificate handling
* user and password
......@@ -36,11 +35,11 @@ Server: what works:
* adding nodes to address space
* tested client: freeopcua C++, freeopcua Python, uaexpert, prosys
* datachange subscriptions
* methods
Server: what is not implemented
* event, status subscriptions
* events
* security (users, certificates, etc)
* methods
* removing nodes
Example client code:
......@@ -90,3 +89,15 @@ Example server code:
...
```
# Development
## Running tests:
python tests.py
## Coverage
coverage run tests.py
coverage html
firefox htmlcov/index.html
import logging
from opcua import Client
from opcua import uaprotocol as ua
class SubHandler(object):
"""
Client to subscription. It will receive events from server
"""
def data_change(self, handle, node, val, attr):
print("Python: New data change event", handle, node, val, attr)
def event(self, handle, event):
print("Python: New event", handle, event)
if __name__ == "__main__":
from IPython import embed
logging.basicConfig(level=logging.WARN)
client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")
try:
client.connect()
root = client.get_root_node()
print("Root is", root)
print("childs of root are: ", root.get_children())
print("name of root is", root.get_browse_name())
objects = client.get_objects_node()
print("childs og objects are: ", objects.get_children())
myfloat = client.get_node("ns=4;s=Float")
mydouble = client.get_node("ns=4;s=Double")
myint64 = client.get_node("ns=4;s=Int64")
myuint64 = client.get_node("ns=4;s=UInt64")
myint32 = client.get_node("ns=4;s=Int32")
myuint32 = client.get_node("ns=4;s=UInt32")
var = client.get_node(ua.NodeId("Random1", 5))
print("var is: ", var)
print("value of var is: ", var.get_value())
var.set_value(ua.Variant([23], ua.VariantType.Double))
print("setting float value")
myfloat.set_value(ua.Variant(1.234, ua.VariantType.Float))
print("reading float value: ", myfloat.get_value())
handler = SubHandler()
sub = client.create_subscription(500, handler)
#sub.subscribe_data_change(var)
device = objects.get_child(["2:MyObjects", "2:MyDevice"])
method = device.get_child("2:MyMethod")
result = device.call_method(method, [ua.Variant("sin"), ua.Variant(180, ua.VariantType.Double)])
print("Mehtod result is: ", result)
embed()
client.close_session()
finally:
client.disconnect()
......@@ -16,6 +16,15 @@ class SubHandler(object):
print("Python: New event", handle, event)
def func(parent, var):
print("Got var: ", var)
var.Value *= 2
print(var)
return [ua.Variant(True)]
def func2(parent):
return [ua.Variant(78.90)]
if __name__ == "__main__":
......@@ -24,8 +33,8 @@ if __name__ == "__main__":
#logger = logging.getLogger("opcua.address_space")
#logger = logging.getLogger("opcua.internal_server")
#logger.setLevel(logging.DEBUG)
#logger = logging.getLogger("opcua.subscription_server")
#logger.setLevel(logging.DEBUG)
logger = logging.getLogger("opcua.subscription_server")
logger.setLevel(logging.DEBUG)
# now setup our server
......@@ -46,6 +55,7 @@ if __name__ == "__main__":
myvar = myobj.add_variable(idx, "MyVariable", 6.7)
myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
myprop = myobj.add_property(idx, "myproperty", "I am a property")
mymethod = myobj.add_method(idx, "mymethod", func2, [ua.VariantType.Int64], [ua.VariantType.Boolean])
# starting!
server.start()
......@@ -53,8 +63,8 @@ if __name__ == "__main__":
try:
handler = SubHandler()
#enable following if you want to subscribe to nodes on server side
#sub = server.create_subscription(500, handler)
#handle = sub.subscribe_data_change(myvar)
sub = server.create_subscription(500, handler)
handle = sub.subscribe_data_change(myvar)
#time.sleep(0.1)
#sub.unsubscribe(handle)
#sub.delete()
......
......@@ -23,6 +23,7 @@ class NodeData(object):
self.nodeid = nodeid
self.attributes = {}
self.references = []
self.call = None
def __str__(self):
return "NodeData(id:{}, attrs:{}, refs:{})".format(self.nodeid, self.attributes, self.references)
......@@ -155,8 +156,7 @@ class AddressSpace(object):
try:
v(k, value)
except Exception as ex:
self.logger.warn("Error calling datachange callback %s, %s, %s", k, v, ex)
print(ex)#seems exception is truncated!?!?
self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)
return ua.StatusCode()
def add_datachange_callback(self, nodeid, attr, callback):
......@@ -335,8 +335,34 @@ class AddressSpace(object):
#FIXME: here we should check other arguments!!
if ref.BrowseName == el.TargetName:
return ref.NodeId
self.logger.warning("element %s was not found in node %s", el, nodeid)
self.logger.info("element %s was not found in node %s", el, nodeid)
return None
def add_method_callback(self, methodid, callback):
node = self._nodes[methodid]
node.call = callback
def call(self, methods):
results = []
for method in methods:
res = ua.CallMethodResult()
if method.ObjectId not in self._nodes or method.MethodId not in self._nodes:
res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdInvalid)
else:
node = self._nodes[method.MethodId]
if node.call is None:
res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
else:
try:
res.OutputArguments = node.call(method.ObjectId, *method.InputArguments)
for _ in method.InputArguments:
res.InputArgumentResults.append(ua.StatusCode())
except Exception:
self.logger.exception("Error executing method call %s, an exception was raised: ", method)
res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
results.append(res)
return results
......
......@@ -318,7 +318,7 @@ class BinaryClient(object):
if acks is None:
acks = []
request = ua.PublishRequest()
request.SubscriptionAcknowledgements = acks
request.Parameters.SubscriptionAcknowledgements = acks
self._send_request(request, self._call_publish_callback, timeout=0)
def _call_publish_callback(self, future):
......@@ -348,8 +348,6 @@ class BinaryClient(object):
response.ResponseHeader.ServiceResult.check()
return response.Results
def add_nodes(self, nodestoadd):
self.logger.info("add_nodes")
request = ua.AddNodesRequest()
......@@ -358,6 +356,14 @@ class BinaryClient(object):
response = ua.AddNodesResponse.from_binary(data)
response.ResponseHeader.ServiceResult.check()
return response.Results
def call(self, methodstocall):
request = ua.CallRequest()
request.Parameters.MethodsToCall = methodstocall
data = self._send_request(request)
response = ua.CallResponse.from_binary(data)
response.ResponseHeader.ServiceResult.check()
return response.Results
......@@ -25,7 +25,7 @@ class BinaryServer(Thread):
self.iserver = internal_server
def run(self):
logger.info("Starting server on %s:%s", self.hostname, self.port)
logger.warning("Listening on %s:%s", self.hostname, self.port)
socketserver.TCPServer.allow_reuse_address = True #get rid of address already in used warning
self.socket_server = ThreadingTCPServer((self.hostname, self.port), UAHandler)
#self.socket_server.daemon_threads = True # this will force a shutdown of all threads, maybe too hard
......
......@@ -3,7 +3,6 @@ Internal server implementing opcu-ua interface. can be used on server side or to
"""
from datetime import datetime
import uuid
import logging
from threading import Timer, Lock
from enum import Enum
......@@ -155,6 +154,12 @@ class InternalSession(object):
def add_nodes(self, params):
return self.aspace.add_nodes(params)
def add_method_callback(self, methodid, callback):
return self.aspace.add_method_callback(methodid, callback)
def call(self, params):
return self.aspace.call(params)
def create_subscription(self, params, callback):
result = self.submgr.create_subscription(params, callback)
with self._lock:
......@@ -167,6 +172,9 @@ class InternalSession(object):
def modify_monitored_items(self, params):
return self.submgr.modify_monitored_items(params)
def republish(self, params):
return self.submgr.republish(params)
def delete_subscriptions(self, ids):
for i in ids:
with self._lock:
......
......@@ -225,7 +225,7 @@ class Node(object):
return self._add_variable(nodeid, qname, val, isproperty=False)
def _to_variant(self, val, vtype=None):
if type(val) is ua.Variant:
if isinstance(val, ua.Variant):
return val
else:
return ua.Variant(val, vtype)
......@@ -245,7 +245,7 @@ class Node(object):
attrs = ua.VariableAttributes()
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.DataType = self._vtype_to_uatype(val.VariantType)
attrs.DataType = self._guess_uatype(val)
attrs.Value = val
attrs.ValueRank = 0
attrs.WriteMask = 0
......@@ -259,34 +259,95 @@ class Node(object):
def add_method(self, *args):
"""
create a child method object
This is only possible on server side!!
args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types]
or idx, name, method_to_be_called, [input argument types], [output argument types]
if argument types is specified, child nodes advertising what arguments the method uses and returns will be created
a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants
"""
nodeid, qname = self._parse_add_args(*args)
return self._add_object(nodeid, qname)
nodeid, qname = self._parse_add_args(*args[:2])
callback = args[2]
if len(args) > 3:
inputs = args[3]
if len(args) > 4:
outputs = args[4]
return self._add_method(nodeid, qname, callback, inputs, outputs)
def _add_method(self, nodeid, qname):
def _add_method(self, nodeid, qname, callback, inputs, outputs):
node = ua.AddNodesItem()
node.RequestedNewNodeId = nodeid
node.BrowseName = qname
node.NodeClass = ua.NodeClass.Object
node.NodeClass = ua.NodeClass.Method
node.ParentNodeId = self.nodeid
node.ReferenceTypeId = ua.NodeId.from_string("i=35")
node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
node.ReferenceTypeId = ua.NodeId.from_string("i=47")
#node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType)
attrs = ua.MethodAttributes()
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.WriteMask = 0
attrs.UserWriteMask = 0
self.Executable = True
self.UserExecutable = True
attrs.Executable = True
attrs.UserExecutable = True
node.NodeAttributes = attrs
results = self.server.add_nodes([node])
results[0].StatusCode.check()
return Node(self.server, nodeid)
method = Node(self.server, nodeid)
if inputs:
method.add_property(qname.NamespaceIndex, "InputArguments", [self._vtype_to_argument(vtype) for vtype in inputs])
if outputs:
method.add_property(qname.NamespaceIndex, "OutputArguments", [self._vtype_to_argument(vtype) for vtype in inputs])
self.server.add_method_callback(method.nodeid, callback)
return method
def call_method(self, methodid, arguments):
"""
Call an OPC-UA method. methodid is browse name of child method or the
nodeid of method as a NodeId object
arguments is a list of variants which may be of different type
returns a list of variants which are output of the method
"""
if type(methodid) is str:
methodid = self.get_child(methodid).nodeid
elif type(methodid) is Node:
methodid = methodid.nodeid
request = ua.CallMethodRequest()
request.ObjectId = self.nodeid
request.MethodId = methodid
request.InputArguments = arguments
methodstocall = [request]
results = self.server.call(methodstocall)
res = results[0]
arguments = res.InputArgumentResults
#if len(res.OutputArguments) == 0:
#return None
#elif len(res.OutputArguments) == 1:
#return res.OutputArguments[0]
#else:
res.StatusCode.check()
return res.OutputArguments
def _vtype_to_argument(self, vtype):
arg = ua.Argument()
v = ua.Variant(None, vtype)
arg.DataType = self._guess_uatype(v)
return ua.ExtensionObject.from_object(arg)
def _vtype_to_uatype(self, vtype):
return eval("ua.NodeId(ua.ObjectIds.{})".format(vtype.name))
def _guess_uatype(self, variant):
if variant.VariantType == ua.VariantType.ExtensionObject:
if variant.Value is None:
raise Exception("Cannot guess DataType from Null ExtensionObject")
if type(variant.Value) in (list, tuple):
if len(variant.Value) == 0:
raise Exception("Cannot guess DataType from Null ExtensionObject")
extobj = variant.Value[0]
else:
extobj = variant.Value
objectidname = ua.ObjectIdsInv[extobj.TypeId.Identifier]
classname = objectidname.split("_")[0]
return eval("ua.NodeId(ua.ObjectIds.{})".format(classname))
else:
return eval("ua.NodeId(ua.ObjectIds.{})".format(variant.VariantType.name))
def _parse_add_args(self, *args):
if type(args[0]) is ua.NodeId:
......
......@@ -18,8 +18,7 @@ from opcua import Node, Subscription
class Server(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.endpoint = None
self.name = None
self.endpoint = "opc.tcp://localhost:4841/freeopcua/server/"
self.server_uri = "urn:freeopcua:python:server"
self.product_uri = "urn:freeopcua.github.no:python:server"
self.name = "FreeOpcUa Python Server"
......@@ -47,6 +46,7 @@ class Server(object):
appdesc.ApplicationUri = self.server_uri
appdesc.ApplicationType = ua.ApplicationType.Server
appdesc.ProductUri = self.product_uri
appdesc.DiscoveryUrls.append(self.endpoint.geturl())
edp = ua.EndpointDescription()
edp.EndpointUrl = self.endpoint.geturl()
......
......@@ -90,6 +90,10 @@ class SubscriptionManager(Thread):
return res
def publish(self, acks):
for ack in acks:
with self._lock:
if ack.SubscriptionId in self.subscriptions:
self.subscriptions[ack.SubscriptionId].publish(ack.SequenceNumber)
self.logger.info("publish request with acks %s", acks)
def create_monitored_items(self, params):
......@@ -126,6 +130,14 @@ class SubscriptionManager(Thread):
return res
return self.subscriptions[params.SubscriptionId].delete_monitored_items(params.MonitoredItemIds)
def republish(self, params):
with self._lock:
if not params.SubscriptionId in self.subscriptions:
#what should I do?
return ua.NotificationMessage()
return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
class MonitoredItemData(object):
......@@ -151,10 +163,14 @@ class InternalSubscription(object):
self._lock = RLock()
self._triggered_datachanges = []
self._triggered_events = []
self._triggered_statuschanges = []
self._notification_seq = 1
self._not_acknowledged_results = []
self._not_acknowledged_results = {}
self._startup = True
self._keep_alive_count = 0
self._publish_cyles_count = 0
def __str__(self):
return "Subscription(id:{})".format(self.data.SubscriptionId)
......@@ -176,12 +192,10 @@ class InternalSubscription(object):
self.logger.debug("%s loop running", self)
while True:
yield from asyncio.sleep(self.data.RevisedPublishingInterval/1000)
#test disabled we do not check that one since we do not care about not received results
#if self._keep_alive_count > self.data.RevisedLifetimeCount:
#self.logger.warn("Subscription %s has expired, keep alive count(%s) > lifetime count (%s)", self.data.SubscriptionId, self._keep_alive_count, self.data.RevisedLifetimeCount)
#return
try:
self.publish_results()
expired = self.publish_results()
if expired:
self.stop()
except Exception as ex: #we catch everythin since it seems exceptions are lost in loop
self.logger.exception("Exception in %s loop", self)
......@@ -196,11 +210,18 @@ class InternalSubscription(object):
return False
def publish_results(self):
#self.logger.debug("looking for results and publishing")
expired = False
if self._publish_cyles_count > self.data.RevisedLifetimeCount:
self.logger.warn("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
#FIXME this will never be send since we do not have publish request anyway
self.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
expired = True
with self._lock:
if self.has_published_results(): #FIXME: should I pop a publish request here? or I do not care?
if self.has_published_results(): #FIXME: should we pop a publish request here? or we do not care?
self._publish_cyles_count += 1
result = self._pop_publish_result()
self.callback(result)
return expired
def _pop_publish_result(self):
result = ua.PublishResult()
......@@ -216,15 +237,38 @@ class InternalSubscription(object):
notif.Events = self._triggered_events[:]
self._triggered_events.clear()
result.NotificationMessage.NotificationData.append(notif)
#FIXME: add statuschaneg events
if self._triggered_statuschanges:
notif = ua.StatusChangeNotification()
notif.Status = self._triggered_statuschanges.pop(0)
result.NotificationMessage.NotificationData.append(notif)
self._keep_alive_count = 0
self._startup = False
result.NotificationMessage.SequenceNumber = self._notification_seq
self._notification_seq += 1
result.MoreNotifications = False
result.AvailableSequenceNumbers = [res.NotificationMessage.SequenceNumber for res in self._not_acknowledged_results]
self._not_acknowledged_results.append(result)
result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
return result
def trigger_statuschange(self, code):
self._triggered_statuschanges.append(code)
def publish(self, nb):
with self._lock:
self._publish_cyles_count = 0
if nb in self._not_acknowledged_results:
self._not_acknowledged_results.pop(nb)
def republish(self, nb):
self.logger.info("re-publish request for ack %s in subscription %s", nb, self)
with self._lock:
if nb in self._not_acknowledged_results:
self.logger.info("re-publishing ack %s in subscription %s", nb, self)
return self._not_acknowledged_results[nb].NotificationMessage
else:
self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
return ua.NotificationMessage()
def create_monitored_items(self, params):
results = []
......@@ -296,7 +340,6 @@ class InternalSubscription(object):
results.append(ua.StatusCode())
elif self._delete_monitored_datachange(mid):
results.append(ua.StatusCode())
#FIXME add statuschange
else:
results.append(ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid))
return results
......@@ -306,7 +349,7 @@ class InternalSubscription(object):
for k, v in self._monitored_events:
if v == mid:
self._monitored_events.pop(k)
#FIXME we may need to remove events in queue, or we do not care ?
#we do not remove events in queue, or should we care?
return True
return False
......@@ -323,8 +366,6 @@ class InternalSubscription(object):
event = ua.MonitoredItemNotification()
with self._lock:
mdata = self._monitored_datachange[handle]
#event.monitored_item_id = mdata.monitored_item_id
#event.monitored_item_notification.ClientHandle = mdata.client_handle
event.ClientHandle = mdata.client_handle
event.Value = value
self._triggered_datachanges.append(event)
......
......@@ -38,8 +38,8 @@ class UAProcessor(object):
hello = ua.Hello.from_binary(body)
hdr = ua.Header(ua.MessageType.Acknowledge, ua.ChunkType.Single)
ack = ua.Acknowledge()
ack.ReceivebufferSize = hello.ReceiveBufferSize
ack.SendbufferSize = hello.SendBufferSize
ack.ReceiveBufferSize = hello.ReceiveBufferSize
ack.SendBufferSize = hello.SendBufferSize
self._write_socket(hdr, ack)
while True:
......@@ -59,7 +59,8 @@ class UAProcessor(object):
seqhdr.SequenceNumber = self._seq_number
self._seq_number += 1
hdr = ua.Header(msgtype, ua.ChunkType.Single, self.channel.SecurityToken.ChannelId)
algohdr.TokenId = self.channel.SecurityToken.TokenId
if type(algohdr) is ua.SymmetricAlgorithmHeader:
algohdr.TokenId = self.channel.SecurityToken.TokenId
self._write_socket(hdr, algohdr, seqhdr, response)
def _write_socket(self, hdr, *args):
......@@ -268,8 +269,6 @@ class UAProcessor(object):
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
self.logger.info("delete monitored items request")
params = ua.DeleteMonitoredItemsParameters.from_binary(body)
......@@ -285,9 +284,9 @@ class UAProcessor(object):
self.logger.info("publish request")
if not self.session:
return
return False
acks = ua.unpack_array("Int32", body)
params = ua.PublishParameters.from_binary(body)
data = PublishRequestData()
data.requesthdr = requesthdr
......@@ -295,7 +294,18 @@ class UAProcessor(object):
data.algohdr = algohdr
with self._datalock:
self._publishdata_queue.append(data) # will be used to send publish answers from server
self.session.publish(acks)
self.session.publish(params.SubscriptionAcknowledgements)
elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
self.logger.info("re-publish request")
params = ua.RepublishParameters.from_binary(body)
msg = self.session.republish(params)
response = ua.RepublishResponse()
response.NotificationMessage = msg
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
self.logger.info("close secure channel request")
......@@ -304,6 +314,18 @@ class UAProcessor(object):
self.channel = None
return False
elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
self.logger.info("call request")
params = ua.CallParameters.from_binary(body)
results = self.session.call(params.MethodsToCall)
response = ua.CallResponse()
response.Results = results
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
else:
self.logger.warning("Uknown message received %s", typeid)
sf = ua.ServiceFault()
......@@ -312,6 +334,9 @@ class UAProcessor(object):
return True
def _open_secure_channel(self, params):
self.logger.info("open secure channel")
if params.RequestType == ua.SecurityTokenRequestType.Issue:
......
This diff is collapsed.
......@@ -19,13 +19,8 @@ def get_bytes_from_sock(sock, size):
raise SocketClosedException("Server socket has closed")
return io.BytesIO(data)
class LocalizedText(auto.LocalizedText):
def __init__(self, text=""):
auto.LocalizedText.__init__(self)
self.Text = text.encode()
class Hello(object):
class Hello(uatypes.FrozenClass):
def __init__(self):
self.ProtocolVersion = 0
self.ReceiveBufferSize = 65536
......@@ -33,6 +28,7 @@ class Hello(object):
self.MaxMessageSize = 0
self.MaxChunkCount = 0
self.EndpointUrl = ""
self._freeze()
def to_binary(self):
b = []
......@@ -74,12 +70,13 @@ class ChunkType(object):
class Header(object):
class Header(uatypes.FrozenClass):
def __init__(self, msgType=None, chunkType=None, channelid=0):
self.MessageType = msgType
self.ChunkType = chunkType
self.ChannelId = channelid
self.body_size = 0
self._freeze()
def add_size(self, size):
self.body_size += size
......@@ -114,10 +111,11 @@ class Header(object):
__repr__ = __str__
class ErrorMessage(object):
class ErrorMessage(uatypes.FrozenClass):
def __init__(self):
self.Error = uatypes.StatusCode()
self.Reason = ""
self._freeze()
def to_binary(self):
b = []
......@@ -137,13 +135,14 @@ class ErrorMessage(object):
__repr__ = __str__
class Acknowledge:
class Acknowledge(uatypes.FrozenClass):
def __init__(self):
self.ProtocolVersion = 0
self.ReceiveBufferSize = 65536
self.SendBufferSize = 65536
self.MaxMessageSize = 0 #No limits
self.MaxChunkCount = 0 #No limits
self._freeze()
def to_binary(self):
b = []
......@@ -166,25 +165,12 @@ class Acknowledge:
ack.MaxChunkCount = struct.unpack("<I", data.read(4))[0]
return ack
class Error:
def __init__(self):
self.Code = None
self.Reason = None
@staticmethod
def from_binary(data):
obj = Error()
obj.Code = struct.unpack("<I", data.read(4))[0]
size = struct.unpack("<i", data.read(4))[0]
obj.Reason = struct.unpack("<{}s".format(size), data.read(size))[0]
class AsymmetricAlgorithmHeader:
class AsymmetricAlgorithmHeader(uatypes.FrozenClass):
def __init__(self):
self.SecurityPolicyURI = "http://opcfoundation.org/UA/SecurityPolicy#None"
self.SenderCertificate = b""
self.ReceiverCertificateThumbPrint = b""
self._freeze()
def to_binary(self):
b = []
......@@ -206,9 +192,10 @@ class AsymmetricAlgorithmHeader:
__repr__ = __str__
class SymmetricAlgorithmHeader:
class SymmetricAlgorithmHeader(uatypes.FrozenClass):
def __init__(self):
self.TokenId = 0
self._freeze()
@staticmethod
def from_binary(data):
......@@ -224,10 +211,11 @@ class SymmetricAlgorithmHeader:
__repr__ = __str__
class SequenceHeader:
class SequenceHeader(uatypes.FrozenClass):
def __init__(self):
self.SequenceNumber = None
self.RequestId = None
self._freeze()
@staticmethod
def from_binary(data):
......@@ -300,6 +288,6 @@ def downcast_extobject(item):
objectidname = ObjectIdsInv[item.TypeId.Identifier]
classname = objectidname.split("_")[0]
cmd = "{}.from_binary(utils.Buffer(item.to_binary()))".format(classname)
#logger.debug("running %s", cmd)
return eval(cmd)
......@@ -10,6 +10,7 @@ import uuid
import struct
import opcua.status_code as status_code
from opcua.object_ids import ObjectIds
logger = logging.getLogger('opcua.uaprotocol')
......@@ -157,23 +158,6 @@ def unpack_string(data):
#return str(b)
return b.decode("utf-8")
def unpack_array(uatype, data):
length = struct.unpack('<i', data.read(4))[0]
array = []
if length != -1:
for _ in range(0, length):
array.append(unpack_uatype(uatype, data))
return array
def unpack_object_array(objclass, data):
print("DEPRECATED, unpack_objects_array is deprecated, use unpack_array")
length = struct.unpack('<i', data.read(4))[0]
array = []
if length != -1:
for _ in range(0, length):
array.append(objclass.from_binary(data))
return array
def test_bit(data, offset):
mask = 1 << offset
return data & mask
......@@ -444,6 +428,93 @@ class QualifiedName(object):
__repr__ = __str__
class LocalizedText(FrozenClass):
'''
A string qualified with a namespace index.
'''
def __init__(self, text=""):
self.Encoding = 0
self.Text = text.encode()
if self.Text: self.Encoding |= (1 << 1)
self.Locale = b''
self._freeze()
def to_binary(self):
packet = []
if self.Locale: self.Encoding |= (1 << 0)
if self.Text: self.Encoding |= (1 << 1)
packet.append(pack_uatype('UInt8', self.Encoding))
if self.Locale:
packet.append(pack_uatype('CharArray', self.Locale))
if self.Text:
packet.append(pack_uatype('CharArray', self.Text))
return b''.join(packet)
@staticmethod
def from_binary(data):
obj = LocalizedText()
obj.Encoding = unpack_uatype('UInt8', data)
if obj.Encoding & (1 << 0):
obj.Locale = unpack_uatype('CharArray', data)
if obj.Encoding & (1 << 1):
obj.Text = unpack_uatype('CharArray', data)
return obj
def __str__(self):
return 'LocalizedText(' + 'Encoding:' + str(self.Encoding) + ', ' + \
'Locale:' + str(self.Locale) + ', ' + \
'Text:' + str(self.Text) + ')'
def __eq__(self, other):
if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
return True
return False
class ExtensionObject(FrozenClass):
'''
'''
def __init__(self):
self.TypeId = NodeId()
self.Encoding = 0
self.Body = b''
self._freeze()
def to_binary(self):
packet = []
if self.Body: self.Encoding |= (1 << 0)
packet.append(self.TypeId.to_binary())
packet.append(pack_uatype('UInt8', self.Encoding))
if self.Body:
packet.append(pack_uatype('ByteString', self.Body))
return b''.join(packet)
@staticmethod
def from_binary(data):
obj = ExtensionObject()
obj.TypeId = NodeId.from_binary(data)
obj.Encoding = unpack_uatype('UInt8', data)
if obj.Encoding & (1 << 0):
obj.Body = unpack_uatype('ByteString', data)
return obj
@staticmethod
def from_object(obj):
ext = ExtensionObject()
code = "ObjectIds.{}_Encoding_DefaultBinary".format(obj.__class__.__name__)
oid = eval(code)
ext.TypeId = FourByteNodeId(oid)
ext.Body = obj.to_binary()
return ext
def __str__(self):
return 'ExtensionObject(' + 'TypeId:' + str(self.TypeId) + ', ' + \
'Encoding:' + str(self.Encoding) + ', ' + \
'Body:' + str(self.Body) + ')'
__repr__ = __str__
class VariantType(Enum):
'''
The possible types of a variant.
......@@ -507,6 +578,8 @@ class Variant(object):
return VariantType.ByteString
elif type(val) == datetime:
return VariantType.DateTime
elif type(val) == ExtensionObject:
return VariantType.ExtensionObject
else:
raise Exception("Could not guess UA type of {} with type {}, specify UA type".format(val, type(val)))
......
......@@ -14,9 +14,9 @@ IgnoredEnums = []#["IdType", "NodeIdType"]
#we want to implement som struct by hand, to make better interface or simply because they are too complicated
IgnoredStructs = []#["NodeId", "ExpandedNodeId", "Variant", "QualifiedName", "DataValue", "LocalizedText"]#, "ExtensionObject"]
#by default we split requests and respons in header and parameters, but some are so simple we do not split them
NoSplitStruct = ["GetEndpointsResponse", "CloseSessionRequest", "AddNodesResponse", "BrowseResponse", "HistoryReadResponse", "HistoryUpdateResponse", "RegisterServerResponse", "CloseSecureChannelRequest", "CloseSecureChannelResponse", "CloseSessionRequest", "CloseSessionResponse", "UnregisterNodesResponse", "MonitoredItemModifyRequest", "MonitoredItemsCreateRequest", "ReadResponse", "WriteResponse", "TranslateBrowsePathsToNodeIdsResponse", "DeleteSubscriptionsResponse", "DeleteMonitoredItemsResponse", "PublishRequest", "CreateMonitoredItemsResponse", "ServiceFault", "AddReferencesRequest", "AddReferencesResponse", "ModifyMonitoredItemsResponse"]
NoSplitStruct = ["GetEndpointsResponse", "CloseSessionRequest", "AddNodesResponse", "BrowseResponse", "HistoryReadResponse", "HistoryUpdateResponse", "RegisterServerResponse", "CloseSecureChannelRequest", "CloseSecureChannelResponse", "CloseSessionRequest", "CloseSessionResponse", "UnregisterNodesResponse", "MonitoredItemModifyRequest", "MonitoredItemsCreateRequest", "ReadResponse", "WriteResponse", "TranslateBrowsePathsToNodeIdsResponse", "DeleteSubscriptionsResponse", "DeleteMonitoredItemsResponse", "CreateMonitoredItemsResponse", "ServiceFault", "AddReferencesRequest", "AddReferencesResponse", "ModifyMonitoredItemsResponse", "RepublishResponse", "CallResponse"]
#structs that end with Request or Response but are not
NotRequest = ["MonitoredItemCreateRequest", "MonitoredItemModifyRequest"]
NotRequest = ["MonitoredItemCreateRequest", "MonitoredItemModifyRequest", "CallMethodRequest"]
OverrideTypes = {}#AttributeId": "AttributeID", "ResultMask": "BrowseResultMask", "NodeClassMask": "NodeClass", "AccessLevel": "VariableAccessLevel", "UserAccessLevel": "VariableAccessLevel", "NotificationData": "NotificationData"}
OverrideNames = {}#{"RequestHeader": "Header", "ResponseHeader": "Header", "StatusCode": "Status", "NodesToRead": "AttributesToRead"} # "MonitoringMode": "Mode",, "NotificationMessage": "Notification", "NodeIdType": "Type"}
......
......@@ -6,7 +6,7 @@ import struct
import generate_model as gm
IgnoredEnums = ["NodeIdType"]
IgnoredStructs = ["QualifiedName", "NodeId", "ExpandedNodeId", "FilterOperand", "Variant", "DataValue"]
IgnoredStructs = ["QualifiedName", "NodeId", "ExpandedNodeId", "FilterOperand", "Variant", "DataValue", "LocalizedText", "ExtensionObject"]
class CodeGenerator(object):
def __init__(self, model, output):
......
#! /usr/bin/env python
import logging
import math
import io
import sys
from datetime import datetime, timedelta
......@@ -186,6 +187,7 @@ class CommonTests(object):
def test_root(self):
root = self.opc.get_root_node()
self.assertEqual(ua.QualifiedName('Root', 0), root.get_browse_name())
self.assertEqual(ua.LocalizedText('Root'), root.get_display_name())
nid = ua.NodeId(84, 0)
self.assertEqual(nid, root.nodeid)
......@@ -446,13 +448,41 @@ class CommonTests(object):
myid = myvar.nodeid
self.assertEqual(idx, myid.NamespaceIndex)
def test_method(self):
o = self.opc.get_objects_node()
m = o.get_child("2:ServerMethod")
result = o.call_method(m, [ua.Variant(2.1)])
self.assertEqual(result, [ua.Variant(4.2)])
def test_method_array(self):
o = self.opc.get_objects_node()
m = o.get_child("2:ServerMethodArray")
result = o.call_method(m, [ua.Variant("sin"), ua.Variant(math.pi), ])
self.assertTrue(result[0].Value < 0.01)
def add_server_methods(srv):
def func(parent, variant):
variant.Value *= 2
return [variant]
o = srv.get_objects_node()
v = o.add_method(ua.NodeId("ServerMethod", 2), ua.QualifiedName('ServerMethod', 2), func, [ua.VariantType.Int64], [ua.VariantType.Int64])
def func2(parent, methodname, variant):
print("method name is: ", methodname)
val = math.sin(variant.Value)
res = ua.Variant(val)
return [res]
o = srv.get_objects_node()
v = o.add_method(ua.NodeId("ServerMethodArray", 2), ua.QualifiedName('ServerMethodArray', 2), func2, [ua.VariantType.String, ua.VariantType.Int64], [ua.VariantType.Int64])
class ServerProcess(Thread):
'''
Start a server in another process
Start a server in another process/thread
'''
def __init__(self):
Thread.__init__(self)
......@@ -463,6 +493,9 @@ class ServerProcess(Thread):
def run(self):
self.srv = Server()
self.srv.set_endpoint('opc.tcp://localhost:%d' % port_num1)
add_server_methods(self.srv)
self.srv.start()
self.started.set()
while not self._exit.is_set():
......@@ -519,6 +552,7 @@ class TestServer(unittest.TestCase, CommonTests):
def setUpClass(self):
self.srv = Server()
self.srv.set_endpoint('opc.tcp://localhost:%d' % port_num2)
add_server_methods(self.srv)
self.srv.start()
self.opc = self.srv
......@@ -549,6 +583,15 @@ class TestServer(unittest.TestCase, CommonTests):
myid = myvar.nodeid
self.assertEqual(idx, myid.NamespaceIndex)
def test_server_method(self):
def func(parent, variant):
variant.Value *= 2
return [variant]
o = self.opc.get_objects_node()
v = o.add_method(3, 'Method1', func, [ua.VariantType.Int64], [ua.VariantType.Int64])
result = o.call_method(v, [ua.Variant(2.1)])
self.assertEqual(result, [ua.Variant(4.2)])
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment