Commit 03cd962f authored by Christian Bergmiller's avatar Christian Bergmiller Committed by oroulet

removed _send_request callback param

parent 8b55fd35
......@@ -20,7 +20,7 @@ class UASocketProtocol(asyncio.Protocol):
CLOSED = 'closed'
def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
self.logger = logging.getLogger(__name__ + ".UASocketProtocol")
self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
self.loop = loop or asyncio.get_event_loop()
self.transport = None
self.receive_buffer: bytes = None
......@@ -89,7 +89,7 @@ class UASocketProtocol(asyncio.Protocol):
else:
raise ua.UaError(f"Unsupported message type: {msg}")
def _send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage):
def _send_request(self, request, timeout=1000, message_type=ua.MessageType.SecureMessage):
"""
Send request to server, lower-level method.
Timeout is the timeout written in ua header.
......@@ -106,25 +106,23 @@ class UASocketProtocol(asyncio.Protocol):
raise
self._request_id += 1
future = asyncio.Future()
if callback:
future.add_done_callback(callback)
self._callbackmap[self._request_id] = future
msg = self._connection.message_to_binary(binreq, message_type=message_type, request_id=self._request_id)
self.transport.write(msg)
return future
async def send_request(self, request, callback=None, timeout=10, message_type=ua.MessageType.SecureMessage):
async def send_request(self, request, timeout=10, message_type=ua.MessageType.SecureMessage):
"""
Send a request to the server.
Timeout is the timeout written in ua header.
Returns response object if no callback is provided.
"""
future = self._send_request(request, callback, timeout, message_type)
if not callback:
await asyncio.wait_for(future, timeout if timeout else None)
data = future.result()
self.check_answer(data, " in response to " + request.__class__.__name__)
return data
data = await asyncio.wait_for(
self._send_request(request, timeout, message_type),
timeout if timeout else None
)
self.check_answer(data, f" in response to {request.__class__.__name__}")
return data
def check_answer(self, data, context):
data = data.copy()
......@@ -213,7 +211,7 @@ class UaClient:
"""
def __init__(self, timeout=1, loop=None):
self.logger = logging.getLogger(__name__ + '.UaClient')
self.logger = logging.getLogger(f'{__name__}.UaClient')
self.loop = loop or asyncio.get_event_loop()
self._publish_callbacks = {}
self._timeout = timeout
......@@ -442,11 +440,8 @@ class UaClient:
acks = []
request = ua.PublishRequest()
request.Parameters.SubscriptionAcknowledgements = acks
await self.protocol.send_request(request, self._sub_data_received, timeout=0)
def _sub_data_received(self, future):
data = future.result()
self.loop.create_task(self._call_publish_callback(data))
# We do not wait for publish response in this task
self.loop.create_task(self._send_publish_request(request))
"""
# to avoid fire and forget of task we could use a loop:
......@@ -467,8 +462,13 @@ class UaClient:
await self._call_publish_callback(data)
"""
async def _call_publish_callback(self, data):
self.logger.info("call_publish_callback")
async def _send_publish_request(self, request):
"""
Send publish request and wait for publish response.
:param request:
:return:
"""
data = await self.protocol.send_request(request, timeout=0)
try:
self.protocol.check_answer(data, "while waiting for publish response")
except BadTimeout:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment