Commit 7b07e113 authored by oroulet's avatar oroulet Committed by GitHub

Merge pull request #15 from FreeOpcUa/testing

update some examples, make them work and add several missing async methods
parents 7b2538d0 9f337fbb
language: python
python:
- "2.7"
- "3.4"
- "pypy"
- "3.6"
# command to install dependencies
install:
- pip install python-dateutil
- pip install pytz
- pip install lxml
- if [[ $TRAVIS_PYTHON_VERSION == '3.4' ]]; then pip install cryptography ; fi
- if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then pip install futures ; fi
- if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then pip install cryptography ; fi
- if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then pip install trollius ; fi
- if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then pip install enum34 ; fi
#- if [[ $TRAVIS_PYTHON_VERSION == 'pypy3' ]]; then pip install cryptography ; fi
- if [[ $TRAVIS_PYTHON_VERSION == 'pypy' ]]; then pip install futures ; fi
- if [[ $TRAVIS_PYTHON_VERSION == 'pypy' ]]; then pip install trollius ; fi
- if [[ $TRAVIS_PYTHON_VERSION == 'pypy' ]]; then pip install enum34 ; fi
- pip install aiofiles
- pip install asyncio-contextmanager
- pip install pytest
- pip install pytest-asyncio
- pip install cryptography
# command to run tests
script: ./run-tests.sh
script: pytest -v
......@@ -24,9 +24,8 @@ class SubHandler(object):
print("New event", event)
async def task(loop):
url = "opc.tcp://commsvr.com:51234/UA/CAS_UA_Server"
# url = "opc.tcp://localhost:4840/freeopcua/server/"
async def run():
url = "opc.tcp://localhost:4840/freeopcua/server/"
try:
async with Client(url=url) as client:
root = client.get_root_node()
......@@ -37,6 +36,9 @@ async def task(loop):
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger.info("Children of root are: %r", await root.get_children())
uri = "http://examples.freeopcua.github.io"
idx = await client.get_namespace_index(uri)
_logger.info("index of our namespace is %s", idx)
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
#var = client.get_node("ns=3;i=2002")
......@@ -63,18 +65,14 @@ async def task(loop):
# await sub.delete()
# calling a method on server
res = obj.call_method("2:multiply", 3, "klk")
res = await obj.call_method("2:multiply", 3, "klk")
_logger.info("method result is: %r", res)
except Exception:
_logger.exception('error')
def main():
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(task(loop))
loop.close()
loop.run_until_complete(run())
if __name__ == "__main__":
main()
import asyncio
import sys
sys.path.insert(0, "..")
import logging
from opcua import Client, Node, ua
......@@ -6,35 +8,7 @@ logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('opcua')
async def browse_nodes(node: Node):
"""
Build a nested node tree dict by recursion (filtered by OPC UA objects and variables).
"""
node_class = await node.get_node_class()
children = []
for child in await node.get_children():
if await child.get_node_class() in [ua.NodeClass.Object, ua.NodeClass.Variable]:
children.append(
await browse_nodes(child)
)
if node_class != ua.NodeClass.Variable:
var_type = None
else:
try:
var_type = (await node.get_data_type_as_variant_type()).value
except ua.UaError:
_logger.warning('Node Variable Type could not be determined for %r', node)
var_type = None
return {
'id': node.nodeid.to_string(),
'name': (await node.get_display_name()).Text,
'cls': node_class.value,
'children': children,
'type': var_type,
}
async def task(loop):
async def main():
# url = 'opc.tcp://192.168.2.64:4840'
url = 'opc.tcp://localhost:4840/freeopcua/server/'
# url = 'opc.tcp://commsvr.com:51234/UA/CAS_UA_Server'
......@@ -47,9 +21,13 @@ async def task(loop):
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger.info('Children of root are: %r', await root.get_children())
uri = 'http://examples.freeopcua.github.io'
idx = await client.get_namespace_index(uri)
# get a specific node knowing its node id
# var = client.get_node(ua.NodeId(1002, 2))
# var = client.get_node("ns=3;i=2002")
var = await root.get_child(["0:Objects", f"{idx}:MyObject", f"{idx}:MyVariable"])
print("My variable", var, await var.get_value())
# print(var)
# var.get_data_value() # get value of node as a DataValue object
# var.get_value() # get value of node as a python builtin
......@@ -57,18 +35,12 @@ async def task(loop):
# var.set_value(3.9) # set node value using implicit data type
# Now getting a variable node using its browse path
tree = await browse_nodes(client.get_objects_node())
_logger.info('Node tree: %r', tree)
except Exception:
_logger.exception('error')
def main():
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(task(loop))
loop.run_until_complete(main())
loop.close()
if __name__ == '__main__':
main()
import sys
sys.path.insert(0, "..")
import os
# os.environ['PYOPCUA_NO_TYPO_CHECK'] = 'True'
......@@ -16,35 +18,26 @@ class SubscriptionHandler:
_logger.info('datachange_notification %r %s', node, val)
async def task(loop):
async def main():
url = 'opc.tcp://localhost:4840/freeopcua/server/'
client = Client(url=url)
client.set_user('test')
client.set_password('test')
# client.set_security_string()
async with client:
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
root = client.get_root_node()
_logger.info('Objects node is: %r', root)
uri = 'http://examples.freeopcua.github.io'
idx = await client.get_namespace_index(uri)
var = await client.nodes.objects.get_child([f"{idx}:MyObject", f"{idx}:MyVariable"])
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger.info('Children of root are: %r', await root.get_children())
handler = SubscriptionHandler()
subscription = await client.create_subscription(500, handler)
nodes = [
client.get_node('ns=1;i=6'),
var,
client.get_node(ua.ObjectIds.Server_ServerStatus_CurrentTime),
]
await subscription.subscribe_data_change(nodes)
await asyncio.sleep(10)
def main():
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(task(loop))
loop.run_until_complete(main())
loop.close()
if __name__ == "__main__":
main()
import sys
import asyncio
sys.path.insert(0, "..")
import time
import logging
from opcua import Client
......@@ -10,7 +10,7 @@ from opcua import ua
class SubHandler(object):
"""
Client to subscription. It will receive events from server
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
......@@ -20,20 +20,15 @@ class SubHandler(object):
print("Python: New event", event)
if __name__ == "__main__":
#from IPython import embed
logging.basicConfig(level=logging.DEBUG)
client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")
#client = Client("opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/")
#client.set_security_string("Basic256Sha256,SignAndEncrypt,certificate-example.der,private-key-example.pem")
try:
client.connect()
root = client.get_root_node()
print("Root is", root)
print("childs of root are: ", root.get_children())
print("name of root is", root.get_browse_name())
objects = client.get_objects_node()
print("childs og objects are: ", objects.get_children())
async def main():
url = "opc.tcp://localhost:53530/OPCUA/SimulationServer/"
# url = "opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/"
async with Client(url=url) as client:
print("Root children are", await client.nodes.root.get_children())
myfloat = client.get_node("ns=4;s=Float")
mydouble = client.get_node("ns=4;s=Double")
myint64 = client.get_node("ns=4;s=Int64")
......@@ -43,25 +38,38 @@ if __name__ == "__main__":
var = client.get_node(ua.NodeId("Random1", 5))
print("var is: ", var)
print("value of var is: ", var.get_value())
var.set_value(ua.Variant([23], ua.VariantType.Double))
print("value of var is: ", await var.get_value())
await var.set_value(ua.Variant([23], ua.VariantType.Double))
print("setting float value")
myfloat.set_value(ua.Variant(1.234, ua.VariantType.Float))
print("reading float value: ", myfloat.get_value())
await myfloat.set_value(ua.Variant(1.234, ua.VariantType.Float))
print("reading float value: ", await myfloat.get_value())
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_data_change(var)
device = objects.get_child(["2:MyObjects", "2:MyDevice"])
method = device.get_child("2:MyMethod")
result = device.call_method(method, ua.Variant("sin"), ua.Variant(180, ua.VariantType.Double))
device = await client.nodes.objects.get_child(["2:MyObjects", "2:MyDevice"])
method = await device.get_child("2:MyMethod")
result = await device.call_method(method, ua.Variant("sin"), ua.Variant(180, ua.VariantType.Double))
print("Mehtod result is: ", result)
#embed()
time.sleep(3)
sub.unsubscribe(handle)
sub.delete()
#client.close_session()
finally:
client.disconnect()
handler = SubHandler()
sub = await client.create_subscription(500, handler)
handle = await sub.subscribe_data_change(var)
handle2 = await sub.subscribe_events(evtypes=2788)
cond = await client.nodes.root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "0:ConditionType"])
for _ in range(5):
# refresh server condition to force generation of events
await cond.call_method("0:ConditionRefresh", ua.Variant(sub.subscription_id, ua.VariantType.UInt32))
await asyncio.sleep(1)
await sub.unsubscribe(handle)
await sub.unsubscribe(handle2)
await sub.delete()
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()
import sys
sys.path.insert(0, "..")
import logging
from opcua import Client
from opcua import ua
from IPython import embed
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
print("Python: New data change event", node, val)
def event_notification(self, event):
print("Python: New event", event.EventType)
if __name__ == "__main__":
#from IPython import embed
logging.basicConfig(level=logging.WARN)
client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")
#client = Client("opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/")
try:
client.connect()
root = client.get_root_node()
print("Root is", root)
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_events(evtype=2788)
# refresh server condition to force generation of events
cond = root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "0:ConditionType"])
cond.call_method("0:ConditionRefresh", ua.Variant(sub.subscription_id, ua.VariantType.UInt32))
embed()
finally:
client.disconnect()
from threading import Thread
import asyncio
import copy
import logging
from datetime import datetime
......@@ -7,17 +7,6 @@ from math import sin
import sys
sys.path.insert(0, "..")
try:
from IPython import embed
except ImportError:
import code
def embed():
myvars = globals()
myvars.update(locals())
shell = code.InteractiveConsole(myvars)
shell.interact()
from opcua import ua, uamethod, Server
......@@ -53,9 +42,9 @@ def multiply(parent, x, y):
return x * y
if __name__ == "__main__":
async def main():
# optional: setup logging
logging.basicConfig(level=logging.WARN)
logging.basicConfig(level=logging.INFO)
#logger = logging.getLogger("opcua.address_space")
# logger.setLevel(logging.DEBUG)
#logger = logging.getLogger("opcua.internal_server")
......@@ -67,6 +56,7 @@ if __name__ == "__main__":
# now setup our server
server = Server()
await server.init()
#server.disable_clock()
#server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
......@@ -79,48 +69,48 @@ if __name__ == "__main__":
# setup our own namespace
uri = "http://examples.freeopcua.github.io"
idx = server.register_namespace(uri)
idx = await server.register_namespace(uri)
# create a new node type we can instantiate in our address space
dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")
dev.add_variable(0, "sensor1", 1.0).set_modelling_rule(True)
dev.add_property(0, "device_id", "0340").set_modelling_rule(True)
ctrl = dev.add_object(0, "controller")
ctrl.set_modelling_rule(True)
ctrl.add_property(0, "state", "Idle").set_modelling_rule(True)
dev = await server.nodes.base_object_type.add_object_type(idx, "MyDevice")
await (await dev.add_variable(idx, "sensor1", 1.0)).set_modelling_rule(True)
await (await dev.add_property(idx, "device_id", "0340")).set_modelling_rule(True)
ctrl = await dev.add_object(idx, "controller")
await ctrl.set_modelling_rule(True)
await (await ctrl.add_property(idx, "state", "Idle")).set_modelling_rule(True)
# populating our address space
# First a folder to organise our nodes
myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
myfolder = await server.nodes.objects.add_folder(idx, "myEmptyFolder")
# instanciate one instance of our device
mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
mydevice_var = mydevice.get_child(["0:controller", "0:state"]) # get proxy to our device state variable
mydevice = await server.nodes.objects.add_object(idx, "Device0001", dev)
mydevice_var = await mydevice.get_child([f"{idx}:controller", f"{idx}:state"]) # get proxy to our device state variable
# create directly some objects and variables
myobj = server.nodes.objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", 6.7)
myvar.set_writable() # Set MyVariable to be writable by clients
mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
mystringvar.set_writable() # Set MyVariable to be writable by clients
mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
mydtvar.set_writable() # Set MyVariable to be writable by clients
myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
myprop = myobj.add_property(idx, "myproperty", "I am a property")
mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])
myobj = await server.nodes.objects.add_object(idx, "MyObject")
myvar = await myobj.add_variable(idx, "MyVariable", 6.7)
await myvar.set_writable() # Set MyVariable to be writable by clients
mystringvar = await myobj.add_variable(idx, "MyStringVariable", "Really nice string")
await mystringvar.set_writable() # Set MyVariable to be writable by clients
mydtvar = await myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
await mydtvar.set_writable() # Set MyVariable to be writable by clients
myarrayvar = await myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
myarrayvar = await myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
myprop = await myobj.add_property(idx, "myproperty", "I am a property")
mymethod = await myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
multiply_node = await myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])
# import some nodes from xml
server.import_xml("custom_nodes.xml")
await server.import_xml("custom_nodes.xml")
# creating a default event object
# The event object automatically will have members for all events properties
# you probably want to create a custom event type, see other examples
myevgen = server.get_event_generator()
myevgen = await server.get_event_generator()
myevgen.event.Severity = 300
# starting!
server.start()
await server.start()
print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
try:
# enable following if you want to subscribe to nodes on server side
......@@ -128,14 +118,22 @@ if __name__ == "__main__":
#sub = server.create_subscription(500, handler)
#handle = sub.subscribe_data_change(myvar)
# trigger event, all subscribed clients wil receive it
var = myarrayvar.get_value() # return a ref to value in db server side! not a copy!
var = await myarrayvar.get_value() # return a ref to value in db server side! not a copy!
var = copy.copy(var) # WARNING: we need to copy before writting again otherwise no data change event will be generated
var.append(9.3)
myarrayvar.set_value(var)
mydevice_var.set_value("Running")
await myarrayvar.set_value(var)
await mydevice_var.set_value("Running")
myevgen.trigger(message="This is BaseEvent")
server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9)) # Server side write method which is a but faster than using set_value
while True:
await asyncio.sleep(1)
embed()
finally:
server.stop()
await server.stop()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
import sys
sys.path.insert(0, "..")
import logging
import asyncio
from opcua import ua, Server
from opcua.common.methods import uamethod
......@@ -9,11 +12,11 @@ def func(parent, value):
return value * 2
async def task(loop):
async def main():
# setup our server
server = Server()
await server.init()
server.set_endpoint('opc.tcp://127.0.0.1:8080/freeopcua/server/') #4840
server.set_endpoint('opc.tcp://localhost:4840/freeopcua/server/') #4840
# setup our own namespace, not really necessary but should as spec
uri = 'http://examples.freeopcua.github.io'
idx = await server.register_namespace(uri)
......@@ -34,17 +37,18 @@ async def task(loop):
async with server:
count = 0
while True:
await asyncio.sleep(1000)
print("UPDATE")
await asyncio.sleep(1)
count += 0.1
await myvar.set_value(count)
def main():
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(task(loop))
loop.run_until_complete(main())
loop.close()
if __name__ == '__main__':
main()
......@@ -59,6 +59,7 @@ class Client(object):
self.nodes = Shortcuts(self.uaclient)
self.max_messagesize = 0 # No limits
self.max_chunkcount = 0 # No limits
self._renew_channel_task = None
async def __aenter__(self):
await self.connect()
......@@ -233,7 +234,6 @@ class Client(object):
Send OPC-UA hello to server
"""
ack = await self.uaclient.send_hello(self.server_url.geturl(), self.max_messagesize, self.max_chunkcount)
# TODO: Handle ua.UaError
if isinstance(ack, ua.UaStatusCodeError):
raise ack
......@@ -338,36 +338,26 @@ class Client(object):
self._policy_ids = ep.UserIdentityTokens
# Actual maximum number of milliseconds that a Session shall remain open without activity
self.session_timeout = response.RevisedSessionTimeout
self._schedule_renew_session()
# ToDo: subscribe to ServerStatus
"""
The preferred mechanism for a Client to monitor the connection status is through the keep-alive of the
Subscription. A Client should subscribe for the State Variable in the ServerStatus to detect shutdown or other
failure states. If no Subscription is created or the Server does not support Subscriptions,
the connection can be monitored by periodically reading the State Variable
"""
self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
return response
def _schedule_renew_session(self, renew_session: bool=False):
# if the session was intentionally closed `session_timeout` will be None
if renew_session and self.session_timeout:
self.loop.create_task(self._renew_session())
self.loop.call_later(
# 0.7 is from spec
min(self.session_timeout, self.secure_channel_timeout) * 0.7,
self._schedule_renew_session, True
)
async def _renew_session(self):
async def _renew_channel_loop(self):
"""
Renew the SecureChannel before the SessionTimeout will happen.
ToDo: shouldn't this only be done if there was no session activity?
In theory we could do that only if no session activity
but it does not cost much..
"""
server_state = self.get_node(ua.FourByteNodeId(ua.ObjectIds.Server_ServerStatus_State))
self.logger.debug("renewing channel")
await self.open_secure_channel(renew=True)
val = await server_state.get_value()
self.logger.debug("server state is: %s ", val)
try:
duration = min(self.session_timeout, self.secure_channel_timeout) * 0.7
while True:
# 0.7 is from spec
await asyncio.sleep(duration)
self.logger.debug("renewing channel")
await self.open_secure_channel(renew=True)
val = await self.nodes.server_state.get_value()
self.logger.debug("server state is: %s ", val)
except asyncio.CancelledError:
pass
def server_policy_id(self, token_type, default):
"""
......@@ -458,12 +448,13 @@ class Client(object):
data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
return data, uri
def close_session(self) -> Coroutine:
async def close_session(self) -> Coroutine:
"""
Close session
"""
self.session_timeout = None
return self.uaclient.close_session(True)
self._renew_channel_task.cancel()
await self._renew_channel_task
return await self.uaclient.close_session(True)
def get_root_node(self):
return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
......
......@@ -137,7 +137,7 @@ class UASocketProtocol(asyncio.Protocol):
def _call_callback(self, request_id, body):
future = self._callbackmap.pop(request_id, None)
if future is None:
raise ua.UaError("No future object found for request: {0}, callbacks in list are {1}".format(
raise ua.UaError("No request found for requestid: {0}, callbacks in list are {1}".format(
request_id, self._callbackmap.keys()))
future.set_result(body)
......@@ -183,6 +183,7 @@ class UASocketProtocol(asyncio.Protocol):
self._connection.set_channel(response.Parameters)
return response.Parameters
async def close_secure_channel(self):
"""
Close secure channel.
......@@ -217,6 +218,8 @@ class UaClient:
self._timeout = timeout
self.security_policy = ua.SecurityPolicy()
self.protocol: UASocketProtocol = None
self._sub_cond = asyncio.Condition()
self._sub_data_queue = []
def set_security(self, policy: ua.SecurityPolicy):
self.security_policy = policy
......@@ -407,9 +410,10 @@ class UaClient:
self.logger.info("create_subscription")
request = ua.CreateSubscriptionRequest()
request.Parameters = params
data = await self.protocol.send_request(request)
response = struct_from_binary(
ua.CreateSubscriptionResponse,
await self.protocol.send_request(request)
data
)
self.logger.info("create subscription callback")
self.logger.debug(response)
......@@ -421,9 +425,10 @@ class UaClient:
self.logger.info("delete_subscription")
request = ua.DeleteSubscriptionsRequest()
request.Parameters.SubscriptionIds = subscription_ids
data = await self.protocol.send_request(request)
response = struct_from_binary(
ua.DeleteSubscriptionsResponse,
await self.protocol.send_request(request)
data
)
self.logger.info("delete subscriptions callback")
self.logger.debug(response)
......@@ -438,13 +443,36 @@ class UaClient:
acks = []
request = ua.PublishRequest()
request.Parameters.SubscriptionAcknowledgements = acks
data = await self.protocol.send_request(request, timeout=0)
# check if answer looks ok
await self.protocol.send_request(request, self._sub_data_received, timeout=0)
def _sub_data_received(self, future):
data = future.result()
self.loop.create_task(self._call_publish_callback(data))
"""
def _sub_data_received(self, future):
data = future.result()
self.loop.create_task(self._enqueue_sub_data(data))
async def _enqueue_sub_data(self, data):
self._sub_data_queue.append(data)
with self._sub_cond:
self._sub_cond.notify()
async def _subscribtion_loop(self):
while True:
async with self._sub_cond:
await self._sub_cond.wait()
data = self._sub_data_queue.pop(0)
await self._call_publish_callback(data)
"""
async def _call_publish_callback(self, data):
self.logger.info("call_publish_callback")
try:
self.protocol.check_answer(data, "while waiting for publish response")
except BadTimeout:
# Spec Part 4, 7.28
self.loop.create_task(self.publish())
await self.publish()
return
except BadNoSubscription: # Spec Part 5, 13.8.1
# BadNoSubscription is expected after deleting the last subscription.
......@@ -472,7 +500,7 @@ class UaClient:
# does so it stays in, doesn't seem to hurt.
self.logger.exception("Error parsing notification from server")
# send publish request ot server so he does stop sending notifications
self.loop.create_task(self.publish([]))
await self.publish([])
return
# look for callback
try:
......@@ -482,7 +510,7 @@ class UaClient:
return
# do callback
try:
callback(response.Parameters)
await callback(response.Parameters)
except Exception:
# we call client code, catch everything!
self.logger.exception("Exception while calling user callback: %s")
......
......@@ -654,33 +654,32 @@ class Node:
rule = ua.ObjectIds.ModellingRule_Mandatory if mandatory else ua.ObjectIds.ModellingRule_Optional
await self.add_reference(rule, ua.ObjectIds.HasModellingRule, True, False)
def add_folder(self, nodeid, bname):
return create_folder(self, nodeid, bname)
async def add_folder(self, nodeid, bname):
return await create_folder(self, nodeid, bname)
def add_object(self, nodeid, bname, objecttype=None):
return create_object(self, nodeid, bname, objecttype)
async def add_object(self, nodeid, bname, objecttype=None):
return await create_object(self, nodeid, bname, objecttype)
def add_variable(self, nodeid, bname, val, varianttype=None, datatype=None):
return create_variable(self, nodeid, bname, val, varianttype, datatype)
async def add_variable(self, nodeid, bname, val, varianttype=None, datatype=None):
return await create_variable(self, nodeid, bname, val, varianttype, datatype)
def add_object_type(self, nodeid, bname):
return create_object_type(self, nodeid, bname)
async def add_object_type(self, nodeid, bname):
return await create_object_type(self, nodeid, bname)
def add_variable_type(self, nodeid, bname, datatype):
return create_variable_type(self, nodeid, bname, datatype)
async def add_variable_type(self, nodeid, bname, datatype):
return await create_variable_type(self, nodeid, bname, datatype)
def add_data_type(self, nodeid, bname, description=None):
return create_data_type(self, nodeid, bname, description=None)
async def add_data_type(self, nodeid, bname, description=None):
return await create_data_type(self, nodeid, bname, description=None)
def add_property(self, nodeid, bname, val, varianttype=None, datatype=None):
return create_property(self, nodeid, bname, val, varianttype, datatype)
async def add_property(self, nodeid, bname, val, varianttype=None, datatype=None):
return await create_property(self, nodeid, bname, val, varianttype, datatype)
def add_method(self, *args):
return create_method(self, *args)
async def add_method(self, *args):
return await create_method(self, *args)
def add_reference_type(self, nodeid, bname, symmetric=True, inversename=None):
"""COROUTINE"""
return create_reference_type(self, nodeid, bname, symmetric, inversename)
async def add_reference_type(self, nodeid, bname, symmetric=True, inversename=None):
return await create_reference_type(self, nodeid, bname, symmetric, inversename)
def call_method(self, methodid, *args):
return call_method(self, methodid, *args)
async def call_method(self, methodid, *args):
return await call_method(self, methodid, *args)
......@@ -28,3 +28,4 @@ class Shortcuts(object):
self.namespace_array = Node(server, ObjectIds.Server_NamespaceArray)
self.opc_binary = Node(server, ObjectIds.OPCBinarySchema_TypeSystem)
self.base_structure_type = Node(server, ObjectIds.Structure)
self.server_state = Node(server, ObjectIds.Server_ServerStatus_State)
......@@ -4,7 +4,7 @@ high level interface to subscriptions
import asyncio
import logging
import collections
from typing import Union
import time
from opcua import ua
from .events import Event, get_filter_from_event_type
......@@ -95,7 +95,7 @@ class Subscription:
self.logger.info('Subscription created %s', self.subscription_id)
# Send a publish request so the server has one in its queue
# Servers should always be able to handle at least on extra publish request per subscriptions
self.loop.create_task(self.server.publish())
await self.server.publish()
async def delete(self):
"""
......@@ -104,10 +104,10 @@ class Subscription:
results = await self.server.delete_subscriptions([self.subscription_id])
results[0].check()
def publish_callback(self, publishresult: ua.PublishResult):
async def publish_callback(self, publishresult: ua.PublishResult):
self.logger.info("Publish callback called with result: %s", publishresult)
while self.subscription_id is None:
time.sleep(0.01)
await asyncio.sleep(0.01)
if publishresult.NotificationMessage.NotificationData is not None:
for notif in publishresult.NotificationMessage.NotificationData:
......@@ -125,7 +125,7 @@ class Subscription:
ack = ua.SubscriptionAcknowledgement()
ack.SubscriptionId = self.subscription_id
ack.SequenceNumber = publishresult.NotificationMessage.SequenceNumber
self.loop.create_task(self.server.publish([ack]))
await self.server.publish([ack])
def _call_datachange(self, datachange: ua.DataChangeNotification):
for item in datachange.MonitoredItems:
......
......@@ -220,8 +220,8 @@ class NodeManagementService(object):
return result
if item.ParentNodeId.is_null():
self.logger.info("add_node: while adding node %s, requested parent node is null %s %s",
item.RequestedNewNodeId, item.ParentNodeId, item.ParentNodeId.is_null())
#self.logger.info("add_node: while adding node %s, requested parent node is null %s %s",
#item.RequestedNewNodeId, item.ParentNodeId, item.ParentNodeId.is_null())
if check:
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadParentNodeIdInvalid)
return result
......
......@@ -23,11 +23,12 @@ class OPCUAProtocol(asyncio.Protocol):
self.peer_name = None
self.transport = None
self.processor = None
self.receive_buffer = b''
self._buffer = b''
self.iserver = iserver
self.policies = policies
self.clients = clients
self.messages = asyncio.Queue()
self._task = None
def __str__(self):
return 'OPCUAProtocol({}, {})'.format(self.peer_name, self.processor.session)
......@@ -42,7 +43,7 @@ class OPCUAProtocol(asyncio.Protocol):
self.processor.set_policies(self.policies)
self.iserver.asyncio_transports.append(transport)
self.clients.append(self)
self.loop.create_task(self._process_received_message())
self._task = self.loop.create_task(self._process_received_message_loop())
def connection_lost(self, ex):
logger.info('Lost connection from %s, %s', self.peer_name, ex)
......@@ -52,34 +53,30 @@ class OPCUAProtocol(asyncio.Protocol):
if self in self.clients:
self.clients.remove(self)
self.messages.put_nowait((None, None))
self._task.cancel()
def data_received(self, data):
if self.receive_buffer:
data = self.receive_buffer + data
self.receive_buffer = b''
self._process_received_data(data)
def _process_received_data(self, data: Union[bytes, Buffer]):
buf = Buffer(data) if type(data) is bytes else data
try:
# try to parse the incoming data
self._buffer += data
# try to parse the incoming data
while len(self._buffer) > 0:
try:
header = header_from_binary(buf)
except NotEnoughData:
logger.debug('Not enough data while parsing header from client, waiting for more')
self.receive_buffer = data + self.receive_buffer
buf = Buffer(self._buffer)
try:
header = header_from_binary(buf)
except NotEnoughData:
logger.debug('Not enough data while parsing header from client, waiting for more')
return
if len(buf) < header.body_size:
logger.debug('We did not receive enough data from client. Need %s got %s', header.body_size, len(buf))
return
# we have a complete message
self.messages.put_nowait((header, buf))
self._buffer = self._buffer[(header.header_size + header.body_size):]
except Exception:
logger.exception('Exception raised while parsing message from client')
return
if len(buf) < header.body_size:
logger.debug('We did not receive enough data from client. Need %s got %s', header.body_size, len(buf))
self.receive_buffer = data + self.receive_buffer
return
# a message has been received
self.messages.put_nowait((header, buf))
except Exception:
logger.exception('Exception raised while parsing message from client')
return
async def _process_received_message(self):
async def _process_received_message_loop(self):
"""
Take message from the queue and try to process it.
"""
......@@ -88,15 +85,18 @@ class OPCUAProtocol(asyncio.Protocol):
if header is None and buf is None:
# Connection was closed, end task
break
logger.debug('_process_received_message %s %s', header.body_size, len(buf))
ret = await self.processor.process(header, buf)
if not ret:
logger.info('processor returned False, we close connection from %s', self.peer_name)
self.transport.close()
return
if len(buf) != 0:
# There is data left in the buffer - process it
self._process_received_data(buf)
try:
await self._process_one_msg(header, buf)
except:
logger.exception()
async def _process_one_msg(self, header, buf):
logger.debug('_process_received_message %s %s', header.body_size, len(buf))
ret = await self.processor.process(header, buf)
if not ret:
logger.info('processor returned False, we close connection from %s', self.peer_name)
self.transport.close()
return
class BinaryServer:
......
......@@ -5,12 +5,11 @@ Can be used on server side or to implement binary/https opc-ua servers
from datetime import datetime, timedelta
from copy import copy
from struct import unpack_from, unpack
from struct import unpack_from
import os
import asyncio
import logging
from enum import Enum
from copy import copy, deepcopy
from urllib.parse import urlparse
from typing import Coroutine
......@@ -318,7 +317,6 @@ class InternalSession:
InternalSession._counter += 1
self.authentication_token = ua.NodeId(self._auth_counter)
InternalSession._auth_counter += 1
self.subscriptions = []
self.logger.info('Created internal session %s', self.name)
def __str__(self):
......@@ -343,9 +341,9 @@ class InternalSession:
return result
async def close_session(self, delete_subs=True):
self.logger.info('close session %s with subscriptions %s', self, self.subscriptions)
self.logger.info('close session %s')
self.state = SessionState.Closed
await self.delete_subscriptions(self.subscriptions[:])
await self.delete_subscriptions(list(self.subscription_service.subscriptions.keys()))
def activate_session(self, params):
self.logger.info('activate session')
......@@ -400,13 +398,12 @@ class InternalSession:
return self.iserver.method_service.call(params)
async def create_subscription(self, params, callback):
result = self.subscription_service.create_subscription(params, callback)
self.subscriptions.append(result.SubscriptionId)
result = await self.subscription_service.create_subscription(params, callback)
return result
async def create_monitored_items(self, params):
"""Returns Future"""
subscription_result = self.subscription_service.create_monitored_items(params)
subscription_result = await self.subscription_service.create_monitored_items(params)
self.iserver.server_callback_dispatcher.dispatch(
CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, subscription_result))
return subscription_result
......@@ -421,18 +418,18 @@ class InternalSession:
return self.subscription_service.republish(params)
async def delete_subscriptions(self, ids):
for i in ids:
if i in self.subscriptions:
self.subscriptions.remove(i)
return self.subscription_service.delete_subscriptions(ids)
# This is an async method, dues to symetry with client code
return await self.subscription_service.delete_subscriptions(ids)
async def delete_monitored_items(self, params):
# This is an async method, dues to symetry with client code
subscription_result = self.subscription_service.delete_monitored_items(params)
self.iserver.server_callback_dispatcher.dispatch(
CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, subscription_result))
return subscription_result
async def publish(self, acks=None):
# This is an async method, dues to symetry with client code
if acks is None:
acks = []
return self.subscription_service.publish(acks)
......
......@@ -3,6 +3,7 @@ server side implementation of a subscription object
"""
import logging
import asyncio
from opcua import ua
......@@ -56,7 +57,7 @@ class MonitoredItemService:
def delete_all_monitored_items(self):
self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
def create_monitored_items(self, params):
async def create_monitored_items(self, params):
results = []
for item in params.ItemsToCreate:
# with self._lock:
......@@ -259,34 +260,33 @@ class InternalSubscription:
self._startup = True
self._keep_alive_count = 0
self._publish_cycles_count = 0
self._stopev = False
self._task = None
def __str__(self):
return "Subscription(id:{0})".format(self.data.SubscriptionId)
def start(self):
async def start(self):
self.logger.debug("starting subscription %s", self.data.SubscriptionId)
if self.data.RevisedPublishingInterval > 0.0:
self._subscription_loop()
self._task = self.subservice.loop.create_task(self._subscription_loop())
def stop(self):
async def stop(self):
self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
self._stopev = True
self._task.cancel()
await self._task
self.monitored_item_srv.delete_all_monitored_items()
def _trigger_publish(self):
if not self._stopev and self.data.RevisedPublishingInterval <= 0.0:
self.subservice.loop.call_soon(self.publish_results)
if self._task and self.data.RevisedPublishingInterval <= 0.0:
self.publish_results()
def _subscription_loop(self):
if not self._stopev:
self.subservice.loop.call_later(self.data.RevisedPublishingInterval / 1000.0, self._sub_loop)
def _sub_loop(self):
if self._stopev:
return
self.publish_results()
self._subscription_loop()
async def _subscription_loop(self):
try:
while True:
await asyncio.sleep(self.data.RevisedPublishingInterval / 1000.0)
self.publish_results()
except asyncio.CancelledError:
pass
def has_published_results(self):
if self._startup or self._triggered_datachanges or self._triggered_events:
......@@ -304,14 +304,14 @@ class InternalSubscription:
self, self._publish_cycles_count, self.data.RevisedLifetimeCount)
# FIXME this will never be send since we do not have publish request anyway
self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
self._stopev = True
result = None
if self.has_published_results():
# FIXME: should we pop a publish request here? or we do not care?
self._publish_cycles_count += 1
result = self._pop_publish_result()
if result is not None:
self.callback(result)
self.subservice.loop.create_task(self.callback(result))
#await self.callback(result)
def _pop_publish_result(self):
result = ua.PublishResult()
......@@ -354,7 +354,6 @@ class InternalSubscription:
def publish(self, acks):
self.logger.info("publish request with acks %s", acks)
self._publish_cycles_count = 0
for nb in acks:
self._not_acknowledged_results.pop(nb, None)
......
......@@ -2,6 +2,7 @@
server side implementation of subscription service
"""
import asyncio
import logging
from opcua import ua
......@@ -22,7 +23,7 @@ class SubscriptionService:
self.subscriptions = {}
self._sub_id_counter = 77
def create_subscription(self, params, callback):
async def create_subscription(self, params, callback):
self.logger.info("create subscription with callback: %s", callback)
result = ua.CreateSubscriptionResult()
result.RevisedPublishingInterval = params.RequestedPublishingInterval
......@@ -32,33 +33,32 @@ class SubscriptionService:
result.SubscriptionId = self._sub_id_counter
sub = InternalSubscription(self, result, self.aspace, callback)
sub.start()
await sub.start()
self.subscriptions[result.SubscriptionId] = sub
return result
def delete_subscriptions(self, ids):
async def delete_subscriptions(self, ids):
self.logger.info("delete subscriptions: %s", ids)
res = []
existing_subs = []
for i in ids:
#with self._lock:
if i not in self.subscriptions:
sub = self.subscriptions.pop(i, None)
if sub is None:
res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
else:
sub = self.subscriptions.pop(i)
sub.stop()
#await sub.stop()
existing_subs.append(sub)
res.append(ua.StatusCode())
await asyncio.gather(*[sub.stop() for sub in existing_subs])
return res
def publish(self, acks):
self.logger.info("publish request with acks %s", acks)
#with self._lock:
for subid, sub in self.subscriptions.items():
sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
def create_monitored_items(self, params):
async def create_monitored_items(self, params):
self.logger.info("create monitored items")
#with self._lock:
if params.SubscriptionId not in self.subscriptions:
res = []
for _ in params.ItemsToCreate:
......@@ -66,11 +66,10 @@ class SubscriptionService:
response.StatusCode = ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
res.append(response)
return res
return self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
return await self.subscriptions[params.SubscriptionId].monitored_item_srv.create_monitored_items(params)
def modify_monitored_items(self, params):
self.logger.info("modify monitored items")
#with self._lock:
if params.SubscriptionId not in self.subscriptions:
res = []
for _ in params.ItemsToModify:
......@@ -82,7 +81,6 @@ class SubscriptionService:
def delete_monitored_items(self, params):
self.logger.info("delete monitored items")
#with self._lock:
if params.SubscriptionId not in self.subscriptions:
res = []
for _ in params.MonitoredItemIds:
......@@ -99,6 +97,5 @@ class SubscriptionService:
return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
def trigger_event(self, event):
#with self._lock:
for sub in self.subscriptions.values():
sub.monitored_item_srv.trigger_event(event)
......@@ -510,6 +510,7 @@ def header_from_binary(data):
if hdr.MessageType in (ua.MessageType.SecureOpen, ua.MessageType.SecureClose, ua.MessageType.SecureMessage):
hdr.body_size -= 4
hdr.ChannelId = Primitives.UInt32.unpack(data)
hdr.header_size = 12
return hdr
......
......@@ -48,6 +48,7 @@ class Header(uatypes.FrozenClass):
self.ChannelId = channelid
self.body_size = 0
self.packet_size = 0
self.header_size = 8
self._freeze = True
def add_size(self, size):
......
......@@ -26,6 +26,7 @@ def pytest_generate_tests(metafunc):
def event_loop(request):
"""Create an instance of the default event loop for each test case."""
loop = asyncio.get_event_loop_policy().new_event_loop()
loop.set_debug(True)
yield loop
loop.close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment