Commit c2482f8d authored by Olivier's avatar Olivier Committed by oroulet

fix usage of timeout for sync futures in ThreadLoop

parent 36673c75
...@@ -34,7 +34,7 @@ class ThreadLoopNotRunning(Exception): ...@@ -34,7 +34,7 @@ class ThreadLoopNotRunning(Exception):
class ThreadLoop(Thread): class ThreadLoop(Thread):
def __init__(self, timeout=None): def __init__(self, timeout: Optional[float] = 120) -> None:
Thread.__init__(self) Thread.__init__(self)
self.loop = None self.loop = None
self._cond = Condition() self._cond = Condition()
...@@ -61,8 +61,8 @@ class ThreadLoop(Thread): ...@@ -61,8 +61,8 @@ class ThreadLoop(Thread):
self.loop.close() self.loop.close()
def post(self, coro): def post(self, coro):
if not self.loop or not self.loop.is_running(): if not self.loop or not self.loop.is_running() or not self.is_alive():
raise ThreadLoopNotRunning(f"could not post {coro}") raise ThreadLoopNotRunning(f"could not post {coro} since asyncio loop in thread has not been started or has been stopped")
futur = asyncio.run_coroutine_threadsafe(coro, loop=self.loop) futur = asyncio.run_coroutine_threadsafe(coro, loop=self.loop)
return futur.result(self.timeout) return futur.result(self.timeout)
...@@ -219,18 +219,18 @@ class _SubHandler: ...@@ -219,18 +219,18 @@ class _SubHandler:
class Client: class Client:
def __init__(self, url: str, timeout: int = 4, tloop=None): """
Sync Client, see doc for async Client
the sync client has one extra parameter: sync_wrapper_timeout.
if no ThreadLoop is provided this timeout is used to define how long the sync wrapper
waits for an async call to return. defualt is 120s and hopefully should fit most applications
"""
def __init__(self, url: str, timeout: int = 4, tloop=None, sync_wrapper_timeout: Optional[float] = 120,) -> None:
self.tloop = tloop self.tloop = tloop
self.close_tloop = False self.close_tloop = False
if not self.tloop: if not self.tloop:
# setting timeout here in sync layer is not very correct, that timeout is in reality self.tloop = ThreadLoop(sync_wrapper_timeout)
# used for every network calls. Operations like connect() generate several calls
# but anyway that timeout must be much longer than network latency so that default
# should work for everyone
if timeout > 0:
# we do not want to go into timeout at the same time as lower level async code
timeout = timeout * 0.1
self.tloop = ThreadLoop(timeout)
self.tloop.start() self.tloop.start()
self.close_tloop = True self.close_tloop = True
self.aio_obj = client.Client(url, timeout) self.aio_obj = client.Client(url, timeout)
...@@ -531,11 +531,18 @@ class Shortcuts: ...@@ -531,11 +531,18 @@ class Shortcuts:
class Server: class Server:
def __init__(self, shelf_file=None, tloop=None): """
Sync Server, see doc for async Server
the sync server has one extra parameter: sync_wrapper_timeout.
if no ThreadLoop is provided this timeout is used to define how long the sync wrapper
waits for an async call to return. defualt is 120s and hopefully should fit most applications
"""
def __init__(self, shelf_file=None, tloop=None, sync_wrapper_timeout: Optional[float] = 120,):
self.tloop = tloop self.tloop = tloop
self.close_tloop = False self.close_tloop = False
if not self.tloop: if not self.tloop:
self.tloop = ThreadLoop() self.tloop = ThreadLoop(timeout=sync_wrapper_timeout)
self.tloop.start() self.tloop.start()
self.close_tloop = True self.close_tloop = True
self.aio_obj = server.Server() self.aio_obj = server.Server()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment