Commit 573a8a3f authored by Vladas Tamosaitis's avatar Vladas Tamosaitis Committed by oroulet

fix: parallel async datachange_notification calls

parent cfe23db1
...@@ -4,7 +4,7 @@ high level interface to subscriptions ...@@ -4,7 +4,7 @@ high level interface to subscriptions
import asyncio import asyncio
import logging import logging
import collections.abc import collections.abc
from typing import Union, List, Iterable, Optional from typing import Tuple, Union, List, Iterable, Optional
from asyncua.common.ua_utils import copy_dataclass_attr from asyncua.common.ua_utils import copy_dataclass_attr
from asyncua import ua from asyncua import ua
...@@ -124,22 +124,27 @@ class Subscription: ...@@ -124,22 +124,27 @@ class Subscription:
results[0].check() results[0].check()
async def _call_datachange(self, datachange: ua.DataChangeNotification): async def _call_datachange(self, datachange: ua.DataChangeNotification):
if not hasattr(self._handler, "datachange_notification"):
self.logger.error("DataChange subscription created but handler has no datachange_notification method")
return
known_handles_args: List[Tuple] = []
for item in datachange.MonitoredItems: for item in datachange.MonitoredItems:
if item.ClientHandle not in self._monitored_items: if item.ClientHandle not in self._monitored_items:
self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle) self.logger.warning("Received a notification for unknown handle: %s", item.ClientHandle)
continue continue
data = self._monitored_items[item.ClientHandle] data = self._monitored_items[item.ClientHandle]
if hasattr(self._handler, "datachange_notification"): event_data = DataChangeNotif(data, item)
event_data = DataChangeNotif(data, item) known_handles_args.append((data.node, item.Value.Value.Value, event_data))
try:
if asyncio.iscoroutinefunction(self._handler.datachange_notification): try:
await self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data) tasks = [
else: self._handler.datachange_notification(*args) for args in known_handles_args
self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data) ]
except Exception: if asyncio.iscoroutinefunction(self._handler.datachange_notification):
self.logger.exception("Exception calling data change handler") await asyncio.gather(*tasks)
else: except Exception as ex:
self.logger.error("DataChange subscription created but handler has no datachange_notification method") self.logger.exception("Exception calling data change handler. Error: %s", ex)
async def _call_event(self, eventlist: ua.EventNotificationList): async def _call_event(self, eventlist: ua.EventNotificationList):
for event in eventlist.Events: for event in eventlist.Events:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment