Commit 37f69181 authored by Olivier R-D's avatar Olivier R-D

autopep8

parent e6300bec
......@@ -3,10 +3,13 @@ import logging
from opcua import Client
from opcua import uaprotocol as ua
class SubHandler(object):
"""
Client to subscription. It will receive events from server
"""
def data_change(self, handle, node, val, attr):
print("Python: New data change event", handle, node, val, attr)
......@@ -14,7 +17,6 @@ class SubHandler(object):
print("Python: New event", handle, event)
if __name__ == "__main__":
from IPython import embed
logging.basicConfig(level=logging.WARN)
......@@ -42,7 +44,6 @@ if __name__ == "__main__":
myfloat.set_value(ua.Variant(1.234, ua.VariantType.Float))
print("reading float value: ", myfloat.get_value())
handler = SubHandler()
sub = client.create_subscription(500, handler)
sub.subscribe_data_change(var)
......@@ -52,7 +53,6 @@ if __name__ == "__main__":
result = device.call_method(method, ua.Variant("sin"), ua.Variant(180, ua.VariantType.Double))
print("Mehtod result is: ", result)
embed()
client.close_session()
finally:
......
......@@ -5,6 +5,7 @@ try:
from IPython import embed
except ImportError:
import code
def embed():
vars = globals()
vars.update(locals())
......@@ -15,10 +16,13 @@ except ImportError:
from opcua import Client
from opcua import uaprotocol as ua
class SubHandler(object):
"""
Client to subscription. It will receive events from server
"""
def data_change(self, handle, node, val, attr):
print("Python: New data change event", handle, node, val, attr)
......@@ -26,11 +30,10 @@ class SubHandler(object):
print("Python: New event", handle, event)
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
#logger = logging.getLogger("KeepAlive")
#logger.setLevel(logging.DEBUG)
# logger.setLevel(logging.DEBUG)
client = Client("opc.tcp://localhost:4841/freeopcua/server/")
try:
client.connect()
......@@ -39,8 +42,8 @@ if __name__ == "__main__":
print(root.get_children())
print(root.get_browse_name())
#var = client.get_node(ua.NodeId(1002, 2))
#print(var)
#print(var.get_value())
# print(var)
# print(var.get_value())
#var.set_value(ua.Variant([23], ua.VariantType.Int64))
state = root.get_child(["0:Objects", "0:Server"])
print(state)
......@@ -52,10 +55,10 @@ if __name__ == "__main__":
handle = sub.subscribe_data_change(myvar)
time.sleep(0.1)
sub.subscribe_events()
#sub.unsubscribe(handle)
#sub.delete()
# sub.unsubscribe(handle)
# sub.delete()
#calling a method on server
# calling a method on server
res = obj.call_method("2:multiply", 3, "klk")
print("method result is: ", res)
......
......@@ -5,6 +5,7 @@ try:
from IPython import embed
except ImportError:
import code
def embed():
vars = globals()
vars.update(locals())
......@@ -16,36 +17,41 @@ from opcua import ua, uamethod, Server, Event, ObjectIds
class SubHandler(object):
"""
Client to subscription. It will receive events from server
"""
def data_change(self, handle, node, val, attr):
print("Python: New data change event", handle, node, val, attr)
def event(self, handle, event):
print("Python: New event", handle, event)
#method to be exposed through server
# method to be exposed through server
def func(parent, variant):
return [variant.Value * 2]
#method to be exposed through server
# method to be exposed through server
# uses a decorator to automatically convert to and from variants
@uamethod
def multiply(parent, x, y):
print("multiply method call with parameters: ", x, y)
return x*y
return x * y
if __name__ == "__main__":
#optional setup logging
# optional setup logging
logging.basicConfig(level=logging.WARN)
#logger = logging.getLogger("opcua.address_space")
#logger = logging.getLogger("opcua.internal_server")
#logger.setLevel(logging.DEBUG)
# logger.setLevel(logging.DEBUG)
#logger = logging.getLogger("opcua.subscription_server")
#logger.setLevel(logging.DEBUG)
# logger.setLevel(logging.DEBUG)
# now setup our server
server = Server()
......@@ -79,7 +85,7 @@ if __name__ == "__main__":
print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
try:
handler = SubHandler()
#enable following if you want to subscribe to nodes on server side
# enable following if you want to subscribe to nodes on server side
sub = server.create_subscription(500, handler)
handle = sub.subscribe_data_change(myvar)
# trigger event, all subscribed clients wil receive it
......@@ -88,4 +94,3 @@ if __name__ == "__main__":
embed()
finally:
server.stop()
......@@ -23,10 +23,9 @@ def uamethod(func):
return to_variant(result)
return wrapper
def to_variant(*args):
uaargs = []
for arg in args:
uaargs.append(ua.Variant(arg))
return uaargs
......@@ -4,7 +4,9 @@ import pickle
from opcua import ua
class AttributeValue(object):
def __init__(self, value):
self.value = value
self.value_callback = None
......@@ -14,7 +16,9 @@ class AttributeValue(object):
return "AttributeValue({})".format(self.value)
__repr__ = __str__
class NodeData(object):
def __init__(self, nodeid):
self.nodeid = nodeid
self.attributes = {}
......@@ -27,6 +31,7 @@ class NodeData(object):
class AttributeService(object):
def __init__(self, aspace):
self.logger = logging.getLogger(__name__)
self._aspace = aspace
......@@ -45,7 +50,9 @@ class AttributeService(object):
res.append(self._aspace.set_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
return res
class ViewService(object):
def __init__(self, aspace):
self.logger = logging.getLogger(__name__)
self._aspace = aspace
......@@ -140,7 +147,7 @@ class ViewService(object):
with self._aspace.lock:
nodedata = self._aspace.nodes[nodeid]
for ref in nodedata.references:
#FIXME: here we should check other arguments!!
# FIXME: here we should check other arguments!!
if ref.BrowseName == el.TargetName:
return ref.NodeId
self.logger.info("element %s was not found in node %s", el, nodeid)
......@@ -148,6 +155,7 @@ class ViewService(object):
class NodeManagementService(object):
def __init__(self, aspace):
self.logger = logging.getLogger(__name__)
self._aspace = aspace
......@@ -167,14 +175,14 @@ class NodeManagementService(object):
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNodeIdExists)
return result
nodedata = NodeData(item.RequestedNewNodeId)
#add common attrs
# add common attrs
nodedata.attributes[ua.AttributeIds.NodeId] = AttributeValue(ua.DataValue(ua.Variant(item.RequestedNewNodeId, ua.VariantType.NodeId)))
nodedata.attributes[ua.AttributeIds.BrowseName] = AttributeValue(ua.DataValue(ua.Variant(item.BrowseName, ua.VariantType.QualifiedName)))
nodedata.attributes[ua.AttributeIds.NodeClass] = AttributeValue(ua.DataValue(ua.Variant(item.NodeClass, ua.VariantType.Int32)))
#add requested attrs
# add requested attrs
self._add_nodeattributes(item.NodeAttributes, nodedata)
#add parent
# add parent
if item.ParentNodeId == ua.NodeId():
#self.logger.warning("add_node: creating node %s without parent", item.RequestedNewNodeId)
pass
......@@ -193,10 +201,10 @@ class NodeManagementService(object):
desc.IsForward = True
self._aspace.nodes[item.ParentNodeId].references.append(desc)
#now add our node to db
# now add our node to db
self._aspace.nodes[item.RequestedNewNodeId] = nodedata
#add type definition
# add type definition
if item.TypeDefinition != ua.NodeId():
addref = ua.AddReferencesItem()
addref.SourceNodeId = item.RequestedNewNodeId
......@@ -270,6 +278,7 @@ class NodeManagementService(object):
class MethodService(object):
def __init__(self, aspace):
self.logger = logging.getLogger(__name__)
self._aspace = aspace
......@@ -301,16 +310,18 @@ class MethodService(object):
class AddressSpace(object):
"""
The address space object stores all the nodes og the OPC-UA server
and helper methods.
It is NOT threadsafe, lock the lock object before reading of modifying
a node
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.nodes = {}
self.lock = RLock() #FIXME: should use multiple reader, one writter pattern
self.lock = RLock() # FIXME: should use multiple reader, one writter pattern
self._datachange_callback_counter = 200
self._handle_to_attribute_map = {}
......@@ -386,13 +397,3 @@ class AddressSpace(object):
with self.lock:
node = self.nodes[methodid]
node.call = callback
......@@ -12,6 +12,7 @@ import opcua.utils as utils
class BinaryClient(object):
"""
low level OPC-UA client.
implement all(well..one day) methods defined in opcua spec
......@@ -19,6 +20,7 @@ class BinaryClient(object):
in python most of the structures are defined in
uaprotocol_auto.py and uaprotocol_hand.py
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self._socket = None
......@@ -43,9 +45,9 @@ class BinaryClient(object):
self._thread.start()
def _send_request(self, request, callback=None, timeout=1000):
#HACK to make sure we can convert our request to binary before increasing request counter etc ...
# HACK to make sure we can convert our request to binary before increasing request counter etc ...
request.to_binary()
#END HACK
# END HACK
with self._lock:
request.RequestHeader = self._create_request_header(timeout)
hdr = ua.Header(ua.MessageType.SecureMessage, ua.ChunkType.Single, self._security_token.ChannelId)
......@@ -62,7 +64,7 @@ class BinaryClient(object):
return data
def _check_answer(self, data, context):
data = data.copy(50)#FIXME check max length nodeid + responseheader
data = data.copy(50) # FIXME check max length nodeid + responseheader
typeid = ua.NodeId.from_binary(data)
if typeid == ua.FourByteNodeId(ua.ObjectIds.ServiceFault_Encoding_DefaultBinary):
self.logger.warning("ServiceFault from server received %s", context)
......@@ -159,14 +161,13 @@ class BinaryClient(object):
hdr.RequestId = self._request_id
return hdr
def connect_socket(self, host, port):
"""
connect to server socket and start receiving thread
"""
self.logger.info("opening connection")
self._socket = socket.create_connection((host, port))
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)#nodelay ncessary to avoid packing in one frame, some servers do not like it
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # nodelay ncessary to avoid packing in one frame, some servers do not like it
self.start()
def disconnect_socket(self):
......@@ -229,7 +230,7 @@ class BinaryClient(object):
request.DeleteSubscriptions = deletesubscriptions
data = self._send_request(request)
ua.CloseSessionResponse.from_binary(data)
#response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
# response.ResponseHeader.ServiceResult.check() #disabled, it seems we sent wrong session Id, but where is the sessionId supposed to be sent???
def browse(self, parameters):
self.logger.info("browse")
......@@ -281,7 +282,7 @@ class BinaryClient(object):
seqhdr = self._create_sequence_header()
self._write_socket(hdr, symhdr, seqhdr, request)
#some servers send a response here, most do not ... so we ignore
# some servers send a response here, most do not ... so we ignore
def translate_browsepaths_to_nodeids(self, browsepaths):
self.logger.info("translate_browsepath_to_nodeid")
......@@ -326,10 +327,9 @@ class BinaryClient(object):
response = ua.PublishResponse.from_binary(future.result())
try:
self._publishcallbacks[response.Parameters.SubscriptionId](response.Parameters)
except Exception as ex: #we call client code, catch everything!
except Exception as ex: # we call client code, catch everything!
self.logger.exception("Exception while calling user callback")
def create_monitored_items(self, params):
self.logger.info("create_monitored_items")
request = ua.CreateMonitoredItemsRequest()
......@@ -364,6 +364,3 @@ class BinaryClient(object):
response = ua.CallResponse.from_binary(data)
response.ResponseHeader.ServiceResult.check()
return response.Results
......@@ -14,10 +14,13 @@ from opcua.uaprocessor import UAProcessor
logger = logging.getLogger(__name__)
class BinaryServer(Thread):
"""
Socket server forwarding request to internal server
"""
def __init__(self, internal_server, hostname, port):
Thread.__init__(self)
self.socket_server = None
......@@ -33,10 +36,10 @@ class BinaryServer(Thread):
def run(self):
logger.warning("Listening on %s:%s", self.hostname, self.port)
socketserver.TCPServer.allow_reuse_address = True #get rid of address already in used warning
socketserver.TCPServer.allow_reuse_address = True # get rid of address already in used warning
self.socket_server = ThreadingTCPServer((self.hostname, self.port), UAHandler)
#self.socket_server.daemon_threads = True # this will force a shutdown of all threads, maybe too hard
self.socket_server.internal_server = self.iserver #allow handler to acces server properties
# self.socket_server.daemon_threads = True # this will force a shutdown of all threads, maybe too hard
self.socket_server.internal_server = self.iserver # allow handler to acces server properties
with self._cond:
self._cond.notify_all()
self.socket_server.serve_forever()
......@@ -47,6 +50,7 @@ class BinaryServer(Thread):
class UAHandler(socketserver.BaseRequestHandler):
"""
The RequestHandler class for our server.
......@@ -65,5 +69,3 @@ class UAHandler(socketserver.BaseRequestHandler):
class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
from __future__ import division #support for python2
from __future__ import division # support for python2
from threading import Thread, Condition
import logging
try:
from urllib.parse import urlparse
except ImportError: #support for python2
except ImportError: # support for python2
from urlparse import urlparse
from opcua import uaprotocol as ua
......@@ -12,10 +12,12 @@ from opcua import utils
class KeepAlive(Thread):
"""
Used by Client to keep session opened.
OPCUA defines timeout both for sessions and secure channel
"""
def __init__(self, client, timeout):
Thread.__init__(self)
self.logger = logging.getLogger(__name__)
......@@ -31,7 +33,7 @@ class KeepAlive(Thread):
server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
while not self._dostop:
with self._cond:
self._cond.wait(self.timeout/1000)
self._cond.wait(self.timeout / 1000)
if self._dostop:
break
self.logger.debug("renewing channel")
......@@ -48,6 +50,7 @@ class KeepAlive(Thread):
class Client(object):
"""
High level client to connect to an OPC-UA server.
This class makes it easy to connect and browse address space.
......@@ -57,6 +60,7 @@ class Client(object):
which offers a raw OPC-UA interface.
"""
def __init__(self, url):
"""
used url argument to connect to server.
......@@ -124,7 +128,7 @@ class Client(object):
Send OPC-UA hello to server
"""
ack = self.bclient.send_hello(self.server_url.geturl())
#FIXME check ack
# FIXME check ack
def open_secure_channel(self, renew=False):
"""
......@@ -165,10 +169,10 @@ class Client(object):
params.EndpointUrl = self.server_url.geturl()
params.SessionName = self.description + " Session" + str(self._session_counter)
params.RequestedSessionTimeout = 3600000
params.MaxResponseMessageSize = 0 #means not max size
params.MaxResponseMessageSize = 0 # means not max size
response = self.bclient.create_session(params)
self.session_timeout = response.RevisedSessionTimeout
self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7) #0.7 is from spec
self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7) # 0.7 is from spec
self.keepalive.start()
return response
......@@ -224,8 +228,3 @@ class Client(object):
def get_namespace_index(self, uri):
uries = self.get_namespace_array()
return uries.index(uri)
......@@ -9,6 +9,7 @@ import uuid
class Event(object):
"""
Create an event based on an event type. Per default is BaseEventType used.
arguments are:
......@@ -16,6 +17,7 @@ class Event(object):
source: The emiting source for the node, either an objectId, NodeId or a Node
etype: The event type, either an objectId, a NodeId or a Node object
"""
def __init__(self, isession, etype=ObjectIds.BaseEventType, source=ObjectIds.Server):
self.isession = isession
......@@ -34,7 +36,7 @@ class Event(object):
else:
self.SourceNode = ua.NodeId(source)
#set some default values for attributes from BaseEventType, thus that all event must have
# set some default values for attributes from BaseEventType, thus that all event must have
self.EventId = uuid.uuid4().bytes
self.EventType = self.node.nodeid
self.LocaleTime = datetime.now()
......@@ -44,7 +46,7 @@ class Event(object):
self.Severity = ua.Variant(1, ua.VariantType.UInt16)
self.SourceName = "Server"
#og set some node attributed we also are expected to have
# og set some node attributed we also are expected to have
self.BrowseName = self.node.get_browse_name()
self.DisplayName = self.node.get_display_name()
self.NodeId = self.node.nodeid
......@@ -63,5 +65,3 @@ class Event(object):
for desc in references:
node = Node(self.isession, desc.NodeId)
setattr(self, desc.BrowseName.Name, node.get_value())
......@@ -10,7 +10,7 @@ from threading import Thread
from enum import Enum
import functools
try:
#we prefer to use bundles asyncio version, otherwise fallback to trollius
# we prefer to use bundles asyncio version, otherwise fallback to trollius
import asyncio
except ImportError:
import trollius as asyncio
......@@ -30,6 +30,7 @@ from opcua import standard_address_space
class ThreadLoop(Thread):
def __init__(self):
Thread.__init__(self)
self.logger = logging.getLogger(__name__)
......@@ -69,7 +70,9 @@ class SessionState(Enum):
Activated = 1
Closed = 2
class InternalServer(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.endpoints = []
......@@ -81,7 +84,7 @@ class InternalServer(object):
self.method_service = MethodService(self.aspace)
self.node_mgt_service = NodeManagementService(self.aspace)
standard_address_space.fill_address_space(self.node_mgt_service)
#standard_address_space.fill_address_space_from_disk(self.aspace)
# standard_address_space.fill_address_space_from_disk(self.aspace)
self.loop = ThreadLoop()
self.subcsription_service = SubscriptionService(self.loop, self.aspace)
......@@ -124,15 +127,17 @@ class InternalServer(object):
def get_endpoints(self, params=None):
self.logger.info("get endpoint")
#FIXME check params
# FIXME check params
return self.endpoints[:]
def create_session(self, name):
return InternalSession(self, self.aspace, self.subcsription_service, name)
class InternalSession(object):
_counter = 10
_auth_counter = 1000
def __init__(self, internal_server, aspace, submgr, name):
self.logger = logging.getLogger(__name__)
self.iserver = internal_server
......@@ -235,5 +240,3 @@ class InternalSession(object):
if acks is None:
acks = []
return self.subscription_service.publish(acks)
......@@ -5,11 +5,14 @@ and browse address space
import opcua.uaprotocol as ua
class Node(object):
"""
High level node object, to access node attribute,
browse and populate address space
"""
def __init__(self, server, nodeid):
self.server = server
self.nodeid = None
......@@ -21,6 +24,7 @@ class Node(object):
self.nodeid = ua.NodeId(nodeid, 0)
else:
raise Exception("argument to node must be a NodeId object or a string defining a nodeid found {} of type {}".format(nodeid, type(nodeid)))
def __eq__(self, other):
if isinstance(other, Node) and self.nodeid == other.nodeid:
return True
......@@ -73,7 +77,7 @@ class Node(object):
An exception will be generated for other node types.
"""
variant = None
if type(value) == ua.Variant:
if isinstance(value, ua.Variant):
variant = value
else:
variant = ua.Variant(value, varianttype)
......@@ -172,7 +176,7 @@ class Node(object):
el.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HierarchicalReferences)
el.IsInverse = False
el.IncludeSubtypes = True
if type(item) == ua.QualifiedName:
if isinstance(item, ua.QualifiedName):
el.TargetName = item
else:
el.TargetName = ua.QualifiedName.from_string(item)
......@@ -183,7 +187,7 @@ class Node(object):
result = self.server.translate_browsepaths_to_nodeids([bpath])
result = result[0]
result.StatusCode.check()
#FIXME: seems this method may return several nodes
# FIXME: seems this method may return several nodes
return Node(self.server, result.Targets[0].TargetId)
def add_folder(self, *args):
......@@ -333,7 +337,6 @@ class Node(object):
self.server.add_method_callback(method.nodeid, callback)
return method
def call_method(self, methodid, *args):
"""
Call an OPC-UA method. methodid is browse name of child method or the
......@@ -342,9 +345,9 @@ class Node(object):
which may be of different types
returns a list of variants which are output of the method
"""
if type(methodid) is str:
if isinstance(methodid, str):
methodid = self.get_child(methodid).nodeid
elif type(methodid) is Node:
elif isinstance(methodid, Node):
methodid = methodid.nodeid
arguments = []
......@@ -391,21 +394,20 @@ class Node(object):
return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
def _parse_add_args(self, *args):
if type(args[0]) is ua.NodeId:
if isinstance(args[0], ua.NodeId):
return args[0], args[1]
elif type(args[0]) is str:
elif isinstance(args[0], str):
return ua.NodeId.from_string(args[0]), ua.QualifiedName.from_string(args[1])
elif type(args[0]) is int:
elif isinstance(args[0], int):
return generate_nodeid(args[0]), ua.QualifiedName(args[1], args[0])
else:
raise TypeError("Add methods takes a nodeid and a qualifiedname as argument, received %s" % args)
__nodeid_counter = 2000
def generate_nodeid(idx):
global __nodeid_counter
__nodeid_counter += 1
return ua.NodeId(__nodeid_counter, idx)
......@@ -16,6 +16,7 @@ from opcua import Node, Subscription, ObjectIds, Event
class Server(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.endpoint = "opc.tcp://localhost:4841/freeopcua/server/"
......@@ -26,7 +27,7 @@ class Server(object):
self.iserver = InternalServer()
self.bserver = None
#setup some expected values
# setup some expected values
self.register_namespace(self.server_uri)
sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
sa_node.set_value([self.server_uri])
......@@ -38,7 +39,7 @@ class Server(object):
return self.iserver.get_endpoints()
def _setup_server_nodes(self):
#to be called just before starting server since it needs all parameters to be setup
# to be called just before starting server since it needs all parameters to be setup
self._set_endpoints()
def _set_endpoints(self):
......@@ -116,7 +117,7 @@ class Server(object):
uries = ns_node.get_value()
uries.append(uri)
ns_node.set_value(uries)
return (len(uries)-1)
return (len(uries) - 1)
def get_namespace_index(self, uri):
uries = self.get_namespace_array()
......@@ -124,8 +125,3 @@ class Server(object):
def get_event_object(self, etype=ObjectIds.BaseEventType, source=ObjectIds.Server):
return Event(self.iserver.isession, etype, source)
......@@ -23,9 +23,8 @@ def fill_address_space(nodeservice):
create_standard_address_space_Part11(nodeservice)
create_standard_address_space_Part13(nodeservice)
def fill_address_space_from_disk(aspace):
dirname = os.path.dirname(opcua.__file__)
path = os.path.join(dirname, "binary_address_space.pickle")
aspace.load(path)
......@@ -12,14 +12,16 @@ from opcua import ObjectIds
from opcua import AttributeIds
from opcua import Event
class EventResult():
def __str__(self):
return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
__repr__ = __str__
class SubscriptionItemData():
def __init__(self):
self.node = None
self.client_handle = None
......@@ -27,18 +29,20 @@ class SubscriptionItemData():
self.attribute = None
self.mfilter = None
class Subscription(object):
def __init__(self, server, params, handler):
self.logger = logging.getLogger(__name__)
self.server = server
self._client_handle = 200
self._handler = handler
self.parameters = params #move to data class
self.parameters = params # move to data class
self._monitoreditems_map = {}
self._lock = RLock()
self.subscription_id = None
response = self.server.create_subscription(params, self.publish_callback)
self.subscription_id = response.SubscriptionId #move to data class
self.subscription_id = response.SubscriptionId # move to data class
self.server.publish()
self.server.publish()
......@@ -133,7 +137,7 @@ class Subscription(object):
rv = ua.ReadValueId()
rv.NodeId = node.nodeid
rv.AttributeId = attr
#rv.IndexRange //We leave it null, then the entire array is returned
# rv.IndexRange //We leave it null, then the entire array is returned
mparams = ua.MonitoringParameters()
self._client_handle += 1
mparams.ClientHandle = self._client_handle
......@@ -176,6 +180,3 @@ class Subscription(object):
params.MonitoredItemIds = [handle]
results = self.server.delete_monitored_items(params)
results[0].check()
......@@ -8,7 +8,9 @@ import logging
from opcua import ua
class SubscriptionService(object):
def __init__(self, loop, aspace):
self.logger = logging.getLogger(__name__)
self.loop = loop
......@@ -90,7 +92,7 @@ class SubscriptionService(object):
def republish(self, params):
with self._lock:
if not params.SubscriptionId in self.subscriptions:
#what should I do?
# what should I do?
return ua.NotificationMessage()
return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
......@@ -100,8 +102,8 @@ class SubscriptionService(object):
sub.trigger_event(event)
class MonitoredItemData(object):
def __init__(self):
self.client_handle = None
self.callback_handle = None
......@@ -111,6 +113,7 @@ class MonitoredItemData(object):
class InternalSubscription(object):
def __init__(self, manager, data, addressspace, callback):
self.logger = logging.getLogger(__name__)
self.aspace = addressspace
......@@ -151,7 +154,7 @@ class InternalSubscription(object):
def _subscription_loop(self):
#self.logger.debug("%s loop", self)
if not self._stopev:
self.manager.loop.call_later(self.data.RevisedPublishingInterval/1000.0, self._sub_loop)
self.manager.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self._sub_loop)
def _sub_loop(self):
self.publish_results()
......@@ -170,11 +173,11 @@ class InternalSubscription(object):
def publish_results(self):
if self._publish_cycles_count > self.data.RevisedLifetimeCount:
self.logger.warn("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
#FIXME this will never be send since we do not have publish request anyway
# FIXME this will never be send since we do not have publish request anyway
self.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
self._stopev = True
with self._lock:
if self.has_published_results(): #FIXME: should we pop a publish request here? or we do not care?
if self.has_published_results(): # FIXME: should we pop a publish request here? or we do not care?
self._publish_cycles_count += 1
result = self._pop_publish_result()
self.callback(result)
......@@ -248,11 +251,11 @@ class InternalSubscription(object):
result = ua.MonitoredItemCreateResult()
if mdata.monitored_item_id == params.MonitoredItemId:
result.RevisedSamplingInterval = self.data.RevisedPublishingInterval
result.RevisedQueueSize = ua.downcast_extobject(params.RequestedParameters.QueueSize) #FIXME check and use value
result.RevisedQueueSize = ua.downcast_extobject(params.RequestedParameters.QueueSize) # FIXME check and use value
result.FilterResult = params.RequestedParameters.Filter
mdata.parameters = result
return result
#FIXME modify event subscriptions
# FIXME modify event subscriptions
result = ua.MonitoredItemCreateResult()
result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
return result
......@@ -261,7 +264,7 @@ class InternalSubscription(object):
with self._lock:
result = ua.MonitoredItemCreateResult()
result.RevisedSamplingInterval = self.data.RevisedPublishingInterval
result.RevisedQueueSize = params.RequestedParameters.QueueSize #FIXME check and use value
result.RevisedQueueSize = params.RequestedParameters.QueueSize # FIXME check and use value
result.FilterResult = ua.downcast_extobject(params.RequestedParameters.Filter)
self._monitored_item_counter += 1
result.MonitoredItemId = self._monitored_item_counter
......@@ -284,7 +287,7 @@ class InternalSubscription(object):
self.logger.debug("adding callback return status %s and handle %s", result.StatusCode, handle)
mdata.callback_handle = handle
self._monitored_datachange[handle] = result.MonitoredItemId
#force data change event generation
# force data change event generation
self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
return result
......@@ -312,7 +315,6 @@ class InternalSubscription(object):
self._monitored_items.pop(mid)
return ua.StatusCode()
def datachange_callback(self, handle, value):
self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self, handle, value.Value)
event = ua.MonitoredItemNotification()
......@@ -354,9 +356,3 @@ class InternalSubscription(object):
except AttributeError:
fields.append(ua.Variant())
return fields
......@@ -6,13 +6,17 @@ from datetime import datetime
from opcua import ua
from opcua import utils
class PublishRequestData(object):
def __init__(self):
self.requesthdr = None
self.algohdr = None
self.seqhdr = None
class UAProcessor(object):
def __init__(self, internal_server, socket, name):
self.logger = logging.getLogger(__name__)
self.iserver = internal_server
......@@ -26,7 +30,7 @@ class UAProcessor(object):
self._seq_number = 1
def loop(self):
#first we want a hello message
# first we want a hello message
while True:
header = ua.Header.from_stream(self.socket)
body = self._receive_body(header.body_size)
......@@ -59,7 +63,7 @@ class UAProcessor(object):
seqhdr.SequenceNumber = self._seq_number
self._seq_number += 1
hdr = ua.Header(msgtype, ua.ChunkType.Single, self.channel.SecurityToken.ChannelId)
if type(algohdr) is ua.SymmetricAlgorithmHeader:
if isinstance(algohdr, ua.SymmetricAlgorithmHeader):
algohdr.TokenId = self.channel.SecurityToken.TokenId
self._write_socket(hdr, algohdr, seqhdr, response)
......@@ -89,7 +93,7 @@ class UAProcessor(object):
request = ua.OpenSecureChannelRequest.from_binary(body)
channel = self._open_secure_channel(request.Parameters)
#send response
# send response
response = ua.OpenSecureChannelResponse()
response.Parameters = channel
self.send_response(request.RequestHeader.RequestHandle, algohdr, seqhdr, response, ua.MessageType.SecureOpen)
......@@ -131,8 +135,8 @@ class UAProcessor(object):
self.logger.info("Create session request")
params = ua.CreateSessionParameters.from_binary(body)
self.session = self.iserver.create_session(self.name)#create the session on server
sessiondata = self.session.create_session(params)#get a session creation result to send back
self.session = self.iserver.create_session(self.name) # create the session on server
sessiondata = self.session.create_session(params) # get a session creation result to send back
response = ua.CreateSessionResponse()
response.Parameters = sessiondata
......@@ -153,7 +157,7 @@ class UAProcessor(object):
if not self.session:
#result = ua.ActivateSessionResult()
#result.Results.append(ua.StatusCode(ua.StatusCodes.BadSessionIdInvalid))
# result.Results.append(ua.StatusCode(ua.StatusCodes.BadSessionIdInvalid))
response = ua.ServiceFault()
response.ResponseHeader.ServiceResult = ua.StatusCode(ua.StatusCodes.BadSessionIdInvalid)
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
......@@ -338,7 +342,7 @@ class UAProcessor(object):
self.logger.info("open secure channel")
if params.RequestType == ua.SecurityTokenRequestType.Issue:
self.channel = ua.OpenSecureChannelResult()
self.channel.SecurityToken.TokenId = 13 #random value
self.channel.SecurityToken.TokenId = 13 # random value
self.channel.SecurityToken.ChannelId = self.iserver.get_new_channel_id()
self.channel.SecurityToken.RevisedLifetime = params.RequestedLifetime
self.channel.SecurityToken.TokenId += 1
......@@ -346,5 +350,3 @@ class UAProcessor(object):
self.channel.SecurityToken.RevisedLifetime = params.RequestedLifetime
self.channel.ServerNonce = utils.create_nonce()
return self.channel
......@@ -5,7 +5,6 @@ from opcua.uaprotocol_auto import *
from opcua.uaprotocol_hand import *
# FIXME: this is really crappy, should thing about a better implementation
# maybe never inherit extensionobject and parse only body....
def downcast_extobject(item):
......@@ -15,5 +14,3 @@ def downcast_extobject(item):
classname = objectidname.split("_")[0]
cmd = "{}.from_binary(utils.Buffer(item.to_binary()))".format(classname)
return eval(cmd)
......@@ -14,14 +14,16 @@ logger = logging.getLogger('opcua.uaprotocol')
class SocketClosedException(Exception):
pass
def get_bytes_from_sock(sock, size):
data = utils.recv_all(sock, size)
if len(data) < size: #socket has closed!
if len(data) < size: # socket has closed!
raise SocketClosedException("Server socket has closed")
return io.BytesIO(data)
class Hello(uatypes.FrozenClass):
def __init__(self):
self.ProtocolVersion = 0
self.ReceiveBufferSize = 65536
......@@ -53,9 +55,8 @@ class Hello(uatypes.FrozenClass):
return hello
class MessageType(object):
Invalid = b"INV" #FIXME: check value
Invalid = b"INV" # FIXME: check value
Hello = b"HEL"
Acknowledge = b"ACK"
Error = b"ERR"
......@@ -63,15 +64,16 @@ class MessageType(object):
SecureClose = b"CLO"
SecureMessage = b"MSG"
class ChunkType(object):
Invalid = b"0" #FIXME check
Invalid = b"0" # FIXME check
Single = b"F"
Intermediate = b"C"
Final = b"A"
class Header(uatypes.FrozenClass):
def __init__(self, msgType=None, chunkType=None, channelid=0):
self.MessageType = msgType
self.ChunkType = chunkType
......@@ -113,6 +115,7 @@ class Header(uatypes.FrozenClass):
class ErrorMessage(uatypes.FrozenClass):
def __init__(self):
self.Error = uatypes.StatusCode()
self.Reason = ""
......@@ -137,12 +140,13 @@ class ErrorMessage(uatypes.FrozenClass):
class Acknowledge(uatypes.FrozenClass):
def __init__(self):
self.ProtocolVersion = 0
self.ReceiveBufferSize = 65536
self.SendBufferSize = 65536
self.MaxMessageSize = 0 #No limits
self.MaxChunkCount = 0 #No limits
self.MaxMessageSize = 0 # No limits
self.MaxChunkCount = 0 # No limits
self._freeze()
def to_binary(self):
......@@ -154,8 +158,6 @@ class Acknowledge(uatypes.FrozenClass):
b.append(struct.pack("<I", self.MaxChunkCount))
return b"".join(b)
@staticmethod
def from_binary(data):
ack = Acknowledge()
......@@ -166,7 +168,9 @@ class Acknowledge(uatypes.FrozenClass):
ack.MaxChunkCount = struct.unpack("<I", data.read(4))[0]
return ack
class AsymmetricAlgorithmHeader(uatypes.FrozenClass):
def __init__(self):
self.SecurityPolicyURI = "http://opcfoundation.org/UA/SecurityPolicy#None"
self.SenderCertificate = b""
......@@ -194,6 +198,7 @@ class AsymmetricAlgorithmHeader(uatypes.FrozenClass):
class SymmetricAlgorithmHeader(uatypes.FrozenClass):
def __init__(self):
self.TokenId = 0
self._freeze()
......@@ -213,6 +218,7 @@ class SymmetricAlgorithmHeader(uatypes.FrozenClass):
class SequenceHeader(uatypes.FrozenClass):
def __init__(self):
self.SequenceNumber = None
self.RequestId = None
......@@ -235,54 +241,64 @@ class SequenceHeader(uatypes.FrozenClass):
return "{}(SequenceNumber:{}, RequestId:{} )".format(self.__class__.__name__, self.SequenceNumber, self.RequestId)
__repr__ = __str__
###FIXES for missing switchfield in NodeAttributes classes
# FIXES for missing switchfield in NodeAttributes classes
ana = auto.NodeAttributesMask
class ObjectAttributes(auto.ObjectAttributes):
def __init__(self):
auto.ObjectAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.EventNotifier
class ObjectTypeAttributes(auto.ObjectTypeAttributes):
def __init__(self):
auto.ObjectTypeAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract
class VariableAttributes(auto.VariableAttributes):
def __init__(self):
auto.VariableAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.AccessLevel | ana.UserAccessLevel | ana.MinimumSamplingInterval |ana.Historizing
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.AccessLevel | ana.UserAccessLevel | ana.MinimumSamplingInterval | ana.Historizing
class VariableTypeAttributes(auto.VariableTypeAttributes):
def __init__(self):
auto.VariableTypeAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.IsAbstract
class MethodAttributes(auto.MethodAttributes):
def __init__(self):
auto.MethodAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Executable | ana.UserExecutable
class ReferenceTypeAttributes(auto.ReferenceTypeAttributes):
def __init__(self):
auto.ReferenceTypeAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract | ana.Symmetric | ana.InverseName
class DataTypeAttributes(auto.DataTypeAttributes):
def __init__(self):
auto.DataTypeAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract
class ViewAttributes(auto.ViewAttributes):
def __init__(self):
auto.ViewAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.ContainsNoLoops | ana.EventNotifier
ObjectIdsInv = {v: k for k, v in ObjectIds.__dict__.items()}
AttributeIdsInv = {v: k for k, v in AttributeIds.__dict__.items()}
This diff is collapsed.
......@@ -3,10 +3,12 @@ import uuid
class Buffer(object):
"""
alternative to io.BytesIO making debug easier
and added a few conveniance methods
"""
def __init__(self, data):
self.logger = logging.getLogger(__name__)
self.data = data
......@@ -45,8 +47,6 @@ class Buffer(object):
return self.data[:size]
def recv_all(socket, size):
"""
Receive up to size bytes from socket
......@@ -60,6 +60,6 @@ def recv_all(socket, size):
size -= len(chunk)
return data
def create_nonce():
return uuid.uuid4().bytes + uuid.uuid4().bytes #seems we need at least 32 bytes not 16 as python gives us...
def create_nonce():
return uuid.uuid4().bytes + uuid.uuid4().bytes # seems we need at least 32 bytes not 16 as python gives us...
......@@ -24,20 +24,26 @@ from opcua import AttributeIds
port_num1 = 48510
port_num2 = 48530
class SubHandler():
'''
Dummy subscription client
'''
def data_change(self, handle, node, val, attr):
pass
def event(self, handle, event):
pass
class MySubHandler():
'''
More advanced subscription client using Future, so we can wait for events in tests
'''
def __init__(self):
self.future = Future()
......@@ -52,6 +58,7 @@ class MySubHandler():
class Unit(unittest.TestCase):
'''
Simple unit test that do not need to setup a server or a client
'''
......@@ -76,7 +83,7 @@ class Unit(unittest.TestCase):
fb = ua.FourByteNodeId(53)
n = ua.NumericNodeId(53)
n1 = ua.NumericNodeId(53, 0)
s = ua.StringNodeId(53, 0)#should we raise an exception???
s = ua.StringNodeId(53, 0) # should we raise an exception???
s1 = ua.StringNodeId("53", 0)
bs = ua.ByteStringNodeId(b"53", 0)
gid = ua.Guid()
......@@ -126,11 +133,11 @@ class Unit(unittest.TestCase):
def test_equal_nodeid(self):
nid1 = ua.NodeId(999, 2)
nid2 = ua.NodeId(999, 2)
self.assertTrue(nid1==nid2)
self.assertTrue(id(nid1)!=id(nid2))
self.assertTrue(nid1 == nid2)
self.assertTrue(id(nid1) != id(nid2))
def test_zero_nodeid(self):
self.assertEqual(ua.NodeId(), ua.NodeId(0,0))
self.assertEqual(ua.NodeId(), ua.NodeId(0, 0))
self.assertEqual(ua.NodeId(), ua.NodeId.from_string('ns=0;i=0;'))
def test_string_nodeid(self):
......@@ -150,7 +157,6 @@ class Unit(unittest.TestCase):
self.assertEqual(nid.NamespaceIndex, 2)
self.assertEqual(nid.Identifier, 'PLC1.Manufacturer')
def test_strrepr_nodeid(self):
nid = ua.NodeId.from_string('ns=2;s=PLC1.Manufacturer;')
self.assertEqual(nid.to_string(), 'ns=2;s=PLC1.Manufacturer')
......@@ -182,13 +188,13 @@ class Unit(unittest.TestCase):
v2 = ua.Variant.from_binary(ua.utils.Buffer(v.to_binary()))
self.assertEqual(v.Value, v2.Value)
self.assertEqual(v.VariantType, v2.VariantType)
#commonity method:
# commonity method:
self.assertEqual(v, ua.Variant(v))
def test_variant_array(self):
v = ua.Variant([1,2,3,4,5])
v = ua.Variant([1, 2, 3, 4, 5])
self.assertEqual(v.Value[1], 2)
#self.assertEqual(v.VarianType, ua.VariantType.Int64) # we do not care, we should aonly test for sutff that matter
# self.assertEqual(v.VarianType, ua.VariantType.Int64) # we do not care, we should aonly test for sutff that matter
v2 = ua.Variant.from_binary(ua.utils.Buffer(v.to_binary()))
self.assertEqual(v.Value, v2.Value)
self.assertEqual(v.VariantType, v2.VariantType)
......@@ -210,7 +216,9 @@ class Unit(unittest.TestCase):
t4 = ua.LocalizedText.from_binary(ua.utils.Buffer(t1.to_binary()))
self.assertEqual(t1, t4)
class CommonTests(object):
'''
Tests that will be run twice. Once on server side and once on
client side since we have been carefull to have the exact
......@@ -290,21 +298,20 @@ class CommonTests(object):
ev.trigger()
clthandle, ev = msclt.future.result()
#with cond:
# with cond:
#ret = cond.wait(50000)
#if sys.version_info.major>2: self.assertEqual(ret, True) # we went into timeout waiting for subcsription callback
#else: pass # python2
self.assertIsNot(ev, None)# we did not receive event
# if sys.version_info.major>2: self.assertEqual(ret, True) # we went into timeout waiting for subcsription callback
# else: pass # python2
self.assertIsNot(ev, None) # we did not receive event
self.assertEqual(ev.Message.Text, msg)
#self.assertEqual(msclt.ev.Time, tid)
self.assertEqual(ev.Severity, 500)
self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
#time.sleep(0.1)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_non_existing_path(self):
root = self.opc.get_root_node()
with self.assertRaises(Exception):
......@@ -430,10 +437,9 @@ class CommonTests(object):
with self.assertRaises(Exception):
bad.set_value(89)
with self.assertRaises(Exception):
bad.add_object(0, "myobj" )
bad.add_object(0, "myobj")
with self.assertRaises(Exception):
bad.get_child(0, "myobj" )
bad.get_child(0, "myobj")
def test_array_value(self):
o = self.opc.get_objects_node()
......@@ -468,27 +474,27 @@ class CommonTests(object):
# Now check we get the start value
clthandle, node, val, attr = msclt.future.result()
#with cond:
# with cond:
#ret = cond.wait(0.5)
#if sys.version_info.major>2: self.assertEqual(ret, True) # we went into timeout waiting for subcsription callback
#else: pass # XXX
# if sys.version_info.major>2: self.assertEqual(ret, True) # we went into timeout waiting for subcsription callback
# else: pass # XXX
self.assertEqual(val, startv1)
self.assertEqual(node, v1)
msclt.reset()#reset future object
msclt.reset() # reset future object
# modify v1 and check we get value
v1.set_value([5])
clthandle, node, val, attr = msclt.future.result()
#with cond:
# with cond:
#ret = cond.wait(0.5)
#if sys.version_info.major>2: self.assertEqual(ret, True) # we went into timeout waiting for subcsription callback
#else: pass # XXX
# if sys.version_info.major>2: self.assertEqual(ret, True) # we went into timeout waiting for subcsription callback
# else: pass # XXX
self.assertEqual(node, v1)
self.assertEqual(val, [5])
with self.assertRaises(Exception):
sub.unsubscribe(999)# non existing handle
sub.unsubscribe(999) # non existing handle
sub.unsubscribe(handle1)
with self.assertRaises(Exception):
sub.unsubscribe(handle1) # second try should fail
......@@ -526,10 +532,10 @@ class CommonTests(object):
result = o.call_method("2:ServerMethod", 2.1)
self.assertEqual(result, 4.2)
with self.assertRaises(Exception):
#FIXME: we should raise a more precise exception
# FIXME: we should raise a more precise exception
result = o.call_method("2:ServerMethod", 2.1, 89, 9)
with self.assertRaises(Exception):
result = o.call_method(ua.NodeId(999), 2.1) #non existing method
result = o.call_method(ua.NodeId(999), 2.1) # non existing method
def test_method_array(self):
o = self.opc.get_objects_node()
......@@ -558,9 +564,6 @@ class CommonTests(object):
self.assertTrue(endpoints[0].EndpointUrl.startswith("opc.tcp://localhost"))
def add_server_methods(srv):
@uamethod
def func(parent, value):
......@@ -569,7 +572,6 @@ def add_server_methods(srv):
o = srv.get_objects_node()
v = o.add_method(ua.NodeId("ServerMethod", 2), ua.QualifiedName('ServerMethod', 2), func, [ua.VariantType.Int64], [ua.VariantType.Int64])
@uamethod
def func2(parent, methodname, value):
return math.sin(value)
......@@ -585,8 +587,8 @@ def add_server_methods(srv):
v = o.add_method(ua.NodeId("ServerMethodArray2", 2), ua.QualifiedName('ServerMethodArray2', 2), func3, [ua.VariantType.Int64], [ua.VariantType.Int64])
class TestClient(unittest.TestCase, CommonTests):
'''
Run common tests on client side
Of course we need a server so we start a server in another
......@@ -619,8 +621,8 @@ class TestClient(unittest.TestCase, CommonTests):
self.clt.bclient._send_request(request)
class TestServer(unittest.TestCase, CommonTests):
'''
Run common tests on server side
Tests that can only be run on server side must be defined here
......@@ -637,7 +639,6 @@ class TestServer(unittest.TestCase, CommonTests):
def tearDownClass(self):
self.srv.stop()
def test_register_namespace(self):
uri = 'http://mycustom.Namespace.com'
idx1 = self.opc.register_namespace(uri)
......@@ -662,10 +663,8 @@ class TestServer(unittest.TestCase, CommonTests):
self.assertEqual(result, 4.2)
if __name__ == '__main__':
logging.basicConfig(level=logging.WARN)
sclt = SubHandler()
unittest.main(verbosity=3)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment