Commit 263d6f73 authored by oroulet's avatar oroulet

fix sync subscriptions

parent 1edaf6fc
......@@ -71,6 +71,10 @@ class Client:
async def __aexit__(self, exc_type, exc_value, traceback):
await self.disconnect()
def __str__(self):
return f"Client({self.server_url.geturl()})"
__repr__ = __str__
@staticmethod
def find_endpoint(endpoints, security_mode, policy_uri):
"""
......
......@@ -121,6 +121,10 @@ class Server:
async def __aexit__(self, exc_type, exc_value, traceback):
await self.stop()
def __str__(self):
return f"OPC UA Server()"
__repr__ = __str__
async def load_certificate(self, path: str):
"""
load server certificate from file, either pem or der
......
......@@ -92,6 +92,9 @@ def syncmethod(func):
for idx, arg in enumerate(args):
if isinstance(arg, Node):
args[idx] = arg.aio_obj
for k, v in kwargs.items():
if isinstance(v, Node):
kwargs[k] = v.aio_obj
aio_func = getattr(self.aio_obj, func.__name__)
global _tloop
result = _tloop.post(aio_func(*args, **kwargs))
......@@ -108,12 +111,27 @@ def syncmethod(func):
return wrapper
class _SubHandler:
def __init__(self, sync_handler):
self.sync_handler = sync_handler
def datachange_notification(self, node, val, data):
self.sync_handler.datachange_notification(Node(node), val, data)
def event_notification(self, event):
self.sync_handler.event_notification(event)
class Client:
def __init__(self, url: str, timeout: int = 4):
global _tloop
self.aio_obj = client.Client(url, timeout, loop=_tloop.loop)
self.nodes = Shortcuts(self.aio_obj.uaclient)
def __str__(self):
return "Sync" + self.aio_obj.__str__()
__repr__ = __str__
@syncmethod
def connect(self):
pass
......@@ -126,9 +144,10 @@ class Client:
def load_type_definitions(self, nodes=None):
pass
@syncmethod
async def create_subscription(self, period, handler):
pass
def create_subscription(self, period, handler):
coro = self.aio_obj.create_subscription(period, _SubHandler(handler))
aio_sub = _tloop.post(coro)
return Subscription(aio_sub)
@syncmethod
def get_namespace_index(self, url):
......@@ -159,6 +178,10 @@ class Server:
_tloop.post(self.aio_obj.init(shelf_file))
self.nodes = Shortcuts(self.aio_obj.iserver.isession)
def __str__(self):
return "Sync" + self.aio_obj.__str__()
__repr__ = __str__
def __enter__(self):
self.start()
return self
......@@ -190,8 +213,11 @@ class Server:
def stop(self):
pass
def link_method(self, node, callback):
return self.aio_obj.link_method(node, callback)
@syncmethod
async def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
def get_event_generator(self, etype=None, emitting_node=ua.ObjectIds.Server):
pass
def get_node(self, nodeid):
......@@ -201,6 +227,18 @@ class Server:
def import_xml(self, path=None, xmlstring=None):
pass
@syncmethod
def get_namespace_index(self, url):
pass
@syncmethod
def load_enums(self):
pass
@syncmethod
def load_type_definitions(self):
pass
def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
......@@ -222,6 +260,13 @@ class Node:
self.aio_obj = aio_node
global _tloop
def __hash__(self):
return self.aio_obj.__hash__()
def __str__(self):
return "Sync" + self.aio_obj.__str__()
__repr__ = __str__
@property
def nodeid(self):
return self.aio_obj.nodeid
......
import time
from concurrent.futures import Future
import pytest
from asyncua.sync import Client, start_thread_loop, stop_thread_loop, Server, ThreadLoop
from asyncua.sync import Client, Server, ThreadLoop, Node
from asyncua import ua
......@@ -27,7 +28,7 @@ def tloop():
@pytest.fixture
def client(tloop, server):
c = Client("opc.tcp://localhost:8840/freeopcua/server")
c = Client("opc.tcp://admin@localhost:8840/freeopcua/server")
with c:
yield c
......@@ -46,4 +47,35 @@ def test_sync_get_node(client):
nodes = node.get_children()
assert len(nodes) == 2
assert nodes[0] == client.nodes.server
assert isinstance(nodes[0], Node)
class MySubHandler():
def __init__(self):
self.future = Future()
def reset(self):
self.future = Future()
def datachange_notification(self, node, val, data):
self.future.set_result((node, val))
def event_notification(self, event):
self.future.set_result(event)
def test_sync_sub(client):
myhandler = MySubHandler()
sub = client.create_subscription(1, myhandler)
var = client.nodes.objects.add_variable(3, 'SubVar', 0.1)
sub.subscribe_data_change(var)
n, v = myhandler.future.result()
assert v == 0.1
assert n == var
myhandler.reset()
var.set_value(0.123)
n, v = myhandler.future.result()
assert v == 0.123
sub.delete()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment