Commit be589680 authored by Olivier R-D's avatar Olivier R-D

basis of datachange events working

parent 4117c8a6
......@@ -40,8 +40,8 @@ if __name__ == "__main__":
sub = client.create_subscription(500, handler)
handle = sub.subscribe_data_change(myvar)
time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
#sub.unsubscribe(handle)
#sub.delete()
embed()
finally:
client.disconnect()
......@@ -21,8 +21,9 @@ class SubHandler(object):
if __name__ == "__main__":
#optional setup logging
logging.basicConfig(level=logging.WARN)
logger = logging.getLogger("opcua.address_space")
#logger = logging.getLogger("opcua.internal_server")
#logger = logging.getLogger("opcua.address_space")
logger = logging.getLogger("opcua.internal_server")
logger.setLevel(logging.DEBUG)
logger = logging.getLogger("opcua.subscription_server")
logger.setLevel(logging.DEBUG)
......@@ -42,8 +43,8 @@ if __name__ == "__main__":
print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
try:
handler = SubHandler()
#sub = server.create_subscription(500, handler)
#handle = sub.subscribe_data_change(myvar)
sub = server.create_subscription(500, handler)
handle = sub.subscribe_data_change(myvar)
#time.sleep(0.1)
#sub.unsubscribe(handle)
#sub.delete()
......
......@@ -158,6 +158,7 @@ class AddressSpace(object):
v(k, value)
except Exception as ex:
self.logger.warn("Error calling datachange callback %s, %s, %s", k, v, ex)
print(ex)
return ua.StatusCode()
def add_datachange_callback(self, nodeid, attr, callback):
......
......@@ -120,13 +120,10 @@ class InternalSession(object):
return result
def close_session(self, delete_subs):
self.logger.info("close session %s", self.session_id)
self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
self.state = SessionState.Closed
self.delete_subscriptions(self.subscriptions)
self.delete_subscriptions(self.subscriptions[:])
def activate_session(self, params):
self.logger.info("activate session")
......
......@@ -9,7 +9,6 @@ import functools
from opcua import ua
class SubscriptionManager(Thread):
def __init__(self, aspace):
Thread.__init__(self)
......@@ -63,7 +62,6 @@ class SubscriptionManager(Thread):
def create_subscription(self, params, callback):
self.logger.info("create subscription")
print(self.loop, self)
with self._lock:
result = ua.CreateSubscriptionResult()
self._sub_id_counter += 1
......@@ -79,12 +77,12 @@ class SubscriptionManager(Thread):
return result
def delete_subscriptions(self, ids):
self.logger.info("delete subscription")
self.logger.info("delete subscriptions: %s", ids)
with self._lock:
res = []
for i in ids:
if not i in self.subscriptions:
res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionsIdInvalid))
res.append(ua.StatusCode(ua.StatusCodes.BadSubscriptionIdInvalid))
else:
sub = self.subscriptions.pop(i)
sub.stop()
......@@ -139,6 +137,15 @@ class InternalSubscription(object):
self._monitored_events = {}
self._monitored_datachange = {}
self._lock = RLock()
self._triggered_datachanges = []
self._triggered_events = []
self._notification_seq = 1
self._not_acknowledged_results = []
self._startup = True
self._keep_alive_count = 0
def __str__(self):
return "Subscription(id:%s)".format(self.data.SubscriptionId)
def start(self):
self.logger.debug("starting subscription %s", self.data.SubscriptionId)
......@@ -147,15 +154,62 @@ class InternalSubscription(object):
def stop(self):
self.logger.debug("stopping subscription %s", self.data.SubscriptionId)
self.manager.cancel_task(self.task)
self.delete_all_monitored_items()
def delete_all_monitored_items(self):
self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_datachange.values()])
@asyncio.coroutine
def subscription_loop(self):
self.logger.debug("%s loop running", self)
while True:
#test disabled we do not check that one since we do not care about not received results
#if self._keep_alive_count > self.data.RevisedLifetimeCount:
#self.logger.warn("Subscription %s has expired, keep alive count(%s) > lifetime count (%s)", self.data.SubscriptionId, self._keep_alive_count, self.data.RevisedLifetimeCount)
#return
self.publish_results()
yield from asyncio.sleep(1)
def has_published_results(self):
if self._startup or self._triggered_datachanges or self._triggered_events:
return True
if self._keep_alive_count > self.data.RevisedMaxKeepAliveCount:
self.logger.debug("keep alive count %s is > than max keep alive count %s, sending publish event", self._keep_alive_count, self.data.RevisedMaxKeepAliveCount)
return True
self._keep_alive_count += 1
print("no publish result")
return False
def publish_results(self):
self.logger.debug("looking for results and publishing")
#self.logger.debug("looking for results and publishing")
if self.has_published_results(): #FIXME: should I pop a publish request here? or I do not care?
result = self.pop_publish_result()
self.callback(result)
def pop_publish_result(self):
result = ua.PublishResult()
result.SubscriptionId = self.data.SubscriptionId
if self._triggered_datachanges:
notif = ua.DataChangeNotification()
notif.MonitoredItems = self._triggered_datachanges[:]
self._triggered_datachanges.clear()
self.logger.debug("sending datachanges nontification with %s events", len(notif.MonitoredItems))
print(self._triggered_datachanges)
result.NotificationMessage.NotificationData.append(notif)
if self._triggered_events:
notif = ua.EventNotificationList()
notif.Events = self._triggered_events[:]
self._triggered_events.clear()
result.NotificationMessage.NotificationData.append(notif)
#FIXME: add statuschaneg events
self._keep_alive_count = 0
self._startup = False
result.NotificationMessage.SequenceNumber = self._notification_seq
self._notification_seq += 1
result.MoreNotifications = False
result.AvailableSequenceNumbers = self._not_acknowledged_results[:]
self._not_acknowledged_results.append(result)
return result
def create_monitored_items(self, params):
results = []
......@@ -163,6 +217,11 @@ class InternalSubscription(object):
results.append(self._create_monitored_item(item))
return results
def trigger_datachange(self, handle, nodeid, attr):
self.logger.debug("triggering datachange for handle %s, nodeid %s, and attribute %s", handle, nodeid, attr)
variant = self.aspace.get_attribute_value(nodeid, attr)
self.datachange_callback(handle, variant)
def _create_monitored_item(self, params):
with self._lock:
result = ua.MonitoredItemCreateResult()
......@@ -172,7 +231,7 @@ class InternalSubscription(object):
self._monitored_item_counter += 1
result.MonitoredItemId = self._monitored_item_counter
if params.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
self.logger.info("request to subscribe to events")
self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
self._monitored_events[params.ItemToMonitor.NodeId] = result.MonitoredItemId
else:
self.logger.info("request to subscribe to datachange")
......@@ -184,16 +243,18 @@ class InternalSubscription(object):
mdata.client_handle = params.RequestedParameters.ClientHandle
mdata.callback_handle = handle
mdata.monitored_item_id = result.MonitoredItemId
self._monitored_datachange[result.MonitoredItemId] = mdata
self._monitored_datachange[handle] = mdata
#FIXME force event generation
#force event generation
self.trigger_datachange(handle, params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
return result
def delete_monitored_items(self, params):
def delete_monitored_items(self, ids):
self.logger.debug("delete monitored items %s", ids)
with self._lock:
results = []
for mid in params.MonitoredItemIds:
for mid in ids:
if self._delete_monitored_event(mid):
results.append(ua.StatusCode())
elif self._delete_monitored_datachange(mid):
......@@ -214,13 +275,21 @@ class InternalSubscription(object):
def _delete_monitored_datachange(self, mid):
with self._lock:
if mid in self._monitored_datachange:
self._monitored_datachange.pop(mid)
for k, v in self._monitored_datachange.items():
if v.monitored_item_id == mid:
self._monitored_datachange.pop(k)
return True
return False
def datachange_callback(self, handle, value):
self.logger.info("subscription %s: datachange callback called with %s, %s", self, handle, value)
self.logger.info("subscription %s: datachange callback called with %s, %s", self, handle, value.Value)
event = ua.MonitoredItemNotification()
mdata = self._monitored_datachange[handle]
#event.monitored_item_id = mdata.monitored_item_id
#event.monitored_item_notification.ClientHandle = mdata.client_handle
event.ClientHandle = mdata.client_handle
event.Value = value
self._triggered_datachanges.append(event)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment