Commit a5cc7cb4 authored by David Wilson's avatar David Wilson

issue #156: add extra debugging around Latch

Change from writing '\x00' to writing '\x7f', and verify that is the
byte that woke the sleeping thread. Add a bunch more IO logging.
parent ac7a64df
...@@ -188,7 +188,7 @@ def is_blacklisted_import(importer, fullname): ...@@ -188,7 +188,7 @@ def is_blacklisted_import(importer, fullname):
treated as blacklisted. treated as blacklisted.
""" """
return ((not any(fullname.startswith(s) for s in importer.whitelist)) or return ((not any(fullname.startswith(s) for s in importer.whitelist)) or
(any(fullname.startswith(s) for s in importer.blacklist))) (any(fullname.startswith(s) for s in importer.blacklist)))
def set_cloexec(fd): def set_cloexec(fd):
...@@ -913,30 +913,36 @@ class Latch(object): ...@@ -913,30 +913,36 @@ class Latch(object):
self.queue = [] self.queue = []
self.wake_socks = [] self.wake_socks = []
def _tls_init(self):
if not hasattr(_tls, 'rsock'):
_tls.rsock, _tls.wsock = socket.socketpair()
set_cloexec(_tls.rsock.fileno())
set_cloexec(_tls.wsock.fileno())
def close(self): def close(self):
self.lock.acquire() self.lock.acquire()
try: try:
self.closed = True self.closed = True
for sock in self.wake_socks: while self.wake_socks:
self._wake(sock) self._wake(self.wake_socks.pop())
finally: finally:
self.lock.release() self.lock.release()
def empty(self): def empty(self):
return len(self.queue) == 0 return len(self.queue) == 0
def _tls_init(self):
if not hasattr(_tls, 'rsock'):
_tls.rsock, _tls.wsock = socket.socketpair()
set_cloexec(_tls.rsock.fileno())
set_cloexec(_tls.wsock.fileno())
def get(self, timeout=None, block=True): def get(self, timeout=None, block=True):
_vv and IOLOG.debug(
'%r.get(timeout=%r, block=%r)',
self, timeout, block
)
self.lock.acquire() self.lock.acquire()
try: try:
if self.closed: if self.closed:
raise LatchError() raise LatchError()
if self.queue: if self.queue:
_vv and IOLOG.debug('%r.get() -> %r', self, self.queue[0])
return self.queue.pop(0) return self.queue.pop(0)
if not block: if not block:
raise TimeoutError() raise TimeoutError()
...@@ -945,6 +951,7 @@ class Latch(object): ...@@ -945,6 +951,7 @@ class Latch(object):
finally: finally:
self.lock.release() self.lock.release()
_vv and IOLOG.debug('%r.get() -> sleeping', self)
rfds, _, _ = restart(select.select, [_tls.rsock], [], [], timeout) rfds, _, _ = restart(select.select, [_tls.rsock], [], [], timeout)
assert len(rfds) or timeout is not None assert len(rfds) or timeout is not None
...@@ -958,7 +965,8 @@ class Latch(object): ...@@ -958,7 +965,8 @@ class Latch(object):
raise TimeoutError() raise TimeoutError()
assert _tls.rsock in rfds assert _tls.rsock in rfds
_tls.rsock.recv(1) assert _tls.rsock.recv(1) == '\x7f'
_vv and IOLOG.debug('%r.get() wake -> %r', self, self.queue[0])
return self.queue.pop(0) return self.queue.pop(0)
finally: finally:
self.lock.release() self.lock.release()
...@@ -979,11 +987,21 @@ class Latch(object): ...@@ -979,11 +987,21 @@ class Latch(object):
def _wake(self, sock): def _wake(self, sock):
try: try:
os.write(sock.fileno(), '\x00') os.write(sock.fileno(), '\x7f')
except OSError, e: except OSError, e:
if e[0] != errno.EBADF: if e[0] != errno.EBADF:
raise raise
def __repr__(self):
rsock = getattr(_tls, 'rsock', None)
wsock = getattr(_tls, 'wsock', None)
return 'Latch(%r, t=%r, r=%r, w=%r)' % (
id(self),
threading.currentThread().name,
rsock and rsock.fileno(),
wsock and wsock.fileno(),
)
class Waker(BasicStream): class Waker(BasicStream):
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment