Commit 182d0252 authored by Christoph Ziebuhr's avatar Christoph Ziebuhr Committed by oroulet

Fix publishing in high load situations

parent 12df8c31
......@@ -187,8 +187,8 @@ class InternalSession(AbstractSession):
"""COROUTINE"""
return await self.iserver.method_service.call(params)
async def create_subscription(self, params, callback=None):
result = await self.subscription_service.create_subscription(params, callback, external=self.external)
async def create_subscription(self, params, callback, request_callback=None):
result = await self.subscription_service.create_subscription(params, callback, request_callback=request_callback)
self.subscriptions.append(result.SubscriptionId)
return result
......@@ -226,8 +226,8 @@ class InternalSession(AbstractSession):
def publish(self, acks: Optional[Iterable[ua.SubscriptionAcknowledgement]] = None):
return self.subscription_service.publish(acks or [])
def modify_subscription(self, params, callback):
return self.subscription_service.modify_subscription(params, callback)
def modify_subscription(self, params):
return self.subscription_service.modify_subscription(params)
async def transfer_subscriptions(self, params: ua.TransferSubscriptionsParameters) -> List[ua.TransferResult]:
# Subscriptions aren't bound to a Session and can be transfered!
......
......@@ -18,17 +18,20 @@ class InternalSubscription:
"""
def __init__(self, data: ua.CreateSubscriptionResult, aspace: AddressSpace,
callback=None, no_acks=False):
callback, request_callback=None):
"""
:param loop: Event loop instance
:param data: Create Subscription Result
:param aspace: Server Address Space
:param callback: Callback for publishing
:param no_acks: If true no acknowledging will be expected (for server internal subscriptions)
:param request_callback: Callback for getting queued publish requests.
If None, publishing will be done without waiting for a token and no
acknowledging will be expected (for server internal subscriptions)
"""
self.logger = logging.getLogger(__name__)
self.data: ua.CreateSubscriptionResult = data
self.pub_result_callback = callback
self.pub_request_callback = request_callback
self.monitored_item_srv = MonitoredItemService(self, aspace)
self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {}
self._triggered_events: Dict[int, List[ua.EventFieldList]] = {}
......@@ -40,7 +43,6 @@ class InternalSubscription:
self._keep_alive_count = 0
self._publish_cycles_count = 0
self._task = None
self.no_acks = no_acks
def __str__(self):
return f"Subscription(id:{self.data.SubscriptionId})"
......@@ -101,26 +103,36 @@ class InternalSubscription:
self._keep_alive_count += 1
return False
async def publish_results(self):
async def publish_results(self, requestdata=None):
"""
Publish all enqueued data changes, events and status changes though the callback.
This method gets first called without publish request from subscription loop.
It tries to get a publish request itself (if needed). If it doesn't succeed, method gets
queued to be called back with publish request when one is available.
"""
if self._publish_cycles_count > self.data.RevisedLifetimeCount:
self.logger.warning("Subscription %s has expired, publish cycle count(%s) > lifetime count (%s)", self,
self._publish_cycles_count, self.data.RevisedLifetimeCount)
# FIXME this will never be send since we do not have publish request anyway
await self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
result = None
if self.has_published_results():
if not self.no_acks:
if not self.has_published_results():
return False
# called from loop and external request
if requestdata is None and self.pub_request_callback:
# get publish request or queue us to be called back
requestdata = self.pub_request_callback(self.data.SubscriptionId)
if requestdata is None:
self._publish_cycles_count += 1
result = self._pop_publish_result()
if result is not None:
# self.logger.info('publish_results for %s', self.data.SubscriptionId)
# The callback can be:
# Subscription.publish_callback -> server internal subscription
# UaProcessor.forward_publish_response -> client subscription
return False
result = self._pop_publish_result()
# self.logger.info('publish_results for %s', self.data.SubscriptionId)
if requestdata is None:
# Subscription.publish_callback -> server internal subscription
await self.pub_result_callback(result)
else:
# UaProcessor.forward_publish_response -> client subscription
await self.pub_result_callback(result, requestdata)
return True
def _pop_publish_result(self) -> ua.PublishResult:
"""
......@@ -133,9 +145,10 @@ class InternalSubscription:
self._pop_triggered_events(result)
self._pop_triggered_statuschanges(result)
self._keep_alive_count = 0
self._publish_cycles_count = 0
self._startup = False
result.NotificationMessage.SequenceNumber = self._notification_seq
if result.NotificationMessage.NotificationData and not self.no_acks:
if result.NotificationMessage.NotificationData and self.pub_request_callback:
# Acknowledgement is only expected when the Subscription is for a client.
self._notification_seq += 1
self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
......@@ -177,7 +190,6 @@ class InternalSubscription:
:param acks: Sequence number of the PublishResults to acknowledge
"""
# self.logger.info("publish request with acks %s", acks)
self._publish_cycles_count = 0
for nb in acks:
self._not_acknowledged_results.pop(nb, None)
......
......@@ -24,11 +24,7 @@ class SubscriptionService:
self.subscriptions: Dict[int, InternalSubscription] = {}
self._sub_id_counter = 77
@property
def active_subscription_ids(self):
return self.subscriptions.keys()
async def create_subscription(self, params, callback=None, external=False):
async def create_subscription(self, params, callback, request_callback=None):
self.logger.info("create subscription")
result = ua.CreateSubscriptionResult()
result.RevisedPublishingInterval = params.RequestedPublishingInterval
......@@ -36,14 +32,14 @@ class SubscriptionService:
result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
self._sub_id_counter += 1
result.SubscriptionId = self._sub_id_counter
internal_sub = InternalSubscription(result, self.aspace, callback=callback, no_acks=not external)
internal_sub = InternalSubscription(result, self.aspace, callback, request_callback=request_callback)
await internal_sub.start()
self.subscriptions[result.SubscriptionId] = internal_sub
return result
def modify_subscription(self, params, callback):
def modify_subscription(self, params):
# Requested params are ignored, result = params set during create_subscription.
self.logger.info("modify subscription with callback: %s", callback)
self.logger.info("modify subscription")
result = ua.ModifySubscriptionResult()
try:
sub = self.subscriptions[params.SubscriptionId]
......
import copy
import time
import logging
from typing import Deque, Optional
from typing import Deque, Optional, Dict
from collections import deque
from asyncua import ua
......@@ -34,8 +34,9 @@ class UaProcessor:
self._transport = transport
# deque for Publish Requests
self._publish_requests: Deque[PublishRequestData] = deque()
# used when we need to wait for PublishRequest
self._publish_results: Deque[ua.PublishResult] = deque()
# queue for publish results callbacks (using SubscriptionId)
# rely on dict insertion order (therefore can't use set())
self._publish_results_subs: Dict[ua.IntegerId, bool] = {}
self._limits = copy.deepcopy(limits) # Copy limits because they get overriden
self._connection = SecureConnection(ua.SecurityPolicy(), self._limits)
......@@ -63,20 +64,17 @@ class UaProcessor:
response.Parameters = channel
self.send_response(request.RequestHeader.RequestHandle, seqhdr, response, ua.MessageType.SecureOpen)
async def forward_publish_response(self, result: ua.PublishResult):
"""
Try to send a `PublishResponse` with the given `PublishResult`.
"""
# _logger.info("forward publish response %s", result)
def get_publish_request(self, subscription_id: ua.IntegerId):
while True:
if not self._publish_requests:
self._publish_results.append(result)
# only store one callback per subscription
self._publish_results_subs[subscription_id] = True
_logger.info(
"Server wants to send publish answer but no publish request is available,"
"enqueuing notification, length of result queue is %s",
len(self._publish_results)
"enqueuing publish results callback, length of queue is %s",
len(self._publish_results_subs)
)
return
return None
# We pop left from the Publish Request deque (FIFO)
requestdata = self._publish_requests.popleft()
if (requestdata.requesthdr.TimeoutHint == 0 or
......@@ -84,6 +82,13 @@ class UaProcessor:
time.time() - requestdata.timestamp < requestdata.requesthdr.TimeoutHint / 1000):
# Continue and use `requestdata` only if there was no timeout
break
return requestdata
async def forward_publish_response(self, result: ua.PublishResult, requestdata: PublishRequestData):
"""
Try to send a `PublishResponse` with the given `PublishResult`.
"""
# _logger.info("forward publish response %s", result)
response = ua.PublishResponse()
response.Parameters = result
self.send_response(requestdata.requesthdr.RequestHandle, requestdata.seqhdr, response)
......@@ -345,7 +350,7 @@ class UaProcessor:
elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
_logger.info("create subscription request (%s)", user)
params = struct_from_binary(ua.CreateSubscriptionParameters, body)
result = await self.session.create_subscription(params, callback=self.forward_publish_response)
result = await self.session.create_subscription(params, self.forward_publish_response, request_callback=self.get_publish_request)
response = ua.CreateSubscriptionResponse()
response.Parameters = result
# _logger.info("sending create subscription response")
......@@ -355,7 +360,7 @@ class UaProcessor:
_logger.info("modify subscription request")
params = struct_from_binary(ua.ModifySubscriptionParameters, body)
result = self.session.modify_subscription(params, self.forward_publish_response)
result = self.session.modify_subscription(params)
response = ua.ModifySubscriptionResponse()
response.Parameters = result
......@@ -429,19 +434,22 @@ class UaProcessor:
if not self.session:
return False
params = struct_from_binary(ua.PublishParameters, body)
self.session.publish(params.SubscriptionAcknowledgements)
data = PublishRequestData(requesthdr=requesthdr, seqhdr=seqhdr)
# Store the Publish Request (will be used to send publish answers from server)
self._publish_requests.append(data)
# If there is an enqueued result forward it immediately
while self._publish_results:
result = self._publish_results.popleft()
if result.SubscriptionId not in self.session.subscription_service.active_subscription_ids:
# Discard the result if the subscription is no longer active
# If there is an enqueued publish results callback, try to call it immediately
while self._publish_results_subs:
subscription_id = next(iter(self._publish_results_subs))
self._publish_results_subs.pop(subscription_id)
sub = self.session.subscription_service.subscriptions.get(subscription_id)
if sub is None:
# subscription is no longer active
continue
await self.forward_publish_response(result)
break
self.session.publish(params.SubscriptionAcknowledgements)
# _logger.debug("publish forward to server")
if await sub.publish_results(data):
# publish request has been consumed
break
else:
# Store the Publish Request (will be used to send publish answers from server)
self._publish_requests.append(data)
elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
_logger.info("re-publish request (%s)", user)
......
......@@ -1021,3 +1021,63 @@ async def test_maxkeepalive_count(opc, mocker):
mock_create_subscription.reset_mock()
sub = await client.create_subscription(mock_period, sub_handler)
mock_update_subscription.assert_not_called()
@pytest.mark.parametrize("opc", ["client"], indirect=True)
async def test_publish(opc, mocker):
client, _ = opc
o = opc.opc.nodes.objects
var = await o.add_variable(3, 'SubscriptionVariable', 0)
publish_event = asyncio.Event()
publish_org = client.uaclient.publish
async def publish(acks):
await publish_event.wait()
publish_event.clear()
return await publish_org(acks)
class PublishCallback:
def __init__(self):
self.fut = asyncio.Future()
def reset(self):
self.fut = Future()
def set_result(self, publish_result):
values = []
if publish_result.NotificationMessage.NotificationData is not None:
for notif in publish_result.NotificationMessage.NotificationData:
if isinstance(notif, ua.DataChangeNotification):
values.extend((item.Value.Value.Value for item in notif.MonitoredItems))
self.fut.set_result(values)
async def result(self):
return await wait_for(asyncio.shield(self.fut), 1)
publish_callback = PublishCallback()
mocker.patch.object(asyncua.common.subscription.Subscription, "publish_callback", publish_callback.set_result)
mocker.patch.object(client.uaclient, "publish", publish)
sub = await client.create_subscription(30, None)
await sub.subscribe_data_change(var, queuesize=2)
with pytest.raises(asyncio.TimeoutError):
await publish_callback.result()
publish_event.set()
result = await publish_callback.result()
publish_callback.reset()
assert result == [0]
for val in [1, 2, 3, 4]:
await var.write_value(val)
await asyncio.sleep(0.1)
with pytest.raises(asyncio.TimeoutError):
await publish_callback.result()
publish_event.set()
result = await publish_callback.result()
publish_callback.reset()
assert result == [3, 4]
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment