Commit 553789ed authored by Christian Bergmiller's avatar Christian Bergmiller Committed by oroulet

end _publish task on BadNoSubscription

parent d35d5e74
......@@ -437,7 +437,7 @@ class UaClient:
self._subscription_callbacks[response.Parameters.SubscriptionId] = callback
self.logger.info("create_subscription success SubscriptionId %s", response.Parameters.SubscriptionId)
if not self._publish_task or self._publish_task.done():
# Start the publish cycle on the first subscription
# Start the publish cycle if it is not yet running
self._publish_task = self.loop.create_task(self._publish())
return response.Parameters
......@@ -451,7 +451,7 @@ class UaClient:
data
)
response.ResponseHeader.ServiceResult.check()
self.logger.info("remove subscription callbacks")
self.logger.info("remove subscription callbacks for %r", subscription_ids)
for sid in subscription_ids:
self._subscription_callbacks.pop(sid)
return response.Results
......@@ -465,31 +465,25 @@ class UaClient:
async def _publish(self):
"""
Cyclically send publish request and wait for the publish response.
Send the `PublishResult` to the matching `Subscription`.
Send publish request and wait for the publish response in an endless loop.
Forward the `PublishResult` to the matching `Subscription` by callback.
"""
ack = None
while True:
if not self._subscription_callbacks:
# End Task if there are no more subscriptions
break
self.logger.debug("publish")
request = ua.PublishRequest()
request.Parameters.SubscriptionAcknowledgements = [ack] if ack else []
data = await self.protocol.send_request(request, timeout=0)
try:
self.protocol.check_answer(data, "while waiting for publish response")
except BadTimeout:
# Spec Part 4, 7.28
# Repeat without Acknowledgement
except BadTimeout: # See Spec. Part 4, 7.28
# Repeat without acknowledgement
ack = None
continue
except BadNoSubscription: # Spec Part 5, 13.8.1
# BadNoSubscription is expected after deleting the last subscription.
#
# We should therefore also check for len(self._publishcallbacks) == 0, but
# this gets us into trouble if a Publish response arrives before the
# DeleteSubscription response.
except BadNoSubscription: # See Spec. Part 5, 13.8.1
# BadNoSubscription is expected to be received after deleting the last subscription.
# We use this as a signal to exit this task and stop sending PublishRequests. This is easier then
# checking if there are no more subscriptions registered in this client (). A Publish response
# could still arrive before the DeleteSubscription response.
#
# We could remove the callback already when sending the DeleteSubscription request,
# but there are some legitimate reasons to keep them around, such as when the server
......@@ -497,14 +491,13 @@ class UaClient:
# the subscription client-side.
#
# There are a variety of ways to act correctly, but the most practical solution seems
# to be to just ignore any BadNoSubscription responses.
# to be to just silently ignore any BadNoSubscription responses.
self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
ack = None
continue
# End task
return
# parse publish response
try:
response = struct_from_binary(ua.PublishResponse, data)
self.logger.debug(response)
except Exception:
# INFO: catching the exception here might be obsolete because we already
# catch BadTimeout above. However, it's not really clear what this code
......@@ -521,11 +514,9 @@ class UaClient:
self.logger.warning("Received data for unknown subscription %s active are %s", subscription_id,
self._subscription_callbacks.keys())
else:
# do callback
try:
callback(response.Parameters)
except Exception:
# we call client code, catch everything!
except Exception: # we call client code, catch everything!
self.logger.exception("Exception while calling user callback: %s")
# Repeat with acknowledgement
ack = ua.SubscriptionAcknowledgement()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment