Commit f3de377a authored by oroulet's avatar oroulet Committed by GitHub

Joey faulkner bugfix/server memory leak (#272)

* implement oroulet changes

* closing_task must be a task not onl run once

* do not catch CancelleError in closing_Task

* make sure we use the correct loop in asyncio binary_server
Co-authored-by: default avatarJoey Faulkner <joeymfaulkner@gmail.com>
parent 97820f0a
......@@ -18,7 +18,7 @@ class OPCUAProtocol(asyncio.Protocol):
Instantiated for every connection.
"""
def __init__(self, iserver: InternalServer, policies, clients, protocol_tasks):
def __init__(self, iserver: InternalServer, policies, clients, closing_tasks):
self.peer_name = None
self.transport = None
self.processor = None
......@@ -26,7 +26,7 @@ class OPCUAProtocol(asyncio.Protocol):
self.iserver: InternalServer = iserver
self.policies = policies
self.clients = clients
self.protocol_tasks = protocol_tasks
self.closing_tasks = closing_tasks
self.messages = asyncio.Queue()
self._task = None
......@@ -50,12 +50,11 @@ class OPCUAProtocol(asyncio.Protocol):
self.transport.close()
self.iserver.asyncio_transports.remove(self.transport)
closing_task = self.iserver.loop.create_task(self.processor.close())
self.protocol_tasks.append(closing_task)
self.closing_tasks.append(closing_task)
if self in self.clients:
self.clients.remove(self)
self.messages.put_nowait((None, None))
self._task.cancel()
self.protocol_tasks.append(self._task)
def data_received(self, data):
self._buffer += data
......@@ -111,7 +110,8 @@ class BinaryServer:
self._server: Optional[asyncio.AbstractServer] = None
self._policies = []
self.clients = []
self.protocol_tasks = []
self.closing_tasks = []
self.cleanup_task = None
def set_policies(self, policies):
self._policies = policies
......@@ -122,7 +122,7 @@ class BinaryServer:
iserver=self.iserver,
policies=self._policies,
clients=self.clients,
protocol_tasks=self.protocol_tasks,
closing_tasks=self.closing_tasks,
)
async def start(self):
......@@ -136,19 +136,35 @@ class BinaryServer:
self.hostname = sockname[0]
self.port = sockname[1]
self.logger.info('Listening on %s:%s', self.hostname, self.port)
self.cleanup_task = self.iserver.loop.create_task(self._await_closing_tasks())
async def stop(self):
self.logger.info('Closing asyncio socket server')
for transport in self.iserver.asyncio_transports:
transport.close()
# Wait for all transport closing tasks to complete
results = await asyncio.gather(*self.protocol_tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
self.logger.error(f"An error ocurred while closing a transport: {result}")
self.protocol_tasks = []
# stop cleanup process and run it a last time
self.cleanup_task.cancel()
try:
await self.cleanup_task
except asyncio.CancelledError:
pass
await self._await_closing_tasks(recursive=False)
if self._server:
self.iserver.loop.call_soon(self._server.close)
await self._server.wait_closed()
async def _await_closing_tasks(self, recursive=True):
while self.closing_tasks:
task = self.closing_tasks.pop()
try:
await task
except asyncio.CancelledError:
# this means a stop request has been sent, it should not be catched
raise
except Exception:
logger.exception("Unexpected crash in BinaryServer._await_closing_tasks")
if recursive:
await asyncio.sleep(10)
await self._await_closing_tasks()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment