Commit 512ff77a authored by David Wilson's avatar David Wilson

issue #156: prevent non-sleeping threads from starving sleeping threads.

See new docs
parent 9e514240
......@@ -856,6 +856,75 @@ means that Mitogen requires twice as many file descriptors as there are user
threads, with a minimum of 4 required in any configuration.
Latch Internals
~~~~~~~~~~~~~~~
Attributes:
* `lock` :py:class:`threading.Lock`.
* `queue` – enqueued items.
* `wake_socks` – the write sides of the socketpairs for each currently
sleeping thread. While the lock is held, a non-empty `wake_socks` indicates
not only the presence of sleeping threads, but threads that have recently
woken but have not yet to retrieved their item from `queue`.
* `closed` a simple boolean defaulting to :py:data:`False`. Every time `lock`
is acquired, `closed` must be tested, and if it is :py:data:`True`,
:py:class:`mitogen.core.LatchError` must be thrown.
Latch.put()
~~~~~~~~~~~
:py:meth:`mitogen.core.Latch.put` operates simply by acquiring `lock`,
appending the item on to `queue`, then if `wake_socks` is non-empty, a byte is
written to the first socket in the list before finally releasing `lock`.
Latch.close()
~~~~~~~~~~~
:py:meth:`mitogen.core.Latch.putclose` acquires `lock`, sets `closed` to
:py:data:`True`, then writes a byte to every socket in `wake_socks`. As above,
on waking from sleep, after removing itself from `wake_socks`, each sleeping
thread tests if `closed` is :py:data:`True`, and if so throws
:py:class:`mitogen.core.LatchError`.
Latch.get()
~~~~~~~~~~~
:py:meth:`mitogen.core.Latch.get` is far more fiddly, as there are a variety of
outcomes to handle. Queue ordering is strictly first-in first-out, and the
first thread to attempt to retrieve an item always receives the first available
item.
**1. Non-empty, No Waiters, No sleep**
On entry `lock` is taken, and if `queue` is non-empty, and `wake_socks` is
empty, it is safe to return `queue`'s first item without blocking.
**2. Non-empty, Waiters Present, Sleep**
In this case `wake_socks` is non-empty, and it is not safe to pop the item
even though we are holding `lock`, as it would bump the calling thread to
the front of the line, starving any sleeping thread of their item, since a
race exists between a thread waking from :py:func:`select.select` and its
re-acquiring of `lock`.
This avoids the need for a retry loop for waking threads, and a sleeping
thread being continually re-woken only to discover `queue` drained by a
thread that never slept.
**3. Sleep**
Since `queue` was empty, or `wake_socks` was non-empty, the thread adds its
socket to `wake_socks` before releasing `lock`, and sleeping in
:py:func:`select.select` waiting for a write from
:py:meth:`mitogen.core.Latch.put`.
**4. Wake, Non-empty**
On wake it re-acquires `lock`, removes itself from `wake_socks`, throws
:py:class:`mitogen.core.TimeoutError` if no byte was written, otherwise
pops and returns the first item in `queue` that is guaranteed to exist.
.. rubric:: Footnotes
.. [#f1] Compression may seem redundant, however it is basically free and reducing IO
......
......@@ -917,8 +917,8 @@ class Latch(object):
self.lock.acquire()
try:
self.closed = True
while self.wake_socks:
self._wake(self.wake_socks.pop())
for wsock in self.wake_socks:
self._wake(wsock)
finally:
self.lock.release()
......@@ -932,16 +932,14 @@ class Latch(object):
set_cloexec(_tls.wsock.fileno())
def get(self, timeout=None, block=True):
_vv and IOLOG.debug(
'%r.get(timeout=%r, block=%r)',
self, timeout, block
)
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)',
self, timeout, block)
self.lock.acquire()
try:
if self.closed:
raise LatchError()
if self.queue:
if self.queue and not self.wake_socks:
_vv and IOLOG.debug('%r.get() -> %r', self, self.queue[0])
return self.queue.pop(0)
if not block:
......@@ -957,14 +955,11 @@ class Latch(object):
self.lock.acquire()
try:
self.wake_socks.remove(_tls.wsock)
if self.closed:
raise LatchError()
if _tls.wsock in self.wake_socks:
# Nothing woke us, remove stale entry.
self.wake_socks.remove(_tls.wsock)
if not rfds:
raise TimeoutError()
assert _tls.rsock in rfds
assert _tls.rsock.recv(1) == '\x7f'
try:
_vv and IOLOG.debug('%r.get() wake -> %r', self, self.queue[0])
......@@ -982,14 +977,12 @@ class Latch(object):
if self.closed:
raise LatchError()
self.queue.append(obj)
woken = len(self.wake_socks) > 0
if woken:
if self.wake_socks:
_vv and IOLOG.debug('%r.put() -> waking wfd=%r',
self, self.wake_socks[0].fileno())
self._wake(self.wake_socks.pop(0))
self._wake(self.wake_socks[0])
finally:
self.lock.release()
_v and LOG.debug('put() done. woken? %s', woken)
def _wake(self, sock):
try:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment