Commit 8fce2f7b authored by Olivier R-D's avatar Olivier R-D

respect queue size for subscription, deepcopy event

parent 2d558b54
build*
MANIFEST
.idea*
htmlcov*
......@@ -328,7 +328,7 @@ class BinaryClient(object):
response = ua.PublishResponse.from_binary(future.result())
try:
self._publishcallbacks[response.Parameters.SubscriptionId](response.Parameters)
except Exception as ex: # we call client code, catch everything!
except Exception: # we call client code, catch everything!
self.logger.exception("Exception while calling user callback")
def create_monitored_items(self, params):
......
......@@ -2,7 +2,6 @@
Socket server forwarding request to internal server
"""
import logging
import functools
try:
# we prefer to use bundles asyncio version, otherwise fallback to trollius
import asyncio
......@@ -11,7 +10,6 @@ except ImportError:
from trollius import From
from opcua import ua
from opcua.uaprocessor import UAProcessor
......@@ -31,6 +29,7 @@ class BinaryServer(object):
def start(self):
class OPCUAProtocol(asyncio.Protocol):
"""
instanciated for every connection
defined as internal class since it needs access
......@@ -75,8 +74,8 @@ class BinaryServer(object):
return
if len(data) <= hdr.packet_size:
return
data = data[hdr.packet_size:]
except utils.NotEnoughData:
data = data[hdr.packet_size:]
except ua.utils.NotEnoughData:
logger.warning("Not a complete packet in data from client, waiting for more data")
self.data = buf.data
break
......@@ -94,8 +93,3 @@ class BinaryServer(object):
self.logger.warning("Closing asyncio socket server")
self._server.close()
self.loop.run_coro_and_wait(self._server.wait_closed())
......@@ -75,7 +75,8 @@ class Subscription(object):
def _call_datachange(self, datachange):
for item in datachange.MonitoredItems:
data = self._monitoreditems_map[item.ClientHandle]
with self._lock:
data = self._monitoreditems_map[item.ClientHandle]
try:
self._handler.data_change(data.server_handle, data.node, item.Value.Value.Value, data.attribute)
except Exception:
......@@ -83,7 +84,8 @@ class Subscription(object):
def _call_event(self, eventlist):
for event in eventlist.Events:
data = self._monitoreditems_map[event.ClientHandle]
with self._lock:
data = self._monitoreditems_map[event.ClientHandle]
try:
#fields = {}
result = EventResult()
......@@ -106,7 +108,7 @@ class Subscription(object):
self.logger.exception("Exception calling status change handler")
def subscribe_data_change(self, node, attr=ua.AttributeIds.Value):
return self._subscribe(node, attr)
return self._subscribe(node, attr, queuesize=1)
def _get_node(self, nodeid):
if isinstance(nodeid, ua.NodeId):
......@@ -133,7 +135,7 @@ class Subscription(object):
evfilter = self._get_filter_from_event_type(evtype)
return self._subscribe(sourcenode, AttributeIds.EventNotifier, evfilter)
def _subscribe(self, node, attr, mfilter=None):
def _subscribe(self, node, attr, mfilter=None, queuesize=0):
rv = ua.ReadValueId()
rv.NodeId = node.nodeid
rv.AttributeId = attr
......@@ -142,7 +144,7 @@ class Subscription(object):
self._client_handle += 1
mparams.ClientHandle = self._client_handle
mparams.SamplingInterval = self.parameters.RequestedPublishingInterval
mparams.QueueSize = 1
mparams.QueueSize = queuesize
mparams.DiscardOldest = True
if mfilter:
mparams.Filter = mfilter
......@@ -167,7 +169,8 @@ class Subscription(object):
data.attribute = attr
data.server_handle = result.MonitoredItemId
data.mfilter = ua.downcast_extobject(result.FilterResult)
self._monitoreditems_map[mparams.ClientHandle] = data
with self._lock:
self._monitoreditems_map[mparams.ClientHandle] = data
return result.MonitoredItemId
......
"""
server side implementation of subscriptions
"""
import time
import sys
from threading import RLock
import logging
import copy
from opcua import ua
......@@ -126,8 +125,8 @@ class InternalSubscription(object):
self._monitored_datachange = {}
self._monitored_items = {}
self._lock = RLock()
self._triggered_datachanges = []
self._triggered_events = []
self._triggered_datachanges = {}
self._triggered_events = {}
self._triggered_statuschanges = []
self._notification_seq = 1
self._not_acknowledged_results = {}
......@@ -187,14 +186,14 @@ class InternalSubscription(object):
result.SubscriptionId = self.data.SubscriptionId
if self._triggered_datachanges:
notif = ua.DataChangeNotification()
notif.MonitoredItems = self._triggered_datachanges[:]
self._triggered_datachanges = []
notif.MonitoredItems = [item for sublist in self._triggered_datachanges.values() for item in sublist]
self._triggered_datachanges = {}
self.logger.debug("sending datachanges nontification with %s events", len(notif.MonitoredItems))
result.NotificationMessage.NotificationData.append(notif)
if self._triggered_events:
notif = ua.EventNotificationList()
notif.Events = self._triggered_events[:]
self._triggered_events = []
notif.Events = [item for sublist in self._triggered_events.values() for item in sublist]
self._triggered_events = {}
result.NotificationMessage.NotificationData.append(notif)
if self._triggered_statuschanges:
notif = ua.StatusChangeNotification()
......@@ -251,7 +250,7 @@ class InternalSubscription(object):
result = ua.MonitoredItemCreateResult()
if mdata.monitored_item_id == params.MonitoredItemId:
result.RevisedSamplingInterval = self.data.RevisedPublishingInterval
result.RevisedQueueSize = ua.downcast_extobject(params.RequestedParameters.QueueSize) # FIXME check and use value
result.RevisedQueueSize = ua.downcast_extobject(params.RequestedParameters.QueueSize)
result.FilterResult = params.RequestedParameters.Filter
mdata.parameters = result
return result
......@@ -323,7 +322,13 @@ class InternalSubscription(object):
mdata = self._monitored_items[mid]
event.ClientHandle = mdata.client_handle
event.Value = value
self._triggered_datachanges.append(event)
if not mid in self._triggered_datachanges:
self._triggered_datachanges[mid] = [event]
return
if mdata.parameters.RevisedQueueSize:
if len(self._triggered_datachanges[mid]) >= mdata.parameters.RevisedQueueSize:
self._triggered_datachanges[mid].pop(0)
self._triggered_datachanges[mid].append(event)
def trigger_event(self, event):
with self._lock:
......@@ -335,11 +340,20 @@ class InternalSubscription(object):
if not mid in self._monitored_items:
self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event, self)
return False
item = self._monitored_items[mid]
mdata = self._monitored_items[mid]
fieldlist = ua.EventFieldList()
fieldlist.ClientHandle = item.client_handle
fieldlist.EventFields = self._get_event_fields(item.parameters.FilterResult, event)
self._triggered_events.append(fieldlist)
fieldlist.ClientHandle = mdata.client_handle
fieldlist.EventFields = self._get_event_fields(mdata.parameters.FilterResult, event)
if not mid in self._triggered_events:
self._triggered_events[mid] = [fieldlist]
return True
if mdata.parameters.RevisedQueueSize:
if len(self._triggered_events[mid]) >= mdata.parameters.RevisedQueueSize:
self._triggered_events[mid].pop(0)
self._triggered_events[mid].append(fieldlist)
print("\n\n", event)
print(fieldlist)
print(self._triggered_events[mid])
return True
def _get_event_fields(self, evfilter, event):
......@@ -348,10 +362,12 @@ class InternalSubscription(object):
try:
if not sattr.BrowsePath:
val = getattr(event, ua.AttributeIdsInv[sattr.Attribute])
val = copy.deepcopy(val)
fields.append(ua.Variant(val))
else:
name = sattr.BrowsePath[0].Name
val = getattr(event, name)
val = copy.deepcopy(val)
fields.append(ua.Variant(val))
except AttributeError:
fields.append(ua.Variant())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment