Commit 066c61eb authored by Yuta Okamoto's avatar Yuta Okamoto Committed by oroulet

add type hints to Subscription

parent 39e07108
......@@ -14,7 +14,7 @@ from ..common.xmlimporter import XmlImporter
from ..common.xmlexporter import XmlExporter
from ..common.node import Node
from ..common.manage_nodes import delete_nodes
from ..common.subscription import SubHandler, Subscription
from ..common.subscription import Subscription, SubscriptionHandler
from ..common.shortcuts import Shortcuts
from ..common.structures import load_type_definitions, load_enums
from ..common.structures104 import load_data_type_definitions
......@@ -719,7 +719,7 @@ class Client:
return Node(self.uaclient, nodeid)
async def create_subscription(
self, period: Union[ua.CreateSubscriptionParameters, float], handler: SubHandler, publishing: bool = True
self, period: Union[ua.CreateSubscriptionParameters, float], handler: SubscriptionHandler, publishing: bool = True
) -> Subscription:
Create a subscription.
high level interface to subscriptions
from __future__ import annotations
import asyncio
import logging
from typing import Tuple, Union, List, Iterable, Optional
from asyncua.common.ua_utils import copy_dataclass_attr
import logging
import sys
from typing import TYPE_CHECKING, Any, Tuple, Union, List, Iterable, Optional, overload
if sys.version_info >= (3, 8):
from typing import Protocol
from typing_extensions import Protocol
from asyncua import ua
from asyncua.client.ua_client import UaClient
from asyncua.common.ua_utils import copy_dataclass_attr
from asyncua.server.internal_session import InternalSession
from .events import Event, get_filter_from_event_type
from .node import Node
......@@ -44,25 +57,55 @@ class DataChangeNotif:
__repr__ = __str__
class DataChangeNotificationHandler(Protocol):
def datachange_notification(self, node: Node, val: Any, data: DataChangeNotif) -> None:
called for every datachange notification from server
class EventNotificationHandler(Protocol):
def event_notification(self, event: ua.EventNotificationList) -> None:
called for every event notification from server
class StatusChangeNotificationHandler(Protocol):
def status_change_notification(self, status: ua.StatusChangeNotification) -> None:
called for every status change notification from server
SubscriptionHandler = Union[DataChangeNotificationHandler, EventNotificationHandler, StatusChangeNotificationHandler]
Protocol class representing subscription handlers to receive events from server.
class SubHandler:
Subscription Handler. To receive events from server for a subscription
This class is just a sample class. Whatever class having these methods can be used
def datachange_notification(self, node: Node, val, data: DataChangeNotif):
def datachange_notification(self, node: Node, val: Any, data: DataChangeNotif) -> None:
called for every datachange notification from server
def event_notification(self, event: ua.EventNotificationList):
def event_notification(self, event: ua.EventNotificationList) -> None:
called for every event notification from server
def status_change_notification(self, status: ua.StatusChangeNotification):
def status_change_notification(self, status: ua.StatusChangeNotification) -> None:
called for every status change notification from server
......@@ -75,14 +118,19 @@ class Subscription:
The object represent a subscription to an opc-ua server.
This is a high level class, especially `subscribe_data_change` and `subscribe_events methods`.
If more control is necessary look at code and/or use `create_monitored_items method`.
:param server: `InternalSession` or `UAClient`
:param server: `InternalSession` or `UaClient`
def __init__(self, server, params: ua.CreateSubscriptionParameters, handler: SubHandler):
def __init__(
server: Union[InternalSession, UaClient],
params: ua.CreateSubscriptionParameters,
handler: SubscriptionHandler,
self.logger = logging.getLogger(__name__)
self.server = server
self.server: Union[InternalSession, UaClient] = server
self._client_handle = 200
self._handler: SubHandler = handler
self._handler: SubscriptionHandler = handler
self.parameters: ua.CreateSubscriptionParameters = params # move to data class
self._monitored_items = {}
self.subscription_id: Optional[int] = None
......@@ -99,14 +147,16 @@ class Subscription:
async def update(
params: ua.ModifySubscriptionParameters
) -> ua.ModifySubscriptionResponse:
) -> ua.ModifySubscriptionResult:
if not isinstance(self.server, UaClient):
raise ua.uaerrors.UaError(f"update() is not supported in {self.server}.")
response = await self.server.update_subscription(params)'Subscription updated %s', params.SubscriptionId)
# update the self.parameters attr with the updated values
copy_dataclass_attr(params, self.parameters)
return response
async def publish_callback(self, publish_result: ua.PublishResult):
async def publish_callback(self, publish_result: ua.PublishResult) -> None:
Handle `PublishResult` callback.
......@@ -122,14 +172,14 @@ class Subscription:
self.logger.warning("Notification type not supported yet for notification %s", notif)
async def delete(self):
async def delete(self) -> None:
Delete subscription on server. This is automatically done by Client and Server classes on exit.
results = await self.server.delete_subscriptions([self.subscription_id])
async def _call_datachange(self, datachange: ua.DataChangeNotification):
async def _call_datachange(self, datachange: ua.DataChangeNotification) -> None:
if not hasattr(self._handler, "datachange_notification"):
self.logger.error("DataChange subscription created but handler has no datachange_notification method")
......@@ -153,7 +203,7 @@ class Subscription:
except Exception as ex:
self.logger.exception("Exception calling data change handler. Error: %s", ex)
async def _call_event(self, eventlist: ua.EventNotificationList):
async def _call_event(self, eventlist: ua.EventNotificationList) -> None:
for event in eventlist.Events:
if event.ClientHandle not in self._monitored_items:
self.logger.warning("Received a notification for unknown handle: %s", event.ClientHandle)
......@@ -172,7 +222,7 @@ class Subscription:
self.logger.error("Event subscription created but handler has no event_notification method")
async def _call_status(self, status: ua.StatusChangeNotification):
async def _call_status(self, status: ua.StatusChangeNotification) -> None:
if not hasattr(self._handler, "status_change_notification"):
self.logger.error("DataChange subscription has no status_change_notification method")
......@@ -184,6 +234,28 @@ class Subscription:
except Exception:
self.logger.exception("Exception calling status change handler")
async def subscribe_data_change(
nodes: Node,
attr: ua.AttributeIds = ua.AttributeIds.Value,
queuesize: int = 0,
sampling_interval: ua.Duration = 0.0
) -> int:
async def subscribe_data_change(
nodes: Union[Node, Iterable[Node]],
attr: ua.AttributeIds = ua.AttributeIds.Value,
queuesize: int = 0,
sampling_interval: ua.Duration = 0.0
) -> List[Union[int, ua.StatusCode]]:
async def subscribe_data_change(
nodes: Union[Node, Iterable[Node]],
......@@ -213,8 +285,10 @@ class Subscription:
nodes, attr, queuesize=queuesize, monitoring=monitoring, sampling_interval=sampling_interval
async def _create_eventfilter(self, evtypes: Union[ua.ObjectIds, List[ua.ObjectIds], ua.NodeId, List[ua.NodeId]], where_clause_generation: bool = True):
if not isinstance(evtypes, (list, tuple)):
async def _create_eventfilter(
self, evtypes: Union[int, ua.NodeId, Iterable[Union[int, ua.NodeId]]], where_clause_generation: bool = True
) -> ua.EventFilter:
if isinstance(evtypes, (int, ua.NodeId)):
evtypes = [evtypes]
evtypes = [Node(self.server, evtype) for evtype in evtypes] # type: ignore[union-attr]
evfilter = await get_filter_from_event_type(evtypes, where_clause_generation) # type: ignore[union-attr]
......@@ -222,9 +296,9 @@ class Subscription:
async def subscribe_events(
sourcenode: Node = ua.ObjectIds.Server,
evtypes: Union[ua.ObjectIds, List[ua.ObjectIds], ua.NodeId, List[ua.NodeId]] = ua.ObjectIds.BaseEventType,
evfilter: ua.EventFilter = None,
sourcenode: Union[Node, ua.NodeId, str, int] = ua.ObjectIds.Server,
evtypes: Union[int, ua.NodeId, Iterable[Union[ua.NodeId, int]]] = ua.ObjectIds.BaseEventType,
evfilter: Optional[ua.EventFilter] = None,
queuesize: int = 0,
where_clause_generation: bool = True
) -> int:
......@@ -245,7 +319,7 @@ class Subscription:
sourcenode = Node(self.server, sourcenode)
if evfilter is None:
if not isinstance(evtypes, (list, tuple)) and evtypes == ua.ObjectIds.BaseEventType:
if evtypes == ua.ObjectIds.BaseEventType or evtypes == ua.NodeId(ua.ObjectIds.BaseEventType):
# Remove where clause for base event type, for servers that have problems with long WhereClauses.
# Also because BaseEventType wants every event we can ommit it. Issue: #1205
where_clause_generation = False
......@@ -254,9 +328,9 @@ class Subscription:
async def subscribe_alarms_and_conditions(
sourcenode: Node = ua.ObjectIds.Server,
evtypes: Union[ua.ObjectIds, List[ua.ObjectIds], ua.NodeId, List[ua.NodeId]] = ua.ObjectIds.ConditionType,
evfilter: ua.EventFilter = None,
sourcenode: Union[Node, ua.NodeId, str, int] = ua.ObjectIds.Server,
evtypes: Union[int, ua.NodeId, Iterable[Union[int, ua.NodeId]]] = ua.ObjectIds.ConditionType,
evfilter: Optional[ua.EventFilter] = None,
queuesize: int = 0
) -> int:
......@@ -275,11 +349,35 @@ class Subscription:
return await self.subscribe_events(sourcenode, evtypes, evfilter, queuesize)
async def _subscribe(
nodes: Node,
mfilter: Optional[ua.MonitoringFilter] = None,
queuesize: int = 0,
monitoring: ua.MonitoringMode = ua.MonitoringMode.Reporting,
sampling_interval: ua.Duration = 0.0
) -> int:
async def _subscribe(
nodes: Iterable[Node],
mfilter: Optional[ua.MonitoringFilter] = None,
queuesize: int = 0,
monitoring: ua.MonitoringMode = ua.MonitoringMode.Reporting,
sampling_interval: ua.Duration = 0.0
) -> List[Union[int, ua.StatusCode]]:
async def _subscribe(
nodes: Union[Node, Iterable[Node]],
mfilter: ua.MonitoringFilter = None,
mfilter: Optional[ua.MonitoringFilter] = None,
queuesize: int = 0,
monitoring: ua.MonitoringMode = ua.MonitoringMode.Reporting,
sampling_interval: ua.Duration = 0.0
......@@ -344,14 +442,14 @@ class Subscription:
mir.RequestedParameters = mparams
return mir
async def create_monitored_items(self, monitored_items: List[ua.MonitoredItemCreateRequest]) -> List[Union[int, ua.StatusCode]]:
async def create_monitored_items(self, monitored_items: Iterable[ua.MonitoredItemCreateRequest]) -> List[Union[int, ua.StatusCode]]:
low level method to have full control over subscription parameters.
Client handle must be unique since it will be used as key for internal registration of data.
params = ua.CreateMonitoredItemsParameters()
params.SubscriptionId = self.subscription_id
params.ItemsToCreate = monitored_items
params.ItemsToCreate = list(monitored_items)
params.TimestampsToReturn = ua.TimestampsToReturn.Both
# insert monitored item into map to avoid notification arrive before result return
# server_handle is left as None in purpose as we don't get it yet.
......@@ -378,18 +476,18 @@ class Subscription:
return mids
async def unsubscribe(self, handle: Union[int, List[int]]):
async def unsubscribe(self, handle: Union[int, Iterable[int]]) -> None:
Unsubscribe from datachange or events using the handle returned while subscribing.
If you delete the subscription, you do not need to unsubscribe.
:param handle: The handle that was returned when subscribing to the node/nodes
handles: List[int] = [handle] if isinstance(handle, int) else handle
handles: Iterable[int] = [handle] if isinstance(handle, int) else handle
if not handles:
params = ua.DeleteMonitoredItemsParameters()
params.SubscriptionId = self.subscription_id
params.MonitoredItemIds = handles
params.MonitoredItemIds = list(handles)
results = await self.server.delete_monitored_items(params)
handle_map = {v.server_handle: k for k, v in self._monitored_items.items()}
......@@ -397,7 +495,9 @@ class Subscription:
if handle in handle_map:
del self._monitored_items[handle_map[handle]]
async def modify_monitored_item(self, handle: int, new_samp_time: ua.Duration, new_queuesize: int = 0, mod_filter_val: int = -1):
async def modify_monitored_item(
self, handle: int, new_samp_time: ua.Duration, new_queuesize: int = 0, mod_filter_val: int = -1
) -> List[ua.MonitoredItemModifyResult]:
Modify a monitored item.
:param handle: Handle returned when originally subscribing
......@@ -439,7 +539,7 @@ class Subscription:
new_samp_time: ua.Duration,
mod_filter: ua.DataChangeFilter,
client_handle: ua.IntegerId
) -> ua.MonitoringParameters:
req_params = ua.MonitoringParameters()
req_params.ClientHandle = client_handle
req_params.QueueSize = new_queuesize
......@@ -447,14 +547,36 @@ class Subscription:
req_params.SamplingInterval = new_samp_time
return req_params
def deadband_monitor(
async def deadband_monitor(
var: Node,
deadband_val: ua.Double,
deadbandtype: ua.UInt32 = 1,
queuesize: int = 0,
attr: ua.AttributeIds = ua.AttributeIds.Value
) -> int:
async def deadband_monitor(
var: Iterable[Node],
deadband_val: ua.Double,
deadbandtype: ua.UInt32 = 1,
queuesize: int = 0,
attr: ua.AttributeIds = ua.AttributeIds.Value
) -> List[Union[int, ua.StatusCode]]:
async def deadband_monitor(
var: Union[Node, Iterable[Node]],
deadband_val: ua.Double,
deadbandtype: ua.UInt32 = 1,
queuesize: int = 0,
attr: ua.AttributeIds = ua.AttributeIds.Value
) -> Union[int, List[Union[int, ua.StatusCode]]]:
Method to create a subscription with a Deadband Value.
Default deadband value type is absolute.
......@@ -471,9 +593,9 @@ class Subscription:
deadband_filter.DeadbandType = deadbandtype
# absolute float value or from 0 to 100 for percentage deadband
deadband_filter.DeadbandValue = deadband_val
return self._subscribe(var, attr, deadband_filter, queuesize)
return await self._subscribe(var, attr, deadband_filter, queuesize)
async def set_monitoring_mode(self, monitoring: ua.MonitoringMode) -> ua.uatypes.StatusCode:
async def set_monitoring_mode(self, monitoring: ua.MonitoringMode) -> List[ua.uatypes.StatusCode]:
The monitoring mode parameter is used
to enable/disable the sampling of MonitoredItems
......@@ -482,6 +604,8 @@ class Subscription:
:param monitoring: The monitoring mode to apply
:return: Return a Set Monitoring Mode Result
if not isinstance(self.server, UaClient):
raise ua.uaerrors.UaError(f"set_monitoring_mode() is not supported in {self.server}.")
node_handles = []
for mi in self._monitored_items.values():
......@@ -492,7 +616,7 @@ class Subscription:
params.MonitoringMode = monitoring
return await self.server.set_monitoring_mode(params)
async def set_publishing_mode(self, publishing: bool) -> ua.uatypes.StatusCode:
async def set_publishing_mode(self, publishing: bool) -> List[ua.uatypes.StatusCode]:
Disable publishing of NotificationMessages for the subscription,
but doesn't discontinue the sending of keep-alive Messages,
......@@ -502,6 +626,8 @@ class Subscription:
:return: Return a Set Publishing Mode Result
if not isinstance(self.server, UaClient):
raise ua.uaerrors.UaError(f"set_publishing_mode() is not supported in {self.server}.")
params = ua.SetPublishingModeParameters()
params.SubscriptionIds = [self.subscription_id] # type: ignore
params.PublishingEnabled = publishing
from __future__ import annotations
import asyncio
import logging
from datetime import timedelta
from datetime import datetime
from asyncua import ua
from ..common.subscription import Subscription, SubHandler
from asyncua.common import subscription
from asyncua.common.subscription import Subscription, SubscriptionHandler
from ..common.utils import Buffer
......@@ -213,8 +216,8 @@ class HistoryDict(HistoryStorageInterface):
class SubHandler(SubHandler): # type: ignore
def __init__(self, storage):
class SubHandler(subscription.SubHandler):
def __init__(self, storage: HistoryStorageInterface): = storage
def datachange_notification(self, node, val, data):
......@@ -240,7 +243,7 @@ class HistoryManager:
""" = storage
async def _create_subscription(self, handler):
async def _create_subscription(self, handler: SubscriptionHandler):
params = ua.CreateSubscriptionParameters()
params.RequestedPublishingInterval = 10
params.RequestedLifetimeCount = 3000
......@@ -59,7 +59,7 @@ class InternalSession(AbstractSession):
def is_activated(self) -> bool:
return self.state == SessionState.Activated
async def create_session(self, params: ua.CreateSessionParameters, sockname: Optional[Tuple[str, int]]=None):
async def create_session(self, params: ua.CreateSessionParameters, sockname: Optional[Tuple[str, int]] = None):'Create session request')
result = ua.CreateSessionResult()
result.SessionId = self.session_id
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment