Commit 526b0a51 authored by David Wilson's avatar David Wilson

issue #156: prevent Latch.close() triggering spurious wakeups

parent 18e2977b
...@@ -889,11 +889,17 @@ Latch.close() ...@@ -889,11 +889,17 @@ Latch.close()
~~~~~~~~~~~ ~~~~~~~~~~~
:py:meth:`mitogen.core.Latch.close` acquires `lock`, sets `closed` to :py:meth:`mitogen.core.Latch.close` acquires `lock`, sets `closed` to
:py:data:`True`, then writes a byte to every socket in `sleeping`. Per above, :py:data:`True`, then writes a byte to every `sleeping[waking]` socket, while
on waking from sleep, after removing itself from `sleeping`, each sleeping incrementing `waking`, until no more unwoken sockets exist. Per above, on
thread tests if `closed` is :py:data:`True`, and if so throws waking from sleep, after removing itself from `sleeping`, each sleeping thread
tests if `closed` is :py:data:`True`, and if so throws
:py:class:`mitogen.core.LatchError`. :py:class:`mitogen.core.LatchError`.
It is necessary to ensure at most one byte is delivered on each socket, even if
the latch is being torn down, as the sockets outlive the scope of a single
latch, and must never have extraneous data buffered on them, as this will cause
unexpected wakeups if future latches sleep on the same thread.
Latch.get() Latch.get()
~~~~~~~~~~~ ~~~~~~~~~~~
...@@ -930,6 +936,11 @@ item. ...@@ -930,6 +936,11 @@ item.
`waking`, then pops and returns the first item in `queue` that is `waking`, then pops and returns the first item in `queue` that is
guaranteed to exist. guaranteed to exist.
It is paramount that in every case, if :py:func:`select.select` indicates a
byte was written to the socket, that the byte is read away. The socket is
reused by subsequent latches sleeping on the same thread, and unexpected
wakeups are triggered if extraneous data remains buffered on the socket.
.. rubric:: Footnotes .. rubric:: Footnotes
......
...@@ -918,8 +918,9 @@ class Latch(object): ...@@ -918,8 +918,9 @@ class Latch(object):
self._lock.acquire() self._lock.acquire()
try: try:
self.closed = True self.closed = True
for wsock in self._sleeping: while self._waking < len(self._sleeping):
self._wake(wsock) self._wake(self._sleeping[self._waking])
self._waking += 1
finally: finally:
self._lock.release() self._lock.release()
...@@ -957,13 +958,13 @@ class Latch(object): ...@@ -957,13 +958,13 @@ class Latch(object):
self._lock.acquire() self._lock.acquire()
try: try:
self._sleeping.remove(_tls.wsock) self._sleeping.remove(_tls.wsock)
if self.closed:
raise LatchError()
if not rfds: if not rfds:
raise TimeoutError() raise TimeoutError()
self._waking -= 1
if _tls.rsock.recv(2) != '\x7f': if _tls.rsock.recv(2) != '\x7f':
raise LatchError('internal error: received >1 wakeups') raise LatchError('internal error: received >1 wakeups')
self._waking -= 1
if self.closed:
raise LatchError()
try: try:
_vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0]) _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0])
return self._queue.pop(0) return self._queue.pop(0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment