Commit 65df3689 authored by David Wilson's avatar David Wilson

issue #140: prevent duplicate watcher thread creation

When a Broker() is running with install_watcher=True, arrange for only
one watcher thread to exist for each target thread, and to reset the
mapping of watchers to targets after process fork.

This is probably the last change I want to make to the watcher feature
before deciding to rip it out, it may be more trouble than it is worth.
parent 1b93a4f5
...@@ -117,16 +117,46 @@ def scan_code_imports(co): ...@@ -117,16 +117,46 @@ def scan_code_imports(co):
co.co_consts[arg2] or ()) co.co_consts[arg2] or ())
_join_lock = threading.Lock()
_join_process_id = None
_join_callbacks_by_target = {}
_join_thread_by_target = {}
def _join_thread_reset():
"""If we have forked since the watch dictionaries were initialized, all
that has is garbage, so clear it."""
global _join_process_id
if os.getpid() != _join_process_id:
_join_process_id = os.getpid()
_join_callbacks_by_target.clear()
_join_thread_by_target.clear()
def join_thread_async(target_thread, on_join): def join_thread_async(target_thread, on_join):
"""Start a thread that waits for another thread to shutdown, before """Start a thread that waits for another thread to shutdown, before
invoking `on_join()`. In CPython it seems possible to use this method to invoking `on_join()`. In CPython it seems possible to use this method to
ensure a non-main thread is signalled when the main thread has exitted, ensure a non-main thread is signalled when the main thread has exitted,
using yet another thread as a proxy.""" using yet another thread as a proxy."""
def _watch(): def _watch():
target_thread.join() target_thread.join()
for on_join in _join_callbacks_by_target[target_thread]:
on_join() on_join()
thread = threading.Thread(target=_watch)
thread.start() _join_lock.acquire()
try:
_join_thread_reset()
_join_callbacks_by_target.setdefault(target_thread, []).append(on_join)
if target_thread not in _join_thread_by_target:
_join_thread_by_target[target_thread] = threading.Thread(
name='mitogen.master.join_thread_async',
target=_watch,
)
_join_thread_by_target[target_thread].start()
finally:
_join_lock.release()
class SelectError(mitogen.core.Error): class SelectError(mitogen.core.Error):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment