Commit 22cc1a36 authored by David Wilson's avatar David Wilson

issue #155: core: refactor Latch to avoid TLS use

TLS destructors are not called after fork, therefore we must explicitly
track a global list of free file descriptors, and arrange for that list
to explicitly be destroyed from fork.py.
parent 2cf9edc8
...@@ -899,12 +899,20 @@ def _unpickle_context(router, context_id, name): ...@@ -899,12 +899,20 @@ def _unpickle_context(router, context_id, name):
class Latch(object): class Latch(object):
closed = False closed = False
_waking = 0 _waking = 0
_sockets = []
def __init__(self): def __init__(self):
self._lock = threading.Lock() self._lock = threading.Lock()
self._queue = [] self._queue = []
self._sleeping = [] self._sleeping = []
@classmethod
def _on_fork(cls):
while cls._sockets:
rsock, wsock = cls._sockets.pop()
rsock.close()
wsock.close()
def close(self): def close(self):
self._lock.acquire() self._lock.acquire()
try: try:
...@@ -919,10 +927,12 @@ class Latch(object): ...@@ -919,10 +927,12 @@ class Latch(object):
return len(self._queue) == 0 return len(self._queue) == 0
def _tls_init(self): def _tls_init(self):
if not hasattr(_tls, 'rsock'): if self._sockets:
_tls.rsock, _tls.wsock = socket.socketpair() return self._sockets.pop()
set_cloexec(_tls.rsock.fileno()) rsock, wsock = socket.socketpair()
set_cloexec(_tls.wsock.fileno()) set_cloexec(rsock.fileno())
set_cloexec(wsock.fileno())
return rsock, wsock
def get(self, timeout=None, block=True): def get(self, timeout=None, block=True):
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)',
...@@ -938,30 +948,31 @@ class Latch(object): ...@@ -938,30 +948,31 @@ class Latch(object):
return self._queue.pop(i) return self._queue.pop(i)
if not block: if not block:
raise TimeoutError() raise TimeoutError()
self._tls_init() rsock, wsock = self._tls_init()
self._sleeping.append(_tls.wsock) self._sleeping.append(wsock)
finally: finally:
self._lock.release() self._lock.release()
return self._get_sleep(timeout, block) return self._get_sleep(timeout, block, rsock, wsock)
def _get_sleep(self, timeout, block): def _get_sleep(self, timeout, block, rsock, wsock):
_vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r)', _vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r)',
self, timeout, block) self, timeout, block)
e = None e = None
try: try:
io_op(select.select, [_tls.rsock], [], [], timeout) io_op(select.select, [rsock], [], [], timeout)
except Exception, e: except Exception, e:
pass pass
self._lock.acquire() self._lock.acquire()
try: try:
i = self._sleeping.index(_tls.wsock) i = self._sleeping.index(wsock)
del self._sleeping[i] del self._sleeping[i]
self._sockets.append((rsock, wsock))
if i >= self._waking: if i >= self._waking:
raise TimeoutError() raise TimeoutError()
self._waking -= 1 self._waking -= 1
if _tls.rsock.recv(2) != '\x7f': if rsock.recv(2) != '\x7f':
raise LatchError('internal error: received >1 wakeups') raise LatchError('internal error: received >1 wakeups')
if e: if e:
raise e raise e
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment