Commit 88bbc425 authored by oroulet's avatar oroulet

Allow for asynchronous subscription callbacks

parent c4429f22
......@@ -534,7 +534,10 @@ class UaClient:
)
else:
try:
callback(response.Parameters)
if asyncio.iscoroutinefunction(callback):
await callback(response.Parameters)
else:
callback(response.Parameters)
except Exception: # we call user code, catch everything!
self.logger.exception("Exception while calling user callback: %s")
# Repeat with acknowledgement
......
......@@ -88,7 +88,7 @@ class Subscription:
self.subscription_id = response.SubscriptionId # move to data class
self.logger.info('Subscription created %s', self.subscription_id)
def publish_callback(self, publish_result: ua.PublishResult):
async def publish_callback(self, publish_result: ua.PublishResult):
"""
Handle `PublishResult` callback.
"""
......@@ -96,11 +96,11 @@ class Subscription:
if publish_result.NotificationMessage.NotificationData is not None:
for notif in publish_result.NotificationMessage.NotificationData:
if isinstance(notif, ua.DataChangeNotification):
self._call_datachange(notif)
await self._call_datachange(notif)
elif isinstance(notif, ua.EventNotificationList):
self._call_event(notif)
await self._call_event(notif)
elif isinstance(notif, ua.StatusChangeNotification):
self._call_status(notif)
await self._call_status(notif)
else:
self.logger.warning("Notification type not supported yet for notification %s", notif)
else:
......@@ -113,7 +113,7 @@ class Subscription:
results = await self.server.delete_subscriptions([self.subscription_id])
results[0].check()
def _call_datachange(self, datachange: ua.DataChangeNotification):
async def _call_datachange(self, datachange: ua.DataChangeNotification):
for item in datachange.MonitoredItems:
if item.ClientHandle not in self._monitored_items:
self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
......@@ -122,28 +122,37 @@ class Subscription:
if hasattr(self._handler, "datachange_notification"):
event_data = DataChangeNotif(data, item)
try:
self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
if asyncio.iscoroutinefunction(self._handler.datachange_notification):
await self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
else:
self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
except Exception:
self.logger.exception("Exception calling data change handler")
else:
self.logger.error("DataChange subscription created but handler has no datachange_notification method")
def _call_event(self, eventlist: ua.EventNotificationList):
async def _call_event(self, eventlist: ua.EventNotificationList):
for event in eventlist.Events:
data = self._monitored_items[event.ClientHandle]
result = Event.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
result.server_handle = data.server_handle
if hasattr(self._handler, "event_notification"):
try:
self._handler.event_notification(result)
if asyncio.iscoroutinefunction(self._handler.event_notification):
await self._handler.event_notification(result)
else:
self._handler.event_notification(result)
except Exception:
self.logger.exception("Exception calling event handler")
else:
self.logger.error("Event subscription created but handler has no event_notification method")
def _call_status(self, status: ua.StatusChangeNotification):
async def _call_status(self, status: ua.StatusChangeNotification):
try:
self._handler.status_change_notification(status.Status)
if asyncio.iscoroutinefunction(self._handler.status_change_notification):
await self._handler.status_change_notification(status.Status)
else:
self._handler.status_change_notification(status.Status)
except Exception:
self.logger.exception("Exception calling status change handler")
......
......@@ -53,7 +53,7 @@ class AttributeService:
res.append(self._aspace.read_attribute_value(readvalue.NodeId, readvalue.AttributeId))
return res
def write(self, params, user=User.Admin):
async def write(self, params, user=User.Admin):
#self.logger.debug("write %s as user %s", params, user)
res = []
for writevalue in params.NodesToWrite:
......@@ -68,7 +68,7 @@ class AttributeService:
ual.Value.Value, ua.AccessLevel.CurrentWrite):
res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
continue
res.append(self._aspace.write_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
res.append(await self._aspace.write_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
return res
......@@ -671,7 +671,7 @@ class AddressSpace:
return attval.value_callback()
return attval.value
def write_attribute_value(self, nodeid, attr, value):
async def write_attribute_value(self, nodeid, attr, value):
# self.logger.debug("set attr val: %s %s %s", nodeid, attr, value)
node = self._nodes.get(nodeid, None)
if node is None:
......@@ -688,7 +688,7 @@ class AddressSpace:
for k, v in cbs:
try:
v(k, value)
await v(k, value)
except Exception as ex:
self.logger.exception("Error calling datachange callback %s, %s, %s", k, v, ex)
......
......@@ -75,7 +75,7 @@ class EventGenerator:
__repr__ = __str__
def trigger(self, time_attr=None, message=None):
async def trigger(self, time_attr=None, message=None):
"""
Trigger the event. This will send a notification to all subscribed clients
"""
......@@ -100,4 +100,4 @@ class EventGenerator:
elif not self.event.Message:
self.event.Message = ua.LocalizedText(Node(self.isession, self.event.SourceNode).read_browse_name().Text)
self.isession.subscription_service.trigger_event(self.event)
await self.isession.subscription_service.trigger_event(self.event)
......@@ -280,12 +280,12 @@ class InternalServer:
"""
self.server_callback_dispatcher.removeListener(event, handle)
def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
async def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
"""
directly write datavalue to the Attribute, bypassing some checks and structure creation
so it is a little faster
"""
self.aspace.write_attribute_value(nodeid, attr, datavalue)
await self.aspace.write_attribute_value(nodeid, attr, datavalue)
def set_user_manager(self, user_manager):
"""
......
......@@ -100,7 +100,7 @@ class InternalSession:
return self.iserver.history_manager.read_history(params)
async def write(self, params):
return self.iserver.attribute_service.write(params, self.user)
return await self.iserver.attribute_service.write(params, self.user)
async def browse(self, params):
return self.iserver.view_service.browse(params)
......
......@@ -58,13 +58,13 @@ class InternalSubscription:
self._task = None
self.monitored_item_srv.delete_all_monitored_items()
def _trigger_publish(self):
async def _trigger_publish(self):
"""
Trigger immediate publication (if requested by the PublishingInterval).
"""
if not self._task and self.data.RevisedPublishingInterval <= 0.0:
# Publish immediately (as fast as possible)
self.publish_results()
await self.publish_results()
async def _subscription_loop(self):
"""
......@@ -73,7 +73,7 @@ class InternalSubscription:
try:
while True:
await asyncio.sleep(self.data.RevisedPublishingInterval / 1000.0)
self.publish_results()
await self.publish_results()
except asyncio.CancelledError:
self.logger.info('exiting _subscription_loop for %s', self.data.SubscriptionId)
pass
......@@ -91,7 +91,7 @@ class InternalSubscription:
self._keep_alive_count += 1
return False
def publish_results(self):
async def publish_results(self):
"""
Publish all enqueued data changes, events and status changes though the callback.
"""
......@@ -99,7 +99,7 @@ class InternalSubscription:
self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self,
self._publish_cycles_count, self.data.RevisedLifetimeCount)
# FIXME this will never be send since we do not have publish request anyway
self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
await self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
result = None
if self.has_published_results():
if not self.no_acks:
......@@ -110,7 +110,7 @@ class InternalSubscription:
# The callback can be:
# Subscription.publish_callback -> server internal subscription
# UaProcessor.forward_publish_response -> client subscription
self.pub_result_callback(result)
await self.pub_result_callback(result)
def _pop_publish_result(self) -> ua.PublishResult:
"""
......@@ -178,38 +178,38 @@ class InternalSubscription:
self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
return ua.NotificationMessage()
def enqueue_datachange_event(self, mid: int, eventdata: ua.MonitoredItemNotification, maxsize: int):
async def enqueue_datachange_event(self, mid: int, eventdata: ua.MonitoredItemNotification, maxsize: int):
"""
Enqueue a monitored item data change.
:param mid: Monitored Item Id
:param eventdata: Monitored Item Notification
:param maxsize: Max queue size (0: No limit)
"""
self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)
await self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)
def enqueue_event(self, mid: int, eventdata: ua.EventFieldList, maxsize: int):
async def enqueue_event(self, mid: int, eventdata: ua.EventFieldList, maxsize: int):
"""
Enqueue a event.
:param mid: Monitored Item Id
:param eventdata: Event Field List
:param maxsize: Max queue size (0: No limit)
"""
self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)
await self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)
def enqueue_statuschange(self, code):
async def enqueue_statuschange(self, code):
"""
Enqueue a status change.
:param code:
"""
self._triggered_statuschanges.append(code)
self._trigger_publish()
await self._trigger_publish()
def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size: int,
async def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size: int,
queue: dict):
if mid not in queue:
# New Monitored Item Id
queue[mid] = [eventdata]
self._trigger_publish()
await self._trigger_publish()
return
if size != 0:
# Limit queue size
......
......@@ -62,7 +62,7 @@ class MonitoredItemService:
if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
result = self._create_events_monitored_item(item)
else:
result = self._create_data_change_monitored_item(item)
result = await self._create_data_change_monitored_item(item)
results.append(result)
return results
......@@ -72,10 +72,10 @@ class MonitoredItemService:
results.append(self._modify_monitored_item(item))
return results
def trigger_datachange(self, handle, nodeid, attr):
async def trigger_datachange(self, handle, nodeid, attr):
self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
dv = self.aspace.read_attribute_value(nodeid, attr)
self.datachange_callback(handle, dv)
await self.datachange_callback(handle, dv)
def _modify_monitored_item(self, params: ua.MonitoredItemModifyRequest):
for mdata in self._monitored_items.values():
......@@ -129,7 +129,7 @@ class MonitoredItemService:
self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
return result
def _create_data_change_monitored_item(self, params: ua.MonitoredItemCreateRequest):
async def _create_data_change_monitored_item(self, params: ua.MonitoredItemCreateRequest):
self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId,
params.ItemToMonitor.AttributeId)
......@@ -144,7 +144,7 @@ class MonitoredItemService:
if result.StatusCode.is_good():
self._monitored_datachange[handle] = result.MonitoredItemId
# force data change event generation
self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
await self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
return result
def delete_monitored_items(self, ids):
......@@ -172,11 +172,11 @@ class MonitoredItemService:
self._monitored_items.pop(mid)
return ua.StatusCode()
def datachange_callback(self, handle: int, value, error=None):
async def datachange_callback(self, handle: int, value, error=None):
if error:
self.logger.info("subscription %s: datachange callback called with handle '%s' and error '%s'", self,
handle, error)
self.trigger_statuschange(error)
await self.trigger_statuschange(error)
else:
#self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self, handle, value.Value)
event = ua.MonitoredItemNotification()
......@@ -190,7 +190,7 @@ class MonitoredItemService:
if deadband_flag_pass:
event.ClientHandle = mdata.client_handle
event.Value = value
self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
await self.isub.enqueue_datachange_event(mid, event, mdata.queue_size)
def deadband_callback(self, values, flt):
if flt.DeadbandType == ua.DeadbandType.None_ or values.get_old_value() is None:
......@@ -203,17 +203,17 @@ class MonitoredItemService:
return True
return False
def trigger_event(self, event):
async def trigger_event(self, event):
if event.emitting_node not in self._monitored_events:
self.logger.debug("%s has NO subscription for events %s from node: %s", self, event, event.emitting_node)
return False
self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.emitting_node)
mids = self._monitored_events[event.emitting_node]
for mid in mids:
self._trigger_event(event, mid)
await self._trigger_event(event, mid)
return True
def _trigger_event(self, event, mid: int):
async def _trigger_event(self, event, mid: int):
if mid not in self._monitored_items:
self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event,
self)
......@@ -225,10 +225,10 @@ class MonitoredItemService:
fieldlist = ua.EventFieldList()
fieldlist.ClientHandle = mdata.client_handle
fieldlist.EventFields = event.to_event_fields(mdata.filter.SelectClauses)
self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
await self.isub.enqueue_event(mid, fieldlist, mdata.queue_size)
def trigger_statuschange(self, code):
self.isub.enqueue_statuschange(code)
async def trigger_statuschange(self, code):
await self.isub.enqueue_statuschange(code)
class WhereClauseEvaluator:
......
......@@ -614,9 +614,9 @@ class Server:
"""
return load_enums(self)
def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
async def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
"""
directly write datavalue to the Attribute, bypasing some checks and structure creation
so it is a little faster
"""
return self.iserver.write_attribute_value(nodeid, datavalue, attr)
return await self.iserver.write_attribute_value(nodeid, datavalue, attr)
......@@ -98,6 +98,6 @@ class SubscriptionService:
return ua.NotificationMessage()
return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
def trigger_event(self, event):
async def trigger_event(self, event):
for sub in self.subscriptions.values():
sub.monitored_item_srv.trigger_event(event)
await sub.monitored_item_srv.trigger_event(event)
......@@ -58,7 +58,7 @@ class UaProcessor:
response.Parameters = channel
self.send_response(request.RequestHeader.RequestHandle, seqhdr, response, ua.MessageType.SecureOpen)
def forward_publish_response(self, result: ua.PublishResult):
async def forward_publish_response(self, result: ua.PublishResult):
"""
Try to send a `PublishResponse` with the given `PublishResult`.
"""
......@@ -372,7 +372,7 @@ class UaProcessor:
if result.SubscriptionId not in self.session.subscription_service.active_subscription_ids:
# Discard the result if the subscription is no longer active
continue
self.forward_publish_response(result)
await self.forward_publish_response(result)
break
self.session.publish(params.SubscriptionAcknowledgements)
#_logger.debug("publish forward to server")
......
......@@ -74,7 +74,7 @@ def syncmethod(func):
if isinstance(result, list) and len(result) > 0 and isinstance(result[0], node.Node):
return [Node(self.tloop, i) for i in result]
if isinstance(result, server.event_generator.EventGenerator):
return EventGenerator(result)
return EventGenerator(self.tloop, result)
if isinstance(result, subscription.Subscription):
return Subscription(self.tloop, result)
return result
......@@ -237,19 +237,20 @@ class Server:
pass
def write_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
return self.aio_obj.write_attribute_value(nodeid, datavalue, attr)
return self.tloop.post(self.aio_obj.write_attribute_value(nodeid, datavalue, attr))
class EventGenerator:
def __init__(self, aio_evgen):
def __init__(self, tloop, aio_evgen):
self.aio_obj = aio_evgen
self.tloop = tloop
@property
def event(self):
return self.aio_obj.event
def trigger(self, time=None, message=None):
return self.aio_obj.trigger(time, message)
return self.tloop.post(self.aio_obj.trigger(time, message))
class Node:
......
......@@ -32,7 +32,6 @@ async def main():
await custom_etype.add_property(2, 'MyBoolProperty', ua.Variant(True, ua.VariantType.Boolean))
mysecondevgen = await server.get_event_generator(custom_etype, myobj)
async with server:
count = 0
while True:
......@@ -41,9 +40,8 @@ async def main():
myevgen.event.Severity = count
myevgen.event.MyNumericProperty = count
myevgen.event.MyStringProperty = "Property %d" % count
myevgen.trigger()
mysecondevgen.trigger(message="MySecondEvent %d" % count)
await myevgen.trigger()
await mysecondevgen.trigger(message="MySecondEvent %d" % count)
count += 1
......
......@@ -122,11 +122,11 @@ async def main():
var.append(9.3)
await myarrayvar.write_value(var)
await mydevice_var.write_value("Running")
myevgen.trigger(message="This is BaseEvent")
server.write_attribute_value(myvar.nodeid, ua.DataValue(0.9)) # Server side write method which is a bit faster than using write_value
await myevgen.trigger(message="This is BaseEvent")
await server.write_attribute_value(myvar.nodeid, ua.DataValue(0.9)) # Server side write method which is a bit faster than using write_value
while True:
await asyncio.sleep(0.1)
server.write_attribute_value(myvar.nodeid, ua.DataValue(sin(time.time())))
await server.write_attribute_value(myvar.nodeid, ua.DataValue(sin(time.time())))
if __name__ == "__main__":
......
......@@ -33,7 +33,7 @@ async def mymain():
#nb = 100000
#start = time.time()
#for i in range(nb):
#server.write_attribute_value(myvar.nodeid, ua.DataValue(i))
#await server.write_attribute_value(myvar.nodeid, ua.DataValue(i))
#await myvar.write_value(i)
print("\n Write frequency: \n", nb / (time.time() - start))
......
......@@ -143,7 +143,7 @@ async def create_srv_events(history_server: HistoryServer):
await history_server.srv.historize_node_event(history_server.srv_node, period=None)
for i in history_server.ev_values:
srv_evgen.event.Severity = history_server.ev_values[i]
srv_evgen.trigger(message="test message")
await srv_evgen.trigger(message="test message")
await asyncio.sleep(.1)
await asyncio.sleep(2)
......
......@@ -140,7 +140,7 @@ async def test_historize_events(server):
srvevgen = await server.get_event_generator()
await server.iserver.enable_history_event(srv_node, period=None)
assert await srv_node.read_event_notifier() == {ua.EventNotifier.SubscribeToEvents, ua.EventNotifier.HistoryRead}
srvevgen.trigger(message='Message')
await srvevgen.trigger(message='Message')
await server.iserver.disable_history_event(srv_node)
......
......@@ -9,19 +9,6 @@ from asyncua import ua
pytestmark = pytest.mark.asyncio
class SubHandler:
"""
Dummy subscription client
"""
def datachange_notification(self, node, val, data):
pass
def event_notification(self, event):
pass
class MySubHandler:
"""
More advanced subscription client using Future, so we can await events in tests.
......@@ -51,8 +38,8 @@ class MySubHandler2:
self.limit = limit
self._done = asyncio.Event()
def done(self):
return wait_for(self._done.wait(), 2)
async def done(self):
return await wait_for(self._done.wait(), 2)
def check_done(self):
if self.limit and len(self.results) == self.limit and not self._done.is_set():
......@@ -79,6 +66,14 @@ class MySubHandlerCounter:
self.event_count += 1
class MySubHandlerCounterAsync(MySubHandlerCounter):
async def datachange_notification(self, node, val, data):
self.datachange_count += 1
async def event_notification(self, event):
self.event_count += 1
async def test_subscription_failure(opc):
myhandler = MySubHandler()
o = opc.opc.nodes.objects
......@@ -89,9 +84,10 @@ async def test_subscription_failure(opc):
await sub.delete()
async def test_subscription_overload(opc):
@pytest.mark.parametrize("handler_class", [MySubHandlerCounter, MySubHandlerCounterAsync])
async def test_subscription_overload(opc, handler_class):
nb = 10
myhandler = MySubHandlerCounter()
myhandler = handler_class()
o = opc.opc.nodes.objects
sub = await opc.opc.create_subscription(1, myhandler)
variables = []
......@@ -119,8 +115,9 @@ async def test_subscription_overload(opc):
# assert myhandler.event_count == 0
async def test_subscription_count(opc):
myhandler = MySubHandlerCounter()
@pytest.mark.parametrize("handler_class", [MySubHandlerCounter, MySubHandlerCounterAsync])
async def test_subscription_count(opc, handler_class):
myhandler = handler_class()
sub = await opc.opc.create_subscription(1, myhandler)
o = opc.opc.nodes.objects
var = await o.add_variable(3, 'SubVarCounter', 0.1)
......@@ -134,8 +131,9 @@ async def test_subscription_count(opc):
await sub.delete()
async def test_subscription_count_list(opc):
myhandler = MySubHandlerCounter()
@pytest.mark.parametrize("handler_class", [MySubHandlerCounter, MySubHandlerCounterAsync])
async def test_subscription_count_list(opc, handler_class):
myhandler = handler_class()
sub = await opc.opc.create_subscription(1, myhandler)
o = opc.opc.nodes.objects
var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
......@@ -152,8 +150,9 @@ async def test_subscription_count_list(opc):
await sub.delete()
async def test_subscription_count_no_change(opc):
myhandler = MySubHandlerCounter()
@pytest.mark.parametrize("handler_class", [MySubHandlerCounter, MySubHandlerCounterAsync])
async def test_subscription_count_no_change(opc, handler_class):
myhandler = handler_class()
sub = await opc.opc.create_subscription(1, myhandler)
o = opc.opc.nodes.objects
var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
......@@ -167,8 +166,9 @@ async def test_subscription_count_no_change(opc):
await sub.delete()
async def test_subscription_count_empty(opc):
myhandler = MySubHandlerCounter()
@pytest.mark.parametrize("handler_class", [MySubHandlerCounter, MySubHandlerCounterAsync])
async def test_subscription_count_empty(opc, handler_class):
myhandler = handler_class()
sub = await opc.opc.create_subscription(1, myhandler)
o = opc.opc.nodes.objects
var = await o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3])
......@@ -428,7 +428,7 @@ async def test_events_default(opc):
handle = await sub.subscribe_events()
tid = datetime.utcnow()
msg = "this is my msg "
evgen.trigger(tid, msg)
await evgen.trigger(tid, msg)
ev = await myhandler.result()
assert ev is not None # we did not receive event
assert ua.NodeId(ua.ObjectIds.BaseEventType) == ev.EventType
......@@ -450,7 +450,7 @@ async def test_events_MyObject(opc):
handle = await sub.subscribe_events(o)
tid = datetime.utcnow()
msg = "this is my msg "
evgen.trigger(tid, msg)
await evgen.trigger(tid, msg)
ev = await myhandler.result()
assert ev is not None # we did not receive event
assert ua.NodeId(ua.ObjectIds.BaseEventType) == ev.EventType
......@@ -472,7 +472,7 @@ async def test_events_wrong_source(opc):
handle = await sub.subscribe_events()
tid = datetime.utcnow()
msg = "this is my msg "
evgen.trigger(tid, msg)
await evgen.trigger(tid, msg)
with pytest.raises(TimeoutError): # we should not receive event
ev = await myhandler.result()
await sub.unsubscribe(handle)
......@@ -495,7 +495,7 @@ async def test_events_CustomEvent(opc):
evgen.event.Severity = serverity
tid = datetime.utcnow()
msg = "this is my msg "
evgen.trigger(tid, msg)
await evgen.trigger(tid, msg)
ev = await myhandler.result()
assert ev is not None # we did not receive event
assert etype.nodeid == ev.EventType
......@@ -526,7 +526,7 @@ async def test_events_CustomEvent_MyObject(opc):
evgen.event.PropertyString = propertystring
tid = datetime.utcnow()
msg = "this is my msg "
evgen.trigger(tid, msg)
await evgen.trigger(tid, msg)
ev = await myhandler.result()
assert ev is not None # we did not receive event
assert etype.nodeid == ev.EventType
......@@ -564,16 +564,16 @@ async def test_several_different_events(opc):
evgen2.event.PropertyNum = propertynum2
evgen2.event.PropertyString = propertystring2
for i in range(3):
evgen1.trigger()
evgen2.trigger()
await evgen1.trigger()
await evgen2.trigger()
await sleep(1) # ToDo: replace
assert 3 == len(myhandler.results)
ev = myhandler.results[-1]
assert etype1.nodeid == ev.EventType
handle = await sub.subscribe_events(o, etype2)
for i in range(4):
evgen1.trigger()
evgen2.trigger()
await evgen1.trigger()
await evgen2.trigger()
await sleep(1) # ToDo: replace
ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]
......@@ -618,11 +618,11 @@ async def test_several_different_events_2(opc):
evgen3.event.PropertyNum3 = propertynum3
evgen3.event.PropertyString = propertystring2
for i in range(3):
evgen1.trigger()
evgen2.trigger()
evgen3.trigger()
await evgen1.trigger()
await evgen2.trigger()
await evgen3.trigger()
evgen3.event.PropertyNum3 = 9999
evgen3.trigger()
await evgen3.trigger()
await sleep(1)
ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment