Commit 3552cccf authored by Christoph Ziebuhr's avatar Christoph Ziebuhr Committed by oroulet

Add ConditionRefresh methods

parent 42889db6
......@@ -2,7 +2,6 @@ import logging
from datetime import datetime
import time
import uuid
from typing import Optional
import sys
from asyncua import ua
......@@ -25,9 +24,8 @@ class EventGenerator:
self.logger = logging.getLogger(__name__)
self.isession = isession
self.event: event_objects.BaseEvent = None
self.emitting_node: Optional[Node] = None
async def init(self, etype=None, emitting_node=ua.ObjectIds.Server):
async def init(self, etype=None, emitting_node=ua.ObjectIds.Server, add_generates_event=True):
node = None
if isinstance(etype, event_objects.BaseEvent):
......@@ -56,26 +54,27 @@ class EventGenerator:
self.event.SourceName = (await Node(self.isession, self.event.SourceNode).read_browse_name()).Name
await emitting_node.set_event_notifier([ua.EventNotifier.SubscribeToEvents])
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.GeneratesEvent)
ref.SourceNodeId = emitting_node.nodeid
ref.TargetNodeClass = ua.NodeClass.ObjectType
ref.TargetNodeId = self.event.EventType
refs.append(ref)
await self.isession.add_references(refs)
# result.StatusCode.check()
self.emitting_node = emitting_node
if add_generates_event:
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.GeneratesEvent)
ref.SourceNodeId = emitting_node.nodeid
ref.TargetNodeClass = ua.NodeClass.ObjectType
ref.TargetNodeId = self.event.EventType
refs.append(ref)
results = await self.isession.add_references(refs)
for result in results:
result.check()
def __str__(self):
return f"EventGenerator(Type:{self.event.EventType}, Emitting Node:{self.emitting_node}, " \
return f"EventGenerator(Type:{self.event.EventType}, Emitting Node:{self.event.emitting_node.to_string()}, " \
f"Time:{self.event.Time}, Message: {self.event.Message})"
__repr__ = __str__
async def trigger(self, time_attr=None, message=None):
async def trigger(self, time_attr=None, message=None, subscription_id=None):
"""
Trigger the event. This will send a notification to all subscribed clients
"""
......@@ -100,4 +99,4 @@ class EventGenerator:
elif not self.event.Message:
self.event.Message = ua.LocalizedText((await Node(self.isession, self.event.SourceNode).read_browse_name()).Name).Text
await self.isession.subscription_service.trigger_event(self.event)
await self.isession.subscription_service.trigger_event(self.event, subscription_id=subscription_id)
......@@ -22,6 +22,7 @@ from .subscription_service import SubscriptionService
from .standard_address_space import standard_address_space
from .users import User, UserRole
from .internal_session import InternalSession
from .event_generator import EventGenerator
try:
from asyncua.crypto import uacrypto
......@@ -49,6 +50,7 @@ class InternalServer:
self.endpoints = []
self._channel_id_counter = 5
self.allow_remote_admin = True
self.bind_condition_methods = False
self.disabled_clock = False # for debugging, we may want to disable clock that writes too much in log
self._known_servers = {} # used if we are a discovery server
self.certificate = None
......@@ -81,6 +83,20 @@ class InternalServer:
await self._address_space_fixes()
await self.setup_nodes()
await self.history_manager.init()
if self.bind_condition_methods:
await self.setup_condition_methods()
async def setup_condition_methods(self):
for etype in (ua.ObjectIds.RefreshStartEventType, ua.ObjectIds.RefreshEndEventType):
evgen = EventGenerator(self.isession)
await evgen.init(etype, add_generates_event=False)
# don't use isinstance(int), it also matches bool
if type(self.bind_condition_methods) is int:
evgen.event.Severity = self.bind_condition_methods
self.subscription_service.standard_events[etype] = evgen
self.isession.add_method_callback(ua.NodeId(ua.ObjectIds.ConditionType_ConditionRefresh), self.subscription_service.condition_refresh)
self.isession.add_method_callback(ua.NodeId(ua.ObjectIds.ConditionType_ConditionRefresh2), self.subscription_service.condition_refresh)
async def setup_nodes(self):
"""
......
......@@ -235,14 +235,18 @@ class MonitoredItemService:
return True
return False
async def trigger_event(self, event):
async def trigger_event(self, event, mid=None):
if event.emitting_node not in self._monitored_events:
self.logger.debug("%s has NO subscription for events %s from node: %s", self, event, event.emitting_node)
return False
self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.emitting_node)
mids = self._monitored_events[event.emitting_node]
for mid in mids:
if mid is not None:
await self._trigger_event(event, mid)
else:
mids = self._monitored_events[event.emitting_node]
for mid in mids:
await self._trigger_event(event, mid)
return True
async def _trigger_event(self, event, mid: int):
......
......@@ -7,7 +7,7 @@ import logging
from typing import Dict, Iterable
from asyncua import ua
from asyncua.common import utils
from asyncua.common import utils, uamethod
from .address_space import AddressSpace
from .internal_subscription import InternalSubscription
......@@ -23,6 +23,8 @@ class SubscriptionService:
self.aspace: AddressSpace = aspace
self.subscriptions: Dict[int, InternalSubscription] = {}
self._sub_id_counter = 77
self.standard_events = {}
self._conditions = {}
async def create_subscription(self, params, callback, request_callback=None):
self.logger.info("create subscription")
......@@ -113,6 +115,29 @@ class SubscriptionService:
return ua.NotificationMessage()
return self.subscriptions[params.SubscriptionId].republish(params.RetransmitSequenceNumber)
async def trigger_event(self, event):
for sub in self.subscriptions.values():
await sub.monitored_item_srv.trigger_event(event)
async def trigger_event(self, event, subscription_id=None):
if hasattr(event, 'Retain') and hasattr(event, 'NodeId'):
if event.Retain:
self._conditions[event.NodeId] = event
elif event.NodeId in self._conditions:
del self._conditions[event.NodeId]
if subscription_id is not None:
if subscription_id in self.subscriptions:
await self.subscriptions[subscription_id].monitored_item_srv.trigger_event(event)
else:
for sub in self.subscriptions.values():
await sub.monitored_item_srv.trigger_event(event)
@uamethod
async def condition_refresh(self, parent, subscription_id, mid=None):
if subscription_id not in self.subscriptions:
return ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid)
sub = self.subscriptions[subscription_id]
if mid is not None and mid not in sub.monitored_item_srv._monitored_items:
return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
if ua.ObjectIds.RefreshStartEventType in self.standard_events:
await self.standard_events[ua.ObjectIds.RefreshStartEventType].trigger(subscription_id=subscription_id)
for event in self._conditions.values():
await sub.monitored_item_srv.trigger_event(event, mid)
if ua.ObjectIds.RefreshEndEventType in self.standard_events:
await self.standard_events[ua.ObjectIds.RefreshEndEventType].trigger(subscription_id=subscription_id)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment