Commit f48efc42 authored by oroulet's avatar oroulet

since we cannot rely on cancel() (==python bug) to stop a task then use a variable

parent 3cba3a95
......@@ -80,6 +80,7 @@ class Client:
self._monitor_server_task = None
self._locale = ["en"]
self._watchdog_intervall = watchdog_intervall
self._closing: bool = False
async def __aenter__(self):
await self.connect()
......@@ -449,7 +450,7 @@ class Client:
"""
timeout = min(self.session_timeout / 1000 / 2, self._watchdog_intervall)
try:
while True:
while not self._closing:
await asyncio.sleep(timeout)
# @FIXME handle state change
_ = await self.nodes.server_state.read_value()
......@@ -467,7 +468,7 @@ class Client:
# Part4 5.5.2.1:
# Clients should request a new SecurityToken after 75 % of its lifetime has elapsed
duration = self.secure_channel_timeout * 0.75 / 1000
while True:
while not self._closing:
await asyncio.sleep(duration)
_logger.debug("renewing channel")
await self.open_secure_channel(renew=True)
......@@ -581,6 +582,7 @@ class Client:
"""
Close session
"""
self._closing = True
if self._monitor_server_task:
self._monitor_server_task.cancel()
try:
......
......@@ -264,6 +264,7 @@ class UaClient:
self.protocol: UASocketProtocol = None
self._publish_task = None
self._pre_request_hook: Optional[Callable[[], Awaitable[None]]] = None
self._closing: bool = False
def set_security(self, policy: ua.SecurityPolicy):
self.security_policy = policy
......@@ -286,6 +287,7 @@ class UaClient:
async def connect_socket(self, host: str, port: int):
"""Connect to server socket."""
self.logger.info("opening connection")
self._closing: bool = False
# Timeout the connection when the server isn't available
await asyncio.wait_for(asyncio.get_running_loop().create_connection(self._make_protocol, host, port), self._timeout)
......@@ -316,6 +318,8 @@ class UaClient:
async def create_session(self, parameters):
self.logger.info("create_session")
# FIXME: setting a value on an object to set it its state is suspicious,
# especially when that object has its own state
self.protocol.closed = False
request = ua.CreateSessionRequest()
request.Parameters = parameters
......@@ -342,6 +346,7 @@ class UaClient:
self.logger.warning("close_session but connection wasn't established")
return
self.protocol.closed = True
self._closing = True
if self._publish_task and not self._publish_task.done():
self._publish_task.cancel()
if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
......@@ -477,7 +482,7 @@ class UaClient:
"create_subscription success SubscriptionId %s",
response.Parameters.SubscriptionId
)
if not self._publish_task or self._publish_task.done() :
if not self._publish_task or self._publish_task.done():
# Start the publish loop if it is not yet running
# The current strategy is to have only one open publish request per UaClient. This might not be enough
# in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
......@@ -533,7 +538,7 @@ class UaClient:
Forward the `PublishResult` to the matching `Subscription` by callback.
"""
ack = None
while True:
while not self._closing:
try:
response = await self.publish([ack] if ack else [])
except BadTimeout: # See Spec. Part 4, 7.28
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment