Commit ee0f21d5 authored by David Wilson's avatar David Wilson

core: remove Queue locking from broker loop.

Move defer handling out of Broker and into Waker (where it belongs?).
Now the lock must only be taken if Waker was actually woken.

Knocks 400-item run_hostname_100_times from 10.62s to 10.05s (-5.3%).
parent 17aef51e
......@@ -26,7 +26,6 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import Queue
import cPickle
import cStringIO
import collections
......@@ -41,6 +40,7 @@ import signal
import socket
import struct
import sys
import thread
import threading
import time
import traceback
......@@ -1039,14 +1039,19 @@ class Latch(object):
class Waker(BasicStream):
"""
:py:class:`BasicStream` subclass implementing the
`UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when
some of its state has been changed by another thread.
:py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_.
Used to wake the multiplexer when another thread needs to modify its state
(via a cross-thread function call).
.. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
"""
broker_ident = None
def __init__(self, broker):
self._broker = broker
self._lock = threading.Lock()
self._deferred = []
rfd, wfd = os.pipe()
self.receive_side = Side(self, rfd)
self.transmit_side = Side(self, wfd)
......@@ -1058,24 +1063,63 @@ class Waker(BasicStream):
self.transmit_side.fd,
)
def on_receive(self, broker):
@property
def keep_alive(self):
"""
Read a byte from the self-pipe.
Prevent immediate Broker shutdown while deferred functions remain.
"""
self.receive_side.read(256)
self._lock.acquire()
try:
return len(self._deferred)
finally:
self._lock.release()
def wake(self):
def on_receive(self, broker):
"""
Write a byte to the self-pipe, causing the IO multiplexer to wake up.
Nothing is written if the current thread is the IO multiplexer thread.
Drain the pipe and fire callbacks. Reading multiple bytes is safe since
new bytes corresponding to future .defer() calls are written only after
.defer() takes _lock: either a byte we read corresponds to something
already on the queue by the time we take _lock, or a byte remains
buffered, causing another wake up, because it was written after we
released _lock.
"""
_vv and IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
if threading.currentThread() != self._broker._thread:
_vv and IOLOG.debug('%r.on_receive()', self)
self.receive_side.read(128)
self._lock.acquire()
try:
deferred = self._deferred
self._deferred = []
finally:
self._lock.release()
for func, args, kwargs in deferred:
try:
self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
func(*args, **kwargs)
except Exception:
LOG.exception('defer() crashed: %r(*%r, **%r)',
func, args, kwargs)
self._broker.shutdown()
def defer(self, func, *args, **kwargs):
if thread.get_ident() == self.broker_ident:
_vv and IOLOG.debug('%r.defer() [immediate]', self)
return func(*args, **kwargs)
_vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd)
self._lock.acquire()
try:
self._deferred.append((func, args, kwargs))
finally:
self._lock.release()
# Wake the multiplexer by writing a byte. If the broker is in the midst
# of tearing itself down, the waker fd may already have been closed, so
# ignore EBADF here.
try:
self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
class IoLogger(BasicStream):
......@@ -1242,24 +1286,17 @@ class Broker(object):
def __init__(self):
self._alive = True
self._queue = Queue.Queue()
self._readers = []
self._writers = []
self._waker = Waker(self)
self.start_receive(self._waker)
self.defer = self._waker.defer
self._readers = [self._waker.receive_side]
self._writers = []
self._thread = threading.Thread(
target=_profile_hook,
args=('broker', self._broker_main),
name='mitogen-broker'
)
self._thread.start()
def defer(self, func, *args, **kwargs):
if threading.currentThread() == self._thread:
func(*args, **kwargs)
else:
self._queue.put((func, args, kwargs))
self._waker.wake()
self._waker.broker_ident = self._thread.ident
def _list_discard(self, lst, value):
try:
......@@ -1296,19 +1333,8 @@ class Broker(object):
LOG.exception('%r crashed', stream)
stream.on_disconnect(self)
def _run_defer(self):
while not self._queue.empty():
func, args, kwargs = self._queue.get()
try:
func(*args, **kwargs)
except Exception:
LOG.exception('defer() crashed: %r(*%r, **%r)',
func, args, kwargs)
self.shutdown()
def _loop_once(self, timeout=None):
_vv and IOLOG.debug('%r._loop_once(%r)', self, timeout)
self._run_defer()
#IOLOG.debug('readers = %r', self._readers)
#IOLOG.debug('writers = %r', self._writers)
......@@ -1327,15 +1353,13 @@ class Broker(object):
self._call(side.stream, side.stream.on_transmit)
def keep_alive(self):
return (sum((side.keep_alive for side in self._readers), 0) +
(not self._queue.empty()))
return sum((side.keep_alive for side in self._readers), 0)
def _broker_main(self):
try:
while self._alive:
self._loop_once()
self._run_defer()
fire(self, 'shutdown')
for side in set(self._readers).union(self._writers):
......@@ -1361,8 +1385,9 @@ class Broker(object):
def shutdown(self):
_v and LOG.debug('%r.shutdown()', self)
self._alive = False
self._waker.wake()
def _shutdown():
self._alive = False
self.defer(_shutdown)
def join(self):
self._thread.join()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment