Commit 001e0163 authored by David Wilson's avatar David Wilson

issue #156: handle multiple _put() before wake of first sleeper

- If latch.get() is called and the queue is empty, a thread is put to
  sleep.

- If Latch.put() from another thread then appends an item to the queue and
  wakes the sleeping thread, and

- If a subsequent Latch.put() from the same or another thread manages to
  acquire `lock` before the sleeping thread is scheduled,

- The sleeping thread's wake socket would have multiple bytes written to
  it.

Therefore create a new _pending variable to track the only item assigned
to each thread (keyed by its write socket), and remove the socket from
`sleeping` from within put.
parent 168a954d
......@@ -911,13 +911,14 @@ class Latch(object):
def __init__(self):
self._lock = threading.Lock()
self._queue = []
self._waiters = []
self._sleeping = []
self._pending = {}
def close(self):
self._lock.acquire()
try:
self.closed = True
for wsock in self._waiters:
for wsock in self._sleeping:
self._wake(wsock)
finally:
self._lock.release()
......@@ -939,13 +940,13 @@ class Latch(object):
try:
if self.closed:
raise LatchError()
if self._queue and not self._waiters:
if self._queue:
_vv and IOLOG.debug('%r.get() -> %r', self, self._queue[0])
return self._queue.pop(0)
if not block:
raise TimeoutError()
self._tls_init()
self._waiters.append(_tls.wsock)
self._sleeping.append(_tls.wsock)
finally:
self._lock.release()
......@@ -955,15 +956,16 @@ class Latch(object):
self._lock.acquire()
try:
self._waiters.remove(_tls.wsock)
if self.closed:
raise LatchError()
if not rfds:
raise TimeoutError()
assert _tls.rsock.recv(1) == '\x7f'
if _tls.rsock.recv(2) != '\x7f':
raise LatchError('internal error: received >1 wakeups')
try:
_vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0])
return self._queue.pop(0)
obj = self._pending.pop(_tls.wsock)
_vv and IOLOG.debug('%r.get() wake -> %r', self, obj)
return obj
except IndexError:
IOLOG.exception('%r.get() INDEX ERROR', self)
raise
......@@ -976,11 +978,14 @@ class Latch(object):
try:
if self.closed:
raise LatchError()
self._queue.append(obj)
if self._waiters:
if self._sleeping:
sock = self._sleeping.pop(0)
self._pending[sock] = obj
_vv and IOLOG.debug('%r.put() -> waking wfd=%r',
self, self._waiters[0].fileno())
self._wake(self._waiters[0])
self, sock.fileno())
self._wake(sock)
else:
self._queue.append(obj)
finally:
self._lock.release()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment