Commit 80343fd8 authored by Tim Peters's avatar Tim Peters

Massive refactoring, to move the bulk of the trigger code into

an OS-indepedent base class.

__repr__:  Use the postive_id function to embed the machine address.
Addresses with the high bit set trigger warnings before Python 2.4,
and come out as negative numbers in 2.4+.

Windows trigger.__init__:  Don't make 50 guesses at a port number
to use, let Windows pick an available port for us.  Also documented
the baffling single-thread socket setup dance, which took an hour to
reverse-engineer (in large part because it used a bare "except" w/ no
clue as to why).
parent 8824506c
...@@ -17,64 +17,57 @@ import os ...@@ -17,64 +17,57 @@ import os
import socket import socket
import thread import thread
if os.name == 'posix': from ZODB.utils import positive_id
class trigger(asyncore.file_dispatcher): # Original comments follow; they're hard to follow in the context of
# ZEO's use of triggers. TODO: rewrite from a ZEO perspective.
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def __init__(self): # Wake up a call to select() running in the main thread.
r, w = self._fds = os.pipe() #
self.trigger = w # This is useful in a context where you are using Medusa's I/O
asyncore.file_dispatcher.__init__(self, r) # subsystem to deliver data, but the data is generated by another
self.lock = thread.allocate_lock() # thread. Normally, if Medusa is in the middle of a call to
self.thunks = [] # select(), new output data generated by another thread will have
self._closed = 0 # to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
#
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
#
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
# Override the asyncore close() method, because it seems that class _triggerbase:
# it would only close the r file descriptor and not w. The """OS-independent base class for OS-dependent trigger class."""
# constructor calls file_dispatcher.__init__ and passes r,
# which would get stored in a file_wrapper and get closed by
# the default close. But that would leave w open...
def close(self): kind = None # subclass must set to "pipe" or "loopback"; used by repr
if not self._closed:
self._closed = 1
self.del_channel()
for fd in self._fds:
os.close(fd)
self._fds = []
def __repr__(self): def __init__(self):
return '<select-trigger (pipe) at %x>' % id(self) self._closed = False
# `lock` protects the `thunks` list from being traversed and
# appended to simultaneously.
self.lock = thread.allocate_lock()
# List of no-argument callbacks to invoke when the trigger is
# pulled. These run in the thread running the asyncore mainloop,
# regardless of which thread pulls the trigger.
self.thunks = []
def readable(self): def readable(self):
return 1 return 1
...@@ -88,6 +81,19 @@ if os.name == 'posix': ...@@ -88,6 +81,19 @@ if os.name == 'posix':
def handle_close(self): def handle_close(self):
self.close() self.close()
# Override the asyncore close() method, because it doesn't know about
# (so can't close) all the gimmicks we have open. Subclass must
# supply a _close() method to do platform-specific closing work. _close()
# will be called iff we're not already closed.
def close(self):
if not self._closed:
self._closed = True
self.del_channel()
self._close() # subclass does OS-specific stuff
def _close(self): # see close() above; subclass must supply
raise NotImplementedError
def pull_trigger(self, thunk=None): def pull_trigger(self, thunk=None):
if thunk: if thunk:
self.lock.acquire() self.lock.acquire()
...@@ -95,7 +101,12 @@ if os.name == 'posix': ...@@ -95,7 +101,12 @@ if os.name == 'posix':
self.thunks.append(thunk) self.thunks.append(thunk)
finally: finally:
self.lock.release() self.lock.release()
os.write(self.trigger, 'x') self._physical_pull()
# Subclass must supply _physical_pull, which does whatever the OS
# needs to do to provoke the "write" end of the trigger.
def _physical_pull(self):
raise NotImplementedError
def handle_read(self): def handle_read(self):
try: try:
...@@ -115,102 +126,72 @@ if os.name == 'posix': ...@@ -115,102 +126,72 @@ if os.name == 'posix':
finally: finally:
self.lock.release() self.lock.release()
else: def __repr__(self):
return '<select-trigger (%s) at %x>' % (self.kind, positive_id(self))
# TODO: Should define a base class that has the common methods and if os.name == 'posix':
# then put the platform-specific in a subclass named trigger.
# win32-safe version class trigger(_triggerbase, asyncore.file_dispatcher):
kind = "pipe"
HOST = '127.0.0.1' def __init__(self):
MINPORT = 19950 _triggerbase.__init__(self)
NPORTS = 50 r, self.trigger = self._fds = os.pipe()
asyncore.file_dispatcher.__init__(self, r)
class trigger(asyncore.dispatcher): def _close(self):
for fd in self._fds:
os.close(fd)
self._fds = []
portoffset = 0 def _physical_pull(self):
os.write(self.trigger, 'x')
def __init__(self): else:
a = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Windows version; uses just sockets, because a pipe isn't select'able
w = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # on Windows.
class trigger(_triggerbase, asyncore.dispatcher):
kind = "loopback"
def __init__(self):
_triggerbase.__init__(self)
# Get a pair of connected sockets. The trigger is the 'w'
# end of the pair, which is connected to 'r'. 'r' is put
# in the asyncore socket map. "pulling the trigger" then
# means writing something on w, which will wake up r.
a = socket.socket() # temporary, to set up the connection
w = socket.socket()
self.trigger = w
# set TCP_NODELAY to true to avoid buffering # set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1) w.setsockopt(socket.IPPROTO_TCP, 1, 1)
# tricky: get a pair of connected sockets # Specifying port 0 tells Windows to pick a port for us.
for i in range(NPORTS): a.bind(("127.0.0.1", 0))
trigger.portoffset = (trigger.portoffset + 1) % NPORTS connect_address = a.getsockname() # actual (host, port) pair
port = MINPORT + trigger.portoffset
address = (HOST, port)
try:
a.bind(address)
except socket.error:
continue
else:
break
else:
raise RuntimeError, 'Cannot bind trigger!'
a.listen(1) a.listen(1)
# Before connecting, set w non-blocking, because the connect can't
# succeed before we call a.accept() -- while a.accept() can't
# succeed before we try to connect. Maybe it would be clearer
# to spin off a thread to do this, but that's much more expensive
# than this hack.
w.setblocking(0) w.setblocking(0)
try: try:
w.connect(address) w.connect(connect_address)
except: except socket.error:
# Expected exception, since a.accept() hasn't been called
# yet.
pass pass
r, addr = a.accept()
a.close()
w.setblocking(1) w.setblocking(1)
self.trigger = w r, addr = a.accept() # r becomes asyncore's socket
a.close()
asyncore.dispatcher.__init__(self, r) asyncore.dispatcher.__init__(self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
self._closed = 0
def close(self): def _close(self):
if not self._closed: # self.socket is r, self.trigger is w from __init__
self._closed = 1
self.del_channel()
# self.socket is a, self.trigger is w from __init__
self.socket.close() self.socket.close()
self.trigger.close() self.trigger.close()
def __repr__(self): def _physical_pull(self):
return '<select-trigger (loopback) at %x>' % id(self)
def readable(self):
return 1
def writable(self):
return 0
def handle_connect(self):
pass
def pull_trigger(self, thunk=None):
if thunk:
self.lock.acquire()
try:
self.thunks.append(thunk)
finally:
self.lock.release()
self.trigger.send('x') self.trigger.send('x')
def handle_read(self):
try:
self.recv(8192)
except socket.error:
return
self.lock.acquire()
try:
for thunk in self.thunks:
try:
thunk()
except:
nil, t, v, tbinfo = asyncore.compact_traceback()
print ('exception in trigger thunk:'
' (%s:%s %s)' % (t, v, tbinfo))
self.thunks = []
finally:
self.lock.release()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment