Commit 4e086e7c authored by Christian Bergmiller's avatar Christian Bergmiller

[ADD] refactor tests, server, client, examples

parent a51fdea9
import asyncio
import logging
from opcua.client.client import Client
from opcua import ua
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('opcua')
OBJECTS_AND_VARIABLES = ua.NodeClass.Object | ua.NodeClass.Variable
async def browse_nodes(node, level=0):
node_class = await node.get_node_class()
children = []
for child in await node.get_children(nodeclassmask=OBJECTS_AND_VARIABLES):
children.append(await browse_nodes(child, level=level + 1))
return {
'id': node.nodeid.to_string(),
'name': (await node.get_display_name()).Text,
'cls': node_class.value,
'children': children,
'type': (await node.get_data_type_as_variant_type()).value if node_class == ua.NodeClass.Variable else None,
}
async def task(loop):
try:
client = Client(url='opc.tcp://commsvr.com:51234/UA/CAS_UA_Server')
await client.connect()
obj_node = client.get_objects_node()
_logger.info('Objects Node: %r', obj_node)
tree = await browse_nodes(obj_node)
_logger.info('Tree: %r', tree)
except Exception:
_logger.exception('Task error')
loop.stop()
def main():
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.create_task(task(loop))
try:
loop.run_forever()
except Exception:
_logger.exception('Event loop error')
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
if __name__ == '__main__':
main()
import sys
sys.path.insert(0, "..")
try:
from IPython import embed
except ImportError:
import code
def embed():
vars = globals()
vars.update(locals())
shell = code.InteractiveConsole(vars)
shell.interact()
import asyncio
import logging
from opcua import Client
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('opcua')
class SubHandler(object):
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
data_change and event methods are called directly from receiving thread.
......@@ -28,30 +18,39 @@ class SubHandler(object):
print("New event recived: ", event)
if __name__ == "__main__":
client = Client("opc.tcp://localhost:4840/freeopcua/server/")
# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
async def task(loop):
# url = "opc.tcp://commsvr.com:51234/UA/CAS_UA_Server"
url = "opc.tcp://localhost:4840/freeopcua/server/"
# url = "opc.tcp://admin@localhost:4840/freeopcua/server/" #connect using a user
try:
client.connect()
async with Client(url=url) as client:
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
root = client.get_root_node()
_logger.info("Objects node is: %r", root)
# Now getting a variable node using its browse path
obj = await root.get_child(["0:Objects", "2:MyObject"])
_logger.info("MyObject is: %r", obj)
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
root = client.get_root_node()
print("Objects node is: ", root)
myevent = await root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "2:MyFirstEvent"])
_logger.info("MyFirstEventType is: %r", myevent)
# Now getting a variable node using its browse path
obj = root.get_child(["0:Objects", "2:MyObject"])
print("MyObject is: ", obj)
msclt = SubHandler()
sub = await client.create_subscription(100, msclt)
handle = await sub.subscribe_events(obj, myevent)
await sub.unsubscribe(handle)
await sub.delete()
except Exception:
_logger.exception('Error')
loop.stop()
myevent = root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "2:MyFirstEvent"])
print("MyFirstEventType is: ", myevent)
msclt = SubHandler()
sub = client.create_subscription(100, msclt)
handle = sub.subscribe_events(obj, myevent)
def main():
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(task(loop))
loop.close()
embed()
sub.unsubscribe(handle)
sub.delete()
finally:
client.disconnect()
if __name__ == "__main__":
main()
import sys
sys.path.insert(0, "..")
import logging
import time
try:
from IPython import embed
except ImportError:
import code
def embed():
vars = globals()
vars.update(locals())
shell = code.InteractiveConsole(vars)
shell.interact()
import time
import asyncio
import logging
from opcua import Client
from opcua import ua
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('opcua')
class SubHandler(object):
......@@ -29,60 +19,63 @@ class SubHandler(object):
"""
def datachange_notification(self, node, val, data):
print("Python: New data change event", node, val)
print("New data change event", node, val)
def event_notification(self, event):
print("Python: New event", event)
print("New event", event)
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
#logger = logging.getLogger("KeepAlive")
#logger.setLevel(logging.DEBUG)
client = Client("opc.tcp://localhost:4840/freeopcua/server/")
# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
async def task(loop):
url = "opc.tcp://commsvr.com:51234/UA/CAS_UA_Server"
# url = "opc.tcp://localhost:4840/freeopcua/server/"
try:
client.connect()
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
root = client.get_root_node()
print("Root node is: ", root)
objects = client.get_objects_node()
print("Objects node is: ", objects)
# Node objects have methods to read and write node attributes as well as browse or populate address space
print("Children of root are: ", root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#var.get_value() # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
#var.set_value(3.9) # set node value using implicit data type
# Now getting a variable node using its browse path
myvar = root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])
obj = root.get_child(["0:Objects", "2:MyObject"])
print("myvar is: ", myvar)
# subscribing to a variable node
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_data_change(myvar)
time.sleep(0.1)
# we can also subscribe to events from server
sub.subscribe_events()
# sub.unsubscribe(handle)
# sub.delete()
# calling a method on server
res = obj.call_method("2:multiply", 3, "klk")
print("method result is: ", res)
embed()
finally:
client.disconnect()
async with Client(url=url) as client:
root = client.get_root_node()
_logger.info("Root node is: %r", root)
objects = client.get_objects_node()
_logger.info("Objects node is: %r", objects)
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger.info("Children of root are: %r", await root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#var.get_value() # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
#var.set_value(3.9) # set node value using implicit data type
# Now getting a variable node using its browse path
myvar = await root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])
obj = await root.get_child(["0:Objects", "2:MyObject"])
_logger.info("myvar is: %r", myvar)
# subscribing to a variable node
handler = SubHandler()
sub = await client.create_subscription(500, handler)
handle = await sub.subscribe_data_change(myvar)
await asyncio.sleep(0.1)
# we can also subscribe to events from server
await sub.subscribe_events()
# await sub.unsubscribe(handle)
# await sub.delete()
# calling a method on server
res = obj.call_method("2:multiply", 3, "klk")
_logger.info("method result is: %r", res)
except Exception:
_logger.exception('error')
def main():
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(task(loop))
loop.close()
if __name__ == "__main__":
main()
import sys
sys.path.insert(0, "..")
import asyncio
import logging
from opcua import Client
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('opcua')
if __name__ == "__main__":
client = Client("opc.tcp://localhost:4840/freeopcua/server/")
# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
async def task(loop):
url = "opc.tcp://commsvr.com:51234/UA/CAS_UA_Server"
# url = "opc.tcp://localhost:4840/freeopcua/server/"
try:
client.connect()
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
root = client.get_root_node()
print("Objects node is: ", root)
# Node objects have methods to read and write node attributes as well as browse or populate address space
print("Children of root are: ", root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#var.get_value() # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
#var.set_value(3.9) # set node value using implicit data type
# Now getting a variable node using its browse path
myvar = root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])
obj = root.get_child(["0:Objects", "2:MyObject"])
print("myvar is: ", myvar)
finally:
client.disconnect()
async with Client(url=url) as client:
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
root = client.get_root_node()
_logger.info("Objects node is: %r", root)
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger.info("Children of root are: %r", await root.get_children())
# get a specific node knowing its node id
# var = client.get_node(ua.NodeId(1002, 2))
# var = client.get_node("ns=3;i=2002")
# print(var)
# var.get_data_value() # get value of node as a DataValue object
# var.get_value() # get value of node as a python builtin
# var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
# var.set_value(3.9) # set node value using implicit data type
# Now getting a variable node using its browse path
myvar = await root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])
obj = await root.get_child(["0:Objects", "2:MyObject"])
_logger.info("myvar is: %r", myvar)
except Exception:
_logger.exception('error')
def main():
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(task(loop))
loop.close()
if __name__ == "__main__":
main()
import sys
sys.path.insert(0, "..")
import time
import asyncio
from opcua import ua, Server
if __name__ == "__main__":
async def task(loop):
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
server.set_endpoint('opc.tcp://0.0.0.0:4840/freeopcua/server/')
# setup our own namespace, not really necessary but should as spec
uri = "http://examples.freeopcua.github.io"
idx = server.register_namespace(uri)
uri = 'http://examples.freeopcua.github.io'
idx = await server.register_namespace(uri)
# get Objects node, this is where we should put our nodes
objects = server.get_objects_node()
# populating our address space
myobj = objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", 6.7)
myvar.set_writable() # Set MyVariable to be writable by clients
myobj = objects.add_object(idx, 'MyObject')
myvar = myobj.add_variable(idx, 'MyVariable', 6.7)
myvar.set_writable() # Set MyVariable to be writable by clients
# starting!
server.start()
await server.start()
try:
count = 0
while True:
time.sleep(1)
await asyncio.sleep(1)
count += 0.1
myvar.set_value(count)
finally:
#close connection, remove subcsriptions, etc
# close connection, remove subcsriptions, etc
server.stop()
def main():
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(task(loop))
loop.close()
if __name__ == '__main__':
main()
......@@ -15,48 +15,7 @@ from opcua.common.structures import load_type_definitions
from opcua.crypto import uacrypto, security_policies
class KeepAlive:
"""
Used by Client to keep the session open.
OPCUA defines timeout both for sessions and secure channel
ToDo: remove
"""
def __init__(self, client, timeout):
"""
:param session_timeout: Timeout to re-new the session
in milliseconds.
"""
self.logger = logging.getLogger(__name__)
self.loop = asyncio.get_event_loop()
self.client = client
self._do_stop = False
self._cond = Condition()
self.timeout = timeout
# some server support no timeout, but we do not trust them
if self.timeout == 0:
self.timeout = 3600000 # 1 hour
def run(self):
self.logger.debug("starting keepalive thread with period of %s milliseconds", self.timeout)
server_state = self.client.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
while not self._do_stop:
with self._cond:
self._cond.wait(self.timeout / 1000)
if self._do_stop:
break
self.logger.debug("renewing channel")
self.client.open_secure_channel(renew=True)
val = server_state.get_value()
self.logger.debug("server state is: %s ", val)
self.logger.debug("keepalive thread has stopped")
def stop(self):
self.logger.debug("stoping keepalive thread")
self._do_stop = True
with self._cond:
self._cond.notify_all()
_logger = logging.getLogger(__name__)
class Client(object):
......@@ -82,6 +41,7 @@ class Client(object):
time. The timeout is specified in seconds.
"""
self.logger = logging.getLogger(__name__)
self.loop = asyncio.get_event_loop()
self.server_url = urlparse(url)
# take initial username and password from the url
self._username = self.server_url.username
......@@ -100,7 +60,7 @@ class Client(object):
self.user_private_key = None
self._server_nonce = None
self._session_counter = 1
self.keepalive = None
self.keep_alive = None
self.nodes = Shortcuts(self.uaclient)
self.max_messagesize = 0 # No limits
self.max_chunkcount = 0 # No limits
......@@ -110,7 +70,7 @@ class Client(object):
return self
async def __aexit__(self, exc_type, exc_value, traceback):
self.disconnect()
await self.disconnect()
@staticmethod
def find_endpoint(endpoints, security_mode, policy_uri):
......@@ -230,6 +190,7 @@ class Client(object):
High level method
Connect, create and activate session
"""
_logger.info('connect')
await self.connect_socket()
await self.send_hello()
await self.open_secure_channel()
......@@ -241,6 +202,7 @@ class Client(object):
High level method
Close session, secure channel and socket
"""
_logger.info('disconnect')
try:
await self.close_session()
await self.close_secure_channel()
......@@ -337,7 +299,6 @@ class Client(object):
desc.ProductUri = self.product_uri
desc.ApplicationName = ua.LocalizedText(self.name)
desc.ApplicationType = ua.ApplicationType.Client
params = ua.CreateSessionParameters()
# at least 32 random bytes for server to prove possession of private key (specs part 4, 5.6.2.2)
nonce = utils.create_nonce(32)
......@@ -346,7 +307,8 @@ class Client(object):
params.ClientDescription = desc
params.EndpointUrl = self.server_url.geturl()
params.SessionName = self.description + " Session" + str(self._session_counter)
params.RequestedSessionTimeout = 3600000
# Requested maximum number of milliseconds that a Session should remain open without activity
params.RequestedSessionTimeout = 60 * 60 * 1000
params.MaxResponseMessageSize = 0 # means no max size
response = await self.uaclient.create_session(params)
if self.security_policy.client_certificate is None:
......@@ -362,13 +324,33 @@ class Client(object):
# remember PolicyId's: we will use them in activate_session()
ep = Client.find_endpoint(response.ServerEndpoints, self.security_policy.Mode, self.security_policy.URI)
self._policy_ids = ep.UserIdentityTokens
# Actual maximum number of milliseconds that a Session shall remain open without activity
self.session_timeout = response.RevisedSessionTimeout
# 0.7 is from spec
# ToDo: refactor with callback_later
# self.keepalive = KeepAlive(self, min(self.session_timeout, self.secure_channel_timeout) * 0.7)
# self.keepalive.start()
self.keep_alive = self.loop.create_task(self._renew_session())
# ToDo: subscribe to ServerStatus
"""
The preferred mechanism for a Client to monitor the connection status is through the keep-alive of the
Subscription. A Client should subscribe for the State Variable in the ServerStatus to detect shutdown or other
failure states. If no Subscription is created or the Server does not support Subscriptions,
the connection can be monitored by periodically reading the State Variable
"""
return response
async def _renew_session(self):
"""
Renew the SecureChannel before the SessionTimeout will happen.
ToDo: shouldn't this only be done if there was no session activity?
"""
# 0.7 is from spec
await asyncio.sleep(min(self.session_timeout, self.secure_channel_timeout) * 0.7)
server_state = self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
self.logger.debug("renewing channel")
await self.open_secure_channel(renew=True)
val = await server_state.get_value()
self.logger.debug("server state is: %s ", val)
# create new keep-alive task
self.keep_alive = self.loop.create_task(self._renew_session())
def server_policy_id(self, token_type, default):
"""
Find PolicyId of server's UserTokenPolicy by token_type.
......@@ -462,8 +444,8 @@ class Client(object):
"""
Close session
"""
if self.keepalive:
self.keepalive.stop()
if self.keep_alive:
self.keep_alive.cancel()
return await self.uaclient.close_session(True)
def get_root_node(self):
......
......@@ -18,7 +18,7 @@ class UASocketProtocol(asyncio.Protocol):
"""
def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
self.logger = logging.getLogger(__name__ + ".Socket")
self.logger = logging.getLogger(__name__ + ".UASocketProtocol")
self.loop = asyncio.get_event_loop()
self.transport = None
self.receive_buffer = asyncio.Queue()
......@@ -126,7 +126,7 @@ class UASocketProtocol(asyncio.Protocol):
elif isinstance(msg, ua.Acknowledge):
self._call_callback(0, msg)
elif isinstance(msg, ua.ErrorMessage):
self.logger.warning("Received an error: %s", msg)
self.logger.warning("Received an error: %r", msg)
else:
raise ua.UaError("Unsupported message type: %s", msg)
if self._leftover_chunk or not self.receive_buffer.empty():
......@@ -209,7 +209,7 @@ class UaClient:
"""
def __init__(self, timeout=1):
self.logger = logging.getLogger(__name__)
self.logger = logging.getLogger(__name__ + '.UaClient')
self.loop = asyncio.get_event_loop()
self._publish_callbacks = {}
self._timeout = timeout
......
......@@ -41,6 +41,7 @@ def call_method_full(parent, methodid, *args):
result.OutputArguments = [var.Value for var in result.OutputArguments]
return result
def _call_method(server, parentnodeid, methodid, arguments):
request = ua.CallMethodRequest()
request.ObjectId = parentnodeid
......
......@@ -3,15 +3,20 @@ High level node object, to access node attribute
and browse address space
"""
import logging
from opcua import ua
from opcua.common import events
import opcua.common
def _check_results(results, reqlen = 1):
_logger = logging.getLogger(__name__)
def _check_results(results, reqlen=1):
assert len(results) == reqlen, results
for r in results:
r.check()
def _to_nodeid(nodeid):
if isinstance(nodeid, int):
return ua.TwoByteNodeId(nodeid)
......
......@@ -4,7 +4,6 @@ high level interface to subscriptions
import time
import asyncio
import logging
from threading import Lock
import collections
from opcua import ua
......@@ -84,8 +83,7 @@ class Subscription(object):
self._client_handle = 200
self._handler = handler
self.parameters = params # move to data class
self._monitoreditems_map = {}
self._lock = Lock()
self._monitored_items = {}
self.subscription_id = None
async def init(self):
......@@ -125,11 +123,10 @@ class Subscription(object):
def _call_datachange(self, datachange):
for item in datachange.MonitoredItems:
with self._lock:
if item.ClientHandle not in self._monitoreditems_map:
self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
continue
data = self._monitoreditems_map[item.ClientHandle]
if item.ClientHandle not in self._monitored_items:
self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
continue
data = self._monitored_items[item.ClientHandle]
if hasattr(self._handler, "datachange_notification"):
event_data = DataChangeNotif(data, item)
try:
......@@ -147,8 +144,7 @@ class Subscription(object):
def _call_event(self, eventlist):
for event in eventlist.Events:
with self._lock:
data = self._monitoreditems_map[event.ClientHandle]
data = self._monitored_items[event.ClientHandle]
result = events.Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
result.server_handle = data.server_handle
if hasattr(self._handler, "event_notification"):
......@@ -219,9 +215,8 @@ class Subscription(object):
rv.AttributeId = attr
# rv.IndexRange //We leave it null, then the entire array is returned
mparams = ua.MonitoringParameters()
with self._lock:
self._client_handle += 1
mparams.ClientHandle = self._client_handle
self._client_handle += 1
mparams.ClientHandle = self._client_handle
mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
mparams.QueueSize = queuesize
mparams.DiscardOldest = True
......@@ -242,31 +237,28 @@ class Subscription(object):
params.SubscriptionId = self.subscription_id
params.ItemsToCreate = monitored_items
params.TimestampsToReturn = ua.TimestampsToReturn.Both
# insert monitored item into map to avoid notification arrive before result return
# server_handle is left as None in purpose as we don't get it yet.
with self._lock:
for mi in monitored_items:
data = SubscriptionItemData()
data.client_handle = mi.RequestedParameters.ClientHandle
data.node = Node(self.server, mi.ItemToMonitor.NodeId)
data.attribute = mi.ItemToMonitor.AttributeId
#TODO: Either use the filter from request or from response. Here it uses from request, in modify it uses from response
data.mfilter = mi.RequestedParameters.Filter
self._monitoreditems_map[mi.RequestedParameters.ClientHandle] = data
for mi in monitored_items:
data = SubscriptionItemData()
data.client_handle = mi.RequestedParameters.ClientHandle
data.node = Node(self.server, mi.ItemToMonitor.NodeId)
data.attribute = mi.ItemToMonitor.AttributeId
#TODO: Either use the filter from request or from response. Here it uses from request, in modify it uses from response
data.mfilter = mi.RequestedParameters.Filter
self._monitored_items[mi.RequestedParameters.ClientHandle] = data
results = await self.server.create_monitored_items(params)
mids = []
# process result, add server_handle, or remove it if failed
with self._lock:
for idx, result in enumerate(results):
mi = params.ItemsToCreate[idx]
if not result.StatusCode.is_good():
del self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
mids.append(result.StatusCode)
continue
data = self._monitoreditems_map[mi.RequestedParameters.ClientHandle]
data.server_handle = result.MonitoredItemId
mids.append(result.MonitoredItemId)
for idx, result in enumerate(results):
mi = params.ItemsToCreate[idx]
if not result.StatusCode.is_good():
del self._monitored_items[mi.RequestedParameters.ClientHandle]
mids.append(result.StatusCode)
continue
data = self._monitored_items[mi.RequestedParameters.ClientHandle]
data.server_handle = result.MonitoredItemId
mids.append(result.MonitoredItemId)
return mids
async def unsubscribe(self, handle):
......@@ -279,11 +271,10 @@ class Subscription(object):
params.MonitoredItemIds = [handle]
results = await self.server.delete_monitored_items(params)
results[0].check()
with self._lock:
for k, v in self._monitoreditems_map.items():
if v.server_handle == handle:
del(self._monitoreditems_map[k])
return
for k, v in self._monitored_items.items():
if v.server_handle == handle:
del(self._monitored_items[k])
return
async def modify_monitored_item(self, handle, new_samp_time, new_queuesize=0, mod_filter_val=-1):
"""
......@@ -294,9 +285,9 @@ class Subscription(object):
:param mod_filter_val: New deadband filter value
:return: Return a Modify Monitored Item Result
"""
for monitored_item_index in self._monitoreditems_map:
if self._monitoreditems_map[monitored_item_index].server_handle == handle:
item_to_change = self._monitoreditems_map[monitored_item_index]
for monitored_item_index in self._monitored_items:
if self._monitored_items[monitored_item_index].server_handle == handle:
item_to_change = self._monitored_items[monitored_item_index]
break
if mod_filter_val is None:
mod_filter = None
......@@ -320,8 +311,7 @@ class Subscription(object):
def _modify_monitored_item_request(self, new_queuesize, new_samp_time, mod_filter, client_handle):
req_params = ua.MonitoringParameters()
with self._lock:
req_params.ClientHandle = client_handle
req_params.ClientHandle = client_handle
req_params.QueueSize = new_queuesize
req_params.Filter = mod_filter
req_params.SamplingInterval = new_samp_time
......
......@@ -120,90 +120,3 @@ class SocketWrapper(object):
def create_nonce(size=32):
return os.urandom(size)
class ThreadLoop(threading.Thread):
"""
run an asyncio loop in a thread
"""
def __init__(self):
threading.Thread.__init__(self)
self.logger = logging.getLogger(__name__)
self.loop = None
self._cond = threading.Condition()
def start(self):
with self._cond:
threading.Thread.start(self)
self._cond.wait()
def run(self):
self.logger.debug("Starting subscription thread")
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
with self._cond:
self._cond.notify_all()
self.loop.run_forever()
self.logger.debug("subscription thread ended")
def create_server(self, proto, hostname, port):
return self.loop.create_server(proto, hostname, port)
def stop(self):
"""
stop subscription loop, thus the subscription thread
"""
self.loop.call_soon_threadsafe(self.loop.stop)
def call_soon(self, callback):
self.loop.call_soon_threadsafe(callback)
def call_later(self, delay, callback):
"""
threadsafe call_later from asyncio
"""
p = functools.partial(self.loop.call_later, delay, callback)
self.loop.call_soon_threadsafe(p)
def _create_task(self, future, coro, cb=None):
#task = self.loop.create_task(coro)
task = asyncio.async(coro, loop=self.loop)
if cb:
task.add_done_callback(cb)
future.set_result(task)
def create_task(self, coro, cb=None):
"""
threadsafe create_task from asyncio
"""
future = Future()
p = functools.partial(self._create_task, future, coro, cb)
self.loop.call_soon_threadsafe(p)
return future.result()
def run_coro_and_wait(self, coro):
cond = threading.Condition()
def cb(_):
with cond:
cond.notify_all()
with cond:
task = self.create_task(coro, cb)
cond.wait()
return task.result()
def _run_until_complete(self, future, coro):
task = self.loop.run_until_complete(coro)
future.set_result(task)
def run_until_complete(self, coro):
"""
threadsafe run_until_completed from asyncio
"""
future = Future()
p = functools.partial(self._run_until_complete, future, coro)
self.loop.call_soon_threadsafe(p)
return future.result()
......@@ -172,4 +172,3 @@ def x509_to_string(cert):
issuer = ', issuer: {0}'.format(x509_name_to_string(cert.issuer))
# TODO: show more information
return "{0}{1}, {2} - {3}".format(x509_name_to_string(cert.subject), issuer, cert.not_valid_before, cert.not_valid_after)
......@@ -2,12 +2,7 @@
Socket server forwarding request to internal server
"""
import logging
try:
# we prefer to use bundles asyncio version, otherwise fallback to trollius
import asyncio
except ImportError:
import trollius as asyncio
import asyncio
from opcua import ua
import opcua.ua.ua_binary as uabin
......@@ -16,14 +11,86 @@ from opcua.server.uaprocessor import UaProcessor
logger = logging.getLogger(__name__)
class BinaryServer(object):
class OPCUAProtocol(asyncio.Protocol):
"""
instanciated for every connection
defined as internal class since it needs access
to the internal server object
FIXME: find another solution
"""
def __init__(self, iserver=None, policies=None, clients=None):
self.peer_name = None
self.transport = None
self.processor = None
self.data = b''
self.iserver = iserver
self.policies = policies
self.clients = clients
def __str__(self):
return 'OPCUAProtocol({}, {})'.format(self.peer_name, self.processor.session)
__repr__ = __str__
def connection_made(self, transport):
self.peer_name = transport.get_extra_info('peername')
logger.info('New connection from %s', self.peer_name)
self.transport = transport
self.processor = UaProcessor(self.iserver, self.transport)
self.processor.set_policies(self.policies)
self.iserver.asyncio_transports.append(transport)
self.clients.append(self)
def connection_lost(self, ex):
logger.info('Lost connection from %s, %s', self.peer_name, ex)
self.transport.close()
self.iserver.asyncio_transports.remove(self.transport)
self.processor.close()
if self in self.clients:
self.clients.remove(self)
def data_received(self, data):
logger.debug('received %s bytes from socket', len(data))
if self.data:
data = self.data + data
self.data = b''
self._process_data(data)
def _process_data(self, data):
buf = ua.utils.Buffer(data)
while True:
try:
backup_buf = buf.copy()
try:
hdr = uabin.header_from_binary(buf)
except ua.utils.NotEnoughData:
logger.info('We did not receive enough data from client, waiting for more')
self.data = backup_buf.read(len(backup_buf))
return
if len(buf) < hdr.body_size:
logger.info('We did not receive enough data from client, waiting for more')
self.data = backup_buf.read(len(backup_buf))
return
ret = self.processor.process(hdr, buf)
if not ret:
logger.info('processor returned False, we close connection from %s', self.peer_name)
self.transport.close()
return
if len(buf) == 0:
return
except Exception:
logger.exception('Exception raised while parsing message from client, closing')
return
class BinaryServer:
def __init__(self, internal_server, hostname, port):
self.logger = logging.getLogger(__name__)
self.hostname = hostname
self.port = port
self.iserver = internal_server
self.loop = internal_server.loop
self.loop = asyncio.get_event_loop()
self._server = None
self._policies = []
self.clients = []
......@@ -31,80 +98,12 @@ class BinaryServer(object):
def set_policies(self, policies):
self._policies = policies
def start(self):
class OPCUAProtocol(asyncio.Protocol):
"""
instanciated for every connection
defined as internal class since it needs access
to the internal server object
FIXME: find another solution
"""
iserver = self.iserver
loop = self.loop
logger = self.logger
policies = self._policies
clients = self.clients
def __str__(self):
return "OPCUAProtocol({}, {})".format(self.peername, self.processor.session)
__repr__ = __str__
def connection_made(self, transport):
self.peername = transport.get_extra_info('peername')
self.logger.info('New connection from %s', self.peername)
self.transport = transport
self.processor = UaProcessor(self.iserver, self.transport)
self.processor.set_policies(self.policies)
self.data = b""
self.iserver.asyncio_transports.append(transport)
self.clients.append(self)
def connection_lost(self, ex):
self.logger.info('Lost connection from %s, %s', self.peername, ex)
self.transport.close()
self.iserver.asyncio_transports.remove(self.transport)
self.processor.close()
if self in self.clients:
self.clients.remove(self)
def data_received(self, data):
logger.debug("received %s bytes from socket", len(data))
if self.data:
data = self.data + data
self.data = b""
self._process_data(data)
def _process_data(self, data):
buf = ua.utils.Buffer(data)
while True:
try:
backup_buf = buf.copy()
try:
hdr = uabin.header_from_binary(buf)
except ua.utils.NotEnoughData:
logger.info("We did not receive enough data from client, waiting for more")
self.data = backup_buf.read(len(backup_buf))
return
if len(buf) < hdr.body_size:
logger.info("We did not receive enough data from client, waiting for more")
self.data = backup_buf.read(len(backup_buf))
return
ret = self.processor.process(hdr, buf)
if not ret:
logger.info("processor returned False, we close connection from %s", self.peername)
self.transport.close()
return
if len(buf) == 0:
return
except Exception:
logger.exception("Exception raised while parsing message from client, closing")
return
coro = self.loop.create_server(OPCUAProtocol, self.hostname, self.port)
self._server = self.loop.run_coro_and_wait(coro)
def _make_protocol(self):
"""Protocol Factory"""
return OPCUAProtocol(iserver=self.iserver, policies=self._policies, clients=self.clients)
async def start(self):
self._server = await self.loop.create_server(self._make_protocol, self.hostname, self.port)
# get the port and the hostname from the created server socket
# only relevant for dynamic port asignment (when self.port == 0)
if self.port == 0 and len(self._server.sockets) == 1:
......@@ -115,10 +114,10 @@ class BinaryServer(object):
self.port = sockname[1]
self.logger.warning('Listening on {0}:{1}'.format(self.hostname, self.port))
def stop(self):
self.logger.info("Closing asyncio socket server")
async def stop(self):
self.logger.info('Closing asyncio socket server')
for transport in self.iserver.asyncio_transports:
transport.close()
if self._server:
self.loop.call_soon(self._server.close)
self.loop.run_coro_and_wait(self._server.wait_closed())
await self._server.wait_closed()
......@@ -3,22 +3,18 @@ Internal server implementing opcu-ua interface.
Can be used on server side or to implement binary/https opc-ua servers
"""
from datetime import datetime, timedelta
from copy import copy, deepcopy
import os
import asyncio
import logging
from threading import Lock
from enum import Enum
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from copy import copy, deepcopy
from urllib.parse import urlparse
from datetime import datetime, timedelta
from opcua import ua
from opcua.common import utils
from opcua.common.callback import (CallbackType, ServerItemCallback,
CallbackDispatcher)
from opcua.common.callback import CallbackType, ServerItemCallback, CallbackDispatcher
from opcua.common.node import Node
from opcua.server.history import HistoryManager
from opcua.server.address_space import AddressSpace
......@@ -29,7 +25,6 @@ from opcua.server.address_space import MethodService
from opcua.server.subscription_service import SubscriptionService
from opcua.server.standard_address_space import standard_address_space
from opcua.server.users import User
#from opcua.common import xmlimporter
class SessionState(Enum):
......@@ -48,27 +43,21 @@ class InternalServer(object):
def __init__(self, shelffile=None):
self.logger = logging.getLogger(__name__)
self.server_callback_dispatcher = CallbackDispatcher()
self.endpoints = []
self._channel_id_counter = 5
self.allow_remote_admin = True
self.disabled_clock = False # for debugging we may want to disable clock that writes too much in log
self._known_servers = {} # used if we are a discovery server
self.aspace = AddressSpace()
self.attribute_service = AttributeService(self.aspace)
self.view_service = ViewService(self.aspace)
self.method_service = MethodService(self.aspace)
self.node_mgt_service = NodeManagementService(self.aspace)
self.load_standard_address_space(shelffile)
self.loop = utils.ThreadLoop()
self.loop = asyncio.get_event_loop()
self.asyncio_transports = []
self.subscription_service = SubscriptionService(self.loop, self.aspace)
self.history_manager = HistoryManager(self)
# create a session to use on server side
......@@ -78,13 +67,16 @@ class InternalServer(object):
self._address_space_fixes()
self.setup_nodes()
def setup_nodes(self):
async def init(self):
pass
async def setup_nodes(self):
"""
Set up some nodes as defined by spec
"""
uries = ["http://opcfoundation.org/UA/"]
uries = ['http://opcfoundation.org/UA/']
ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
ns_node.set_value(uries)
await ns_node.set_value(uries)
def load_standard_address_space(self, shelffile=None):
if shelffile is not None and os.path.isfile(shelffile):
......@@ -128,19 +120,17 @@ class InternalServer(object):
self.aspace.dump(path)
def start(self):
self.logger.info("starting internal server")
self.logger.info('starting internal server')
for edp in self.endpoints:
self._known_servers[edp.Server.ApplicationUri] = ServerDesc(edp.Server)
self.loop.start()
Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)).set_value(0, ua.VariantType.Int32)
Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime)).set_value(datetime.utcnow())
if not self.disabled_clock:
self._set_current_time()
def stop(self):
self.logger.info("stopping internal server")
self.logger.info('stopping internal server')
self.isession.close_session()
self.loop.stop()
self.history_manager.stop()
def _set_current_time(self):
......@@ -155,14 +145,14 @@ class InternalServer(object):
self.endpoints.append(endpoint)
def get_endpoints(self, params=None, sockname=None):
self.logger.info("get endpoint")
self.logger.info('get endpoint')
if sockname:
# return to client the ip address it has access to
edps = []
for edp in self.endpoints:
edp1 = copy(edp)
url = urlparse(edp1.EndpointUrl)
url = url._replace(netloc=sockname[0] + ":" + str(sockname[1]))
url = url._replace(netloc=sockname[0] + ':' + str(sockname[1]))
edp1.EndpointUrl = url.geturl()
edps.append(edp1)
return edps
......@@ -173,9 +163,9 @@ class InternalServer(object):
return [desc.Server for desc in self._known_servers.values()]
servers = []
for serv in self._known_servers.values():
serv_uri = serv.Server.ApplicationUri.split(":")
serv_uri = serv.Server.ApplicationUri.split(':')
for uri in params.ServerUris:
uri = uri.split(":")
uri = uri.split(':')
if serv_uri[:len(uri)] == uri:
servers.append(serv.Server)
break
......@@ -223,7 +213,7 @@ class InternalServer(object):
"""
event_notifier = source.get_event_notifier()
if ua.EventNotifier.SubscribeToEvents not in event_notifier:
raise ua.UaError("Node does not generate events", event_notifier)
raise ua.UaError('Node does not generate events', event_notifier)
if ua.EventNotifier.HistoryRead not in event_notifier:
event_notifier.add(ua.EventNotifier.HistoryRead)
......@@ -270,18 +260,17 @@ class InternalSession(object):
self.authentication_token = ua.NodeId(self._auth_counter)
InternalSession._auth_counter += 1
self.subscriptions = []
self.logger.info("Created internal session %s", self.name)
self._lock = Lock()
self.logger.info('Created internal session %s', self.name)
def __str__(self):
return "InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})".format(
return 'InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})'.format(
self.name, self.user, self.session_id, self.authentication_token)
def get_endpoints(self, params=None, sockname=None):
return self.iserver.get_endpoints(params, sockname)
def create_session(self, params, sockname=None):
self.logger.info("Create session request")
self.logger.info('Create session request')
result = ua.CreateSessionResult()
result.SessionId = self.session_id
......@@ -295,12 +284,12 @@ class InternalSession(object):
return result
def close_session(self, delete_subs=True):
self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
self.logger.info('close session %s with subscriptions %s', self, self.subscriptions)
self.state = SessionState.Closed
self.delete_subscriptions(self.subscriptions[:])
def activate_session(self, params):
self.logger.info("activate session")
self.logger.info('activate session')
result = ua.ActivateSessionResult()
if self.state != SessionState.Created:
raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
......@@ -311,9 +300,9 @@ class InternalSession(object):
self.state = SessionState.Activated
id_token = params.UserIdentityToken
if isinstance(id_token, ua.UserNameIdentityToken):
if self.iserver.allow_remote_admin and id_token.UserName in ("admin", "Admin"):
if self.iserver.allow_remote_admin and id_token.UserName in ('admin', 'Admin'):
self.user = User.Admin
self.logger.info("Activated internal session %s for user %s", self.name, self.user)
self.logger.info('Activated internal session %s for user %s', self.name, self.user)
return result
def read(self, params):
......@@ -358,8 +347,7 @@ class InternalSession(object):
def create_subscription(self, params, callback):
result = self.subscription_service.create_subscription(params, callback)
with self._lock:
self.subscriptions.append(result.SubscriptionId)
self.subscriptions.append(result.SubscriptionId)
return result
def create_monitored_items(self, params):
......@@ -379,9 +367,8 @@ class InternalSession(object):
def delete_subscriptions(self, ids):
for i in ids:
with self._lock:
if i in self.subscriptions:
self.subscriptions.remove(i)
if i in self.subscriptions:
self.subscriptions.remove(i)
return self.subscription_service.delete_subscriptions(ids)
def delete_monitored_items(self, params):
......
This diff is collapsed.
import pytest
from opcua import Client
from opcua import Server
from opcua import ua
from .tests_subscriptions import SubscriptionTests
from .tests_common import CommonTests, add_server_methods
from .tests_xml import XmlTests
port_num1 = 48510
@pytest.yield_fixture()
async def admin_client():
# start admin client
# long timeout since travis (automated testing) can be really slow
clt = Client(f'opc.tcp://admin@127.0.0.1:{port_num1}', timeout=10)
await clt.connect()
yield clt
await clt.disconnect()
@pytest.yield_fixture()
async def client():
# start anonymous client
ro_clt = Client(f'opc.tcp://127.0.0.1:{port_num1}')
await ro_clt.connect()
yield ro_clt
await ro_clt.disconnect()
@pytest.yield_fixture()
async def server():
# start our own server
srv = Server()
await srv.set_endpoint(f'opc.tcp://127.0.0.1:{port_num1}')
add_server_methods(srv)
await srv.start()
yield srv
# stop the server
await srv.stop()
@pytest.mark.asyncio
async def test_service_fault(server, admin_client):
request = ua.ReadRequest()
request.TypeId = ua.FourByteNodeId(999) # bad type!
with pytest.raises(ua.UaStatusCodeError):
await admin_client.uaclient.protocol.send_request(request)
@pytest.mark.asyncio
async def test_objects_anonymous(server, client):
objects = client.get_objects_node()
with pytest.raises(ua.UaStatusCodeError):
objects.set_attribute(ua.AttributeIds.WriteMask, ua.DataValue(999))
with pytest.raises(ua.UaStatusCodeError):
f = objects.add_folder(3, 'MyFolder')
@pytest.mark.asyncio
async def test_folder_anonymous(server, client):
objects = client.get_objects_node()
f = objects.add_folder(3, 'MyFolderRO')
f_ro = client.get_node(f.nodeid)
assert f == f_ro
with pytest.raises(ua.UaStatusCodeError):
f2 = f_ro.add_folder(3, 'MyFolder2')
@pytest.mark.asyncio
async def test_variable_anonymous(server, admin_client, client):
objects = admin_client.get_objects_node()
v = objects.add_variable(3, 'MyROVariable', 6)
v.set_value(4) # this should work
v_ro = client.get_node(v.nodeid)
with pytest.raises(ua.UaStatusCodeError):
v_ro.set_value(2)
assert await v_ro.get_value() == 4
v.set_writable(True)
v_ro.set_value(2) # now it should work
assert await v_ro.get_value() == 2
v.set_writable(False)
with pytest.raises(ua.UaStatusCodeError):
v_ro.set_value(9)
assert await v_ro.get_value() == 2
@pytest.mark.asyncio
async def test_context_manager(server):
"""Context manager calls connect() and disconnect()"""
state = [0]
def increment_state(*args, **kwargs):
state[0] += 1
# create client and replace instance methods with dummy methods
client = Client('opc.tcp://dummy_address:10000')
client.connect = increment_state.__get__(client)
client.disconnect = increment_state.__get__(client)
assert state[0] == 0
with client:
# test if client connected
assert state[0] == 1
# test if client disconnected
assert state[0] == 2
@pytest.mark.asyncio
async def test_enumstrings_getvalue(server, client):
"""
The real exception is server side, but is detected by using a client.
All due the server trace is also visible on the console.
The client only 'sees' an TimeoutError
"""
nenumstrings = client.get_node(ua.ObjectIds.AxisScaleEnumeration_EnumStrings)
value = ua.Variant(nenumstrings.get_value())
import unittest
import logging
import sys
sys.path.insert(0, "..")
sys.path.insert(0, ".")
from tests_cmd_lines import TestCmdLines
from tests_server import TestServer, TestServerCaching, TestServerStartError
from tests_client import TestClient
from tests_standard_address_space import StandardAddressSpaceTests
from tests_unit import TestUnit, TestMaskEnum
from tests_history import TestHistory, TestHistorySQL, TestHistoryLimits, TestHistorySQLLimits
from tests_crypto_connect import TestCryptoConnect
from tests_uaerrors import TestUaErrors
if __name__ == '__main__':
logging.basicConfig(level=logging.WARNING)
#l = logging.getLogger("opcua.server.internal_subscription")
#l.setLevel(logging.DEBUG)
#l = logging.getLogger("opcua.server.internal_server")
#l.setLevel(logging.DEBUG)
unittest.main(verbosity=3)
import unittest
from opcua import Client
from opcua import Server
from opcua import ua
from tests_subscriptions import SubscriptionTests
from tests_common import CommonTests, add_server_methods
from tests_xml import XmlTests
port_num1 = 48510
class TestClient(unittest.TestCase, CommonTests, SubscriptionTests, XmlTests):
'''
Run common tests on client side
Of course we need a server so we start also start a server
Tests that can only be run on client side must be defined in this class
'''
@classmethod
def setUpClass(cls):
# start our own server
cls.srv = Server()
cls.srv.set_endpoint('opc.tcp://127.0.0.1:{0:d}'.format(port_num1))
add_server_methods(cls.srv)
cls.srv.start()
# start admin client
# long timeout since travis (automated testing) can be really slow
cls.clt = Client('opc.tcp://admin@127.0.0.1:{0:d}'.format(port_num1), timeout=10)
cls.clt.connect()
cls.opc = cls.clt
# start anonymous client
cls.ro_clt = Client('opc.tcp://127.0.0.1:{0:d}'.format(port_num1))
cls.ro_clt.connect()
@classmethod
def tearDownClass(cls):
#stop our clients
cls.ro_clt.disconnect()
cls.clt.disconnect()
# stop the server
cls.srv.stop()
def test_service_fault(self):
request = ua.ReadRequest()
request.TypeId = ua.FourByteNodeId(999) # bad type!
with self.assertRaises(ua.UaStatusCodeError):
self.clt.uaclient._uasocket.send_request(request)
def test_objects_anonymous(self):
objects = self.ro_clt.get_objects_node()
with self.assertRaises(ua.UaStatusCodeError):
objects.set_attribute(ua.AttributeIds.WriteMask, ua.DataValue(999))
with self.assertRaises(ua.UaStatusCodeError):
f = objects.add_folder(3, 'MyFolder')
def test_folder_anonymous(self):
objects = self.clt.get_objects_node()
f = objects.add_folder(3, 'MyFolderRO')
f_ro = self.ro_clt.get_node(f.nodeid)
self.assertEqual(f, f_ro)
with self.assertRaises(ua.UaStatusCodeError):
f2 = f_ro.add_folder(3, 'MyFolder2')
def test_variable_anonymous(self):
objects = self.clt.get_objects_node()
v = objects.add_variable(3, 'MyROVariable', 6)
v.set_value(4) #this should work
v_ro = self.ro_clt.get_node(v.nodeid)
with self.assertRaises(ua.UaStatusCodeError):
v_ro.set_value(2)
self.assertEqual(v_ro.get_value(), 4)
v.set_writable(True)
v_ro.set_value(2) #now it should work
self.assertEqual(v_ro.get_value(), 2)
v.set_writable(False)
with self.assertRaises(ua.UaStatusCodeError):
v_ro.set_value(9)
self.assertEqual(v_ro.get_value(), 2)
def test_context_manager(self):
""" Context manager calls connect() and disconnect()
"""
state = [0]
def increment_state(self, *args, **kwargs):
state[0] += 1
# create client and replace instance methods with dummy methods
client = Client('opc.tcp://dummy_address:10000')
client.connect = increment_state.__get__(client)
client.disconnect = increment_state.__get__(client)
assert state[0] == 0
with client:
# test if client connected
self.assertEqual(state[0], 1)
# test if client disconnected
self.assertEqual(state[0], 2)
def test_enumstrings_getvalue(self):
''' The real exception is server side, but is detected by using a client.
Alldue the server trace is also visible on the console.
The client only 'sees' an TimeoutError
'''
nenumstrings = self.opc.get_node(ua.ObjectIds.AxisScaleEnumeration_EnumStrings)
with self.assertNotRaises(Exception):
value = ua.Variant(nenumstrings.get_value())
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment