Commit 057f7ffd authored by Christian Bergmiller's avatar Christian Bergmiller

typings, docs, added tests

parent 027d7ce7
This diff is collapsed.
......@@ -24,7 +24,7 @@ class SubHandler:
class MySubHandler:
More advanced subscription client using Future, so we can wait for events in tests
More advanced subscription client using Future, so we can await events in tests.
def __init__(self):
......@@ -46,14 +46,22 @@ class MySubHandler:
class MySubHandler2:
def __init__(self):
def __init__(self, limit=None):
self.results = []
self.limit = limit
self.done = asyncio.Event()
def check_done(self):
if self.limit and len(self.results) == self.limit and not self.done.is_set():
def datachange_notification(self, node, val, data):
self.results.append((node, val))
def event_notification(self, event):
class MySubHandlerCounter:
......@@ -282,10 +290,14 @@ async def test_subscription_data_change_many(opc):
async def test_subscribe_server_time(opc):
Test the subscription of the `Server_ServerStatus_CurrentTime` objects `Value` attribute.
myhandler = MySubHandler()
server_time_node = opc.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
sub = await opc.opc.create_subscription(200, myhandler)
handle = await sub.subscribe_data_change(server_time_node)
assert type(handle) is int
node, val, data = await myhandler.result()
assert server_time_node == node
delta = datetime.utcnow() - val
......@@ -295,6 +307,9 @@ async def test_subscribe_server_time(opc):
async def test_modify_monitored_item(opc):
Test that the subscription of the `Server_ServerStatus_CurrentTime` object can be modified.
myhandler = MySubHandler()
server_time_node = opc.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
sub = await opc.opc.create_subscription(1000, myhandler)
......@@ -320,6 +335,46 @@ async def test_create_delete_subscription(opc):
await sub.delete()
async def test_unsubscribe_two_objects_simultaneously(opc):
Test the subscription/unsub. of the `CurrentTime` and `Server_ServerStatus_State` objects.
Unsubscribe from both Nodes simultaneously.
handler = MySubHandler2(limit=1)
nodes = [
sub = await opc.opc.create_subscription(200, handler)
handles = await sub.subscribe_data_change(nodes)
await handler.done.wait()
assert handler.results[0][0] == nodes[0]
assert (datetime.utcnow() - handler.results[0][1]) < timedelta(seconds=2)
assert handler.results[1][0] == nodes[1]
assert handler.results[1][1] == 0
await sub.unsubscribe(handles)
await sub.delete()
async def test_unsubscribe_two_objects_consecutively(opc):
Test the subscription/unsub. of the `CurrentTime` and `Server_ServerStatus_State` objects.
Unsubscribe from both Nodes consecutively.
handler = MySubHandler2(limit=2)
nodes = [
sub = await opc.opc.create_subscription(200, handler)
handles = await sub.subscribe_data_change(nodes)
assert type(handles) is list
await handler.done.wait()
for handle in handles:
await sub.unsubscribe(handle)
await sub.delete()
async def test_subscribe_events(opc):
sub = await opc.opc.create_subscription(100, MySubHandler())
handle = await sub.subscribe_events()
......@@ -422,8 +477,8 @@ async def test_events_wrong_source(opc):
async def test_events_CustomEvent(opc):
etype = await opc.server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType,
[('PropertyNum', ua.VariantType.Float),
('PropertyString', ua.VariantType.String)])
[('PropertyNum', ua.VariantType.Float),
('PropertyString', ua.VariantType.String)])
evgen = await opc.server.get_event_generator(etype)
myhandler = MySubHandler()
sub = await opc.opc.create_subscription(100, myhandler)
......@@ -455,8 +510,8 @@ async def test_events_CustomEvent_MyObject(opc):
objects = opc.server.get_objects_node()
o = await objects.add_object(3, 'MyObject')
etype = await opc.server.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType,
[('PropertyNum', ua.VariantType.Float),
('PropertyString', ua.VariantType.String)])
[('PropertyNum', ua.VariantType.Float),
('PropertyString', ua.VariantType.String)])
evgen = await opc.server.get_event_generator(etype, emitting_node=o)
myhandler = MySubHandler()
sub = await opc.opc.create_subscription(100, myhandler)
......@@ -486,12 +541,12 @@ async def test_several_different_events(opc):
objects = opc.server.get_objects_node()
o = await objects.add_object(3, 'MyObject')
etype1 = await opc.server.create_custom_event_type(2, 'MyEvent1', ua.ObjectIds.BaseEventType,
[('PropertyNum', ua.VariantType.Float),
('PropertyString', ua.VariantType.String)])
[('PropertyNum', ua.VariantType.Float),
('PropertyString', ua.VariantType.String)])
evgen1 = await opc.server.get_event_generator(etype1, o)
etype2 = await opc.server.create_custom_event_type(2, 'MyEvent2', ua.ObjectIds.BaseEventType,
[('PropertyNum', ua.VariantType.Float),
('PropertyString', ua.VariantType.String)])
[('PropertyNum', ua.VariantType.Float),
('PropertyString', ua.VariantType.String)])
evgen2 = await opc.server.get_event_generator(etype2, o)
myhandler = MySubHandler2()
sub = await opc.opc.create_subscription(100, myhandler)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment