Commit 481a9150 authored by Olivier R-D's avatar Olivier R-D

small cleanup in subscription_service

parent 8fce2f7b
......@@ -9,7 +9,7 @@ API is similar to the python bindings of freeopcua c++ client and servers. It of
Most code is autogenerated from xml specification using same code as the one that is used for freeopcua C++ client and server, thus adding missing functionnality to client and server shoud be trivial.
Using Python3.4 the server and client do not require any third party libraries. If using python2.7 or pypy you need to install enum34, trollius(asyncio), and futures(concurrent.futures), with pip for example. Server and client can be run with pypy.
Using Python > 3.4 the server and client do not require any third party libraries. If using python 2.7 or pypy < 3 you need to install enum34, trollius(asyncio), and futures(concurrent.futures), with pip for example. Server and client can be run with pypy.
coveryage.py reports a test coverage of over 90% of code, most of the rest is autogenerate code that is not used yet.
......
......@@ -184,29 +184,40 @@ class InternalSubscription(object):
def _pop_publish_result(self):
result = ua.PublishResult()
result.SubscriptionId = self.data.SubscriptionId
self._pop_triggered_datachanges(result)
self._pop_triggered_events(result)
self._pop_triggered_statuschanges(result)
self._keep_alive_count = 0
self._startup = False
result.NotificationMessage.SequenceNumber = self._notification_seq
self._notification_seq += 1
result.MoreNotifications = False
result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
return result
def _pop_triggered_datachanges(self, result):
if self._triggered_datachanges:
notif = ua.DataChangeNotification()
notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
self._triggered_datachanges = {}
self.logger.debug("sending datachanges nontification with %s events", len(notif.MonitoredItems))
self.logger.debug("sending datachanges notification with %s events", len(notif.MonitoredItems))
result.NotificationMessage.NotificationData.append(notif)
def _pop_triggered_events(self, result):
if self._triggered_events:
notif = ua.EventNotificationList()
notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
self._triggered_events = {}
result.NotificationMessage.NotificationData.append(notif)
self.logger.debug("sending event notification with %s events", len(notif.Events))
def _pop_triggered_statuschanges(self, result):
if self._triggered_statuschanges:
notif = ua.StatusChangeNotification()
notif.Status = self._triggered_statuschanges.pop(0)
result.NotificationMessage.NotificationData.append(notif)
self._keep_alive_count = 0
self._startup = False
result.NotificationMessage.SequenceNumber = self._notification_seq
self._notification_seq += 1
result.MoreNotifications = False
result.AvailableSequenceNumbers = list(self._not_acknowledged_results.keys())
self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
return result
self.logger.debug("sending event notification %s", len(notif.Status))
def trigger_statuschange(self, code):
self._triggered_statuschanges.append(code)
......@@ -263,7 +274,7 @@ class InternalSubscription(object):
with self._lock:
result = ua.MonitoredItemCreateResult()
result.RevisedSamplingInterval = self.data.RevisedPublishingInterval
result.RevisedQueueSize = params.RequestedParameters.QueueSize # FIXME check and use value
result.RevisedQueueSize = params.RequestedParameters.QueueSize
result.FilterResult = ua.downcast_extobject(params.RequestedParameters.Filter)
self._monitored_item_counter += 1
result.MonitoredItemId = self._monitored_item_counter
......@@ -322,13 +333,7 @@ class InternalSubscription(object):
mdata = self._monitored_items[mid]
event.ClientHandle = mdata.client_handle
event.Value = value
if not mid in self._triggered_datachanges:
self._triggered_datachanges[mid] = [event]
return
if mdata.parameters.RevisedQueueSize:
if len(self._triggered_datachanges[mid]) >= mdata.parameters.RevisedQueueSize:
self._triggered_datachanges[mid].pop(0)
self._triggered_datachanges[mid].append(event)
self._enqueue_event(mid, event, mdata.parameters.RevisedQueueSize, self._triggered_datachanges)
def trigger_event(self, event):
with self._lock:
......@@ -344,18 +349,18 @@ class InternalSubscription(object):
fieldlist = ua.EventFieldList()
fieldlist.ClientHandle = mdata.client_handle
fieldlist.EventFields = self._get_event_fields(mdata.parameters.FilterResult, event)
if not mid in self._triggered_events:
self._triggered_events[mid] = [fieldlist]
return True
if mdata.parameters.RevisedQueueSize:
if len(self._triggered_events[mid]) >= mdata.parameters.RevisedQueueSize:
self._triggered_events[mid].pop(0)
self._triggered_events[mid].append(fieldlist)
print("\n\n", event)
print(fieldlist)
print(self._triggered_events[mid])
self._enqueue_event(mid, fieldlist, mdata.parameters.RevisedQueueSize, self._triggered_events)
return True
def _enqueue_event(self, mid, eventdata, size, queue):
if not mid in queue:
queue[mid] = [eventdata]
return
if size != 0:
if len(queue[mid]) >= size:
queue[mid].pop(0)
queue[mid].append(eventdata)
def _get_event_fields(self, evfilter, event):
fields = []
for sattr in evfilter.SelectClauses:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment