Commit d0e49b1e authored by oroulet's avatar oroulet

create, cancel and await session task correctly

parent 7e4631ae
......@@ -59,6 +59,7 @@ class Client(object):
self.nodes = Shortcuts(self.uaclient)
self.max_messagesize = 0 # No limits
self.max_chunkcount = 0 # No limits
self._renew_channel_task = None
async def __aenter__(self):
await self.connect()
......@@ -233,7 +234,6 @@ class Client(object):
Send OPC-UA hello to server
"""
ack = await self.uaclient.send_hello(self.server_url.geturl(), self.max_messagesize, self.max_chunkcount)
# TODO: Handle ua.UaError
if isinstance(ack, ua.UaStatusCodeError):
raise ack
......@@ -338,30 +338,26 @@ class Client(object):
self._policy_ids = ep.UserIdentityTokens
# Actual maximum number of milliseconds that a Session shall remain open without activity
self.session_timeout = response.RevisedSessionTimeout
self._schedule_renew_session()
self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
return response
def _schedule_renew_session(self, renew_session: bool=False):
# if the session was intentionally closed `session_timeout` will be None
if renew_session and self.session_timeout:
self.loop.create_task(self._renew_session())
self.loop.call_later(
# 0.7 is from spec
min(self.session_timeout, self.secure_channel_timeout) * 0.7,
self._schedule_renew_session, True
)
async def _renew_session(self):
async def _renew_channel_loop(self):
"""
Renew the SecureChannel before the SessionTimeout will happen.
In theory we could do that only if no session activity
but it does not cost much..
"""
state_node = self.nodes.server_state
self.logger.debug("renewing channel")
await self.open_secure_channel(renew=True)
val = await state_node.get_value()
self.logger.debug("server state is: %s ", val)
try:
duration = min(self.session_timeout, self.secure_channel_timeout) * 0.7
while True:
# 0.7 is from spec
await asyncio.sleep(duration)
self.logger.debug("renewing channel")
await self.open_secure_channel(renew=True)
val = await self.nodes.server_state.get_value()
self.logger.debug("server state is: %s ", val)
except asyncio.CancelledError:
pass
def server_policy_id(self, token_type, default):
"""
......@@ -452,12 +448,13 @@ class Client(object):
data, uri = security_policies.encrypt_asymmetric(pubkey, etoken, policy_uri)
return data, uri
def close_session(self) -> Coroutine:
async def close_session(self) -> Coroutine:
"""
Close session
"""
self.session_timeout = None
return self.uaclient.close_session(True)
self._renew_channel_task.cancel()
await self._renew_channel_task
return await self.uaclient.close_session(True)
def get_root_node(self):
return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.RootFolder))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment