Commit 63f03ca5 authored by Jeremy Hylton's avatar Jeremy Hylton

There were two copies trigger!

The one in ZEO/trigger.py was the good code, but we want it to live in
ZEO/zrpc/trigger.py.  This checkin makes that change.

The key checkin of ZEO/trigger.py had this checkin comment:
Fix trigger close/__del__.

The close() mechanism for an asyncore file_dispatcher is not safe to
call multiple times.  It's calling os.close() on a file descriptor
(int).  Guido observed that if you call close() twice, you could be in
trouble:  1) First close() call closes FD 6.  2) Another bit of code
opens a new file, getting FD 6.  3) Second close() call closes FD 6.
Waah!  FD 6 is some other file.

The workaround attempt here is to define a close() method on a trigger
that only closes the file descriptors the first time.

Also, make sure that both file descriptors are closed.  The previous
version only closed the read-end of the pipe.
parent e38b0c9b
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import os
import socket
import string
import thread
if os.name == 'posix':
class trigger (asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def __init__ (self):
r, w = self._fds = os.pipe()
self.trigger = w
asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._closed = None
# Override the asyncore close() method, because it seems that
# it would only close the r file descriptor and not w. The
# constructor calls file_dispactcher.__init__ and passes r,
# which would get stored in a file_wrapper and get closed by
# the default close. But that would leave w open...
def close(self):
if self._closed is None:
self._closed = 1
self.del_channel()
for fd in self._fds:
os.close(fd)
def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self)
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
os.write (self.trigger, 'x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
try:
thunk()
except:
nil, t, v, tbinfo = asyncore.compact_traceback()
print ('exception in trigger thunk:'
' (%s:%s %s)' % (t, v, tbinfo))
self.thunks = []
finally:
self.lock.release()
else:
# XXX Should define a base class that has the common methods and
# then put the platform-specific in a subclass named trigger.
# win32-safe version
class trigger (asyncore.dispatcher):
address = ('127.9.9.9', 19999)
def __init__ (self):
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
# tricky: get a pair of connected sockets
host = '127.0.0.1'
port = 19999
while 1:
try:
self.address = host, port
a.bind(self.address)
break
except:
if port <= 19950:
raise 'Bind Error', 'Cannot bind trigger!'
port -= 1
a.listen(1)
w.setblocking(0)
try:
w.connect(self.address)
except:
pass
r, addr = a.accept()
a.close()
w.setblocking(1)
self.trigger = w
asyncore.dispatcher.__init__(self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
def __repr__(self):
return '<select-trigger (loopback) at %x>' % id(self)
def readable(self):
return 1
def writable(self):
return 0
def handle_connect(self):
pass
def pull_trigger(self, thunk=None):
if thunk:
try:
self.lock.acquire()
self.thunks.append(thunk)
finally:
self.lock.release()
self.trigger.send('x')
def handle_read(self):
self.recv(8192)
try:
self.lock.acquire()
for thunk in self.thunks:
try:
thunk()
except:
nil, t, v, tbinfo = asyncore.compact_traceback()
print ('exception in trigger thunk:'
' (%s:%s %s)' % (t, v, tbinfo))
self.thunks = []
finally:
self.lock.release()
...@@ -11,13 +11,11 @@ ...@@ -11,13 +11,11 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
# This module is a simplified version of the select_trigger module
# from Sam Rushing's Medusa server.
import asyncore import asyncore
import os import os
import socket import socket
import string
import thread import thread
if os.name == 'posix': if os.name == 'posix':
...@@ -55,16 +53,25 @@ if os.name == 'posix': ...@@ -55,16 +53,25 @@ if os.name == 'posix':
# the main thread is trying to remove some] # the main thread is trying to remove some]
def __init__ (self): def __init__ (self):
r, w = os.pipe() r, w = self._fds = os.pipe()
self.trigger = w self.trigger = w
asyncore.file_dispatcher.__init__ (self, r) asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock() self.lock = thread.allocate_lock()
self.thunks = [] self.thunks = []
self._closed = None
# Override the asyncore close() method, because it seems that
# it would only close the r file descriptor and not w. The
# constructor calls file_dispactcher.__init__ and passes r,
# which would get stored in a file_wrapper and get closed by
# the default close. But that would leave w open...
def close(self): def close(self):
if self._closed is None:
self._closed = 1
self.del_channel() self.del_channel()
self.socket.close() # the read side of the pipe for fd in self._fds:
os.close(self.trigger) # the write side of the pipe os.close(fd)
def __repr__ (self): def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self) return '<select-trigger (pipe) at %x>' % id(self)
...@@ -79,7 +86,6 @@ if os.name == 'posix': ...@@ -79,7 +86,6 @@ if os.name == 'posix':
pass pass
def pull_trigger (self, thunk=None): def pull_trigger (self, thunk=None):
# print 'PULL_TRIGGER: ', len(self.thunks)
if thunk: if thunk:
try: try:
self.lock.acquire() self.lock.acquire()
...@@ -96,14 +102,18 @@ if os.name == 'posix': ...@@ -96,14 +102,18 @@ if os.name == 'posix':
try: try:
thunk() thunk()
except: except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback() nil, t, v, tbinfo = asyncore.compact_traceback()
print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo) print ('exception in trigger thunk:'
' (%s:%s %s)' % (t, v, tbinfo))
self.thunks = [] self.thunks = []
finally: finally:
self.lock.release() self.lock.release()
else: else:
# XXX Should define a base class that has the common methods and
# then put the platform-specific in a subclass named trigger.
# win32-safe version # win32-safe version
class trigger (asyncore.dispatcher): class trigger (asyncore.dispatcher):
...@@ -118,65 +128,66 @@ else: ...@@ -118,65 +128,66 @@ else:
w.setsockopt(socket.IPPROTO_TCP, 1, 1) w.setsockopt(socket.IPPROTO_TCP, 1, 1)
# tricky: get a pair of connected sockets # tricky: get a pair of connected sockets
host='127.0.0.1' host = '127.0.0.1'
port=19999 port = 19999
while 1: while 1:
try: try:
self.address=(host, port) self.address = host, port
a.bind(self.address) a.bind(self.address)
break break
except: except:
if port <= 19950: if port <= 19950:
raise 'Bind Error', 'Cannot bind trigger!' raise 'Bind Error', 'Cannot bind trigger!'
port=port - 1 port -= 1
a.listen (1) a.listen(1)
w.setblocking (0) w.setblocking(0)
try: try:
w.connect (self.address) w.connect(self.address)
except: except:
pass pass
r, addr = a.accept() r, addr = a.accept()
a.close() a.close()
w.setblocking (1) w.setblocking(1)
self.trigger = w self.trigger = w
asyncore.dispatcher.__init__ (self, r) asyncore.dispatcher.__init__(self, r)
self.lock = thread.allocate_lock() self.lock = thread.allocate_lock()
self.thunks = [] self.thunks = []
self._trigger_connected = 0 self._trigger_connected = 0
def __repr__ (self): def __repr__(self):
return '<select-trigger (loopback) at %x>' % id(self) return '<select-trigger (loopback) at %x>' % id(self)
def readable (self): def readable(self):
return 1 return 1
def writable (self): def writable(self):
return 0 return 0
def handle_connect (self): def handle_connect(self):
pass pass
def pull_trigger (self, thunk=None): def pull_trigger(self, thunk=None):
if thunk: if thunk:
try: try:
self.lock.acquire() self.lock.acquire()
self.thunks.append (thunk) self.thunks.append(thunk)
finally: finally:
self.lock.release() self.lock.release()
self.trigger.send ('x') self.trigger.send('x')
def handle_read (self): def handle_read(self):
self.recv (8192) self.recv(8192)
try: try:
self.lock.acquire() self.lock.acquire()
for thunk in self.thunks: for thunk in self.thunks:
try: try:
thunk() thunk()
except: except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback() nil, t, v, tbinfo = asyncore.compact_traceback()
print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo) print ('exception in trigger thunk:'
' (%s:%s %s)' % (t, v, tbinfo))
self.thunks = [] self.thunks = []
finally: finally:
self.lock.release() self.lock.release()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment