Commit fd83f922 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge in zeo-1_0-branch

Fixed a bug in building undo invalidation info that caused client
storages to raise an exception that caused connections to be reset
and that, for some of reason, caused undo's to not propigate to
the clients.

Avoid long delays at the end of a pack.

    When a client calls pack, a separate thread is started to call the
    pack call on the storage.  When the thread finishes, it calls
    message_output() to send a response to the client.  If asyncore is
    currently in a poll call when this happens, the output won't be
    detected until the next poll call.  If there is little I/O on the
    connection, this won't happen until the select call times out
    after 30 seconds.

    The trigger is the standard gimmick for one thread to notify a
    mainloop in another thread that the first thread has some I/O to
    do.  It exits the current select.  The next poll call detects that
    the triggering thread is ready to do I/O and handles it.

Use asyncwrap to call asyncore.loop() and asyncore.poll()
parent 04d9377e
...@@ -83,7 +83,7 @@ ...@@ -83,7 +83,7 @@
# #
############################################################################## ##############################################################################
__version__ = "$Revision: 1.28 $"[11:-2] __version__ = "$Revision: 1.29 $"[11:-2]
import asyncore, socket, string, sys, os import asyncore, socket, string, sys, os
from smac import SizedMessageAsyncConnection from smac import SizedMessageAsyncConnection
...@@ -93,10 +93,12 @@ from cPickle import Unpickler ...@@ -93,10 +93,12 @@ from cPickle import Unpickler
from ZODB.POSException import TransactionError, UndoError, VersionCommitError from ZODB.POSException import TransactionError, UndoError, VersionCommitError
from ZODB.Transaction import Transaction from ZODB.Transaction import Transaction
import traceback import traceback
from zLOG import LOG, INFO, ERROR, TRACE from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from thread import start_new_thread from thread import start_new_thread
from cStringIO import StringIO from cStringIO import StringIO
from ZEO import trigger
from ZEO import asyncwrap
class StorageServerError(POSException.StorageError): pass class StorageServerError(POSException.StorageError): pass
...@@ -124,7 +126,6 @@ class StorageServer(asyncore.dispatcher): ...@@ -124,7 +126,6 @@ class StorageServer(asyncore.dispatcher):
self.__connections={} self.__connections={}
self.__get_connections=self.__connections.get self.__get_connections=self.__connections.get
asyncore.dispatcher.__init__(self) asyncore.dispatcher.__init__(self)
if type(connection) is type(''): if type(connection) is type(''):
...@@ -184,8 +185,8 @@ class StorageServer(asyncore.dispatcher): ...@@ -184,8 +185,8 @@ class StorageServer(asyncore.dispatcher):
sock, addr = self.accept() sock, addr = self.accept()
except socket.error: except socket.error:
sys.stderr.write('warning: accept failed\n') sys.stderr.write('warning: accept failed\n')
else:
ZEOConnection(self, sock, addr) ZEOConnection(self, sock, addr)
def log_info(self, message, type='info'): def log_info(self, message, type='info'):
if type=='error': type=ERROR if type=='error': type=ERROR
...@@ -234,6 +235,7 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -234,6 +235,7 @@ class ZEOConnection(SizedMessageAsyncConnection):
self.__server=server self.__server=server
self.__invalidated=[] self.__invalidated=[]
self.__closed=None self.__closed=None
self._pack_trigger = trigger.trigger()
if __debug__: debug='ZEO Server' if __debug__: debug='ZEO Server'
else: debug=0 else: debug=0
SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug) SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug)
...@@ -384,16 +386,21 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -384,16 +386,21 @@ class ZEOConnection(SizedMessageAsyncConnection):
if wait: return _noreturn if wait: return _noreturn
def _pack(self, t, wait=0): def _pack(self, t, wait=0):
try: try:
LOG('ZEO Server', BLATHER, 'pack begin')
self.__storage.pack(t, referencesf) self.__storage.pack(t, referencesf)
LOG('ZEO Server', BLATHER, 'pack end')
except: except:
LOG('ZEO Server', ERROR, LOG('ZEO Server', ERROR,
'Pack failed for %s' % self.__storage_id, 'Pack failed for %s' % self.__storage_id,
error=sys.exc_info()) error=sys.exc_info())
if wait: self.return_error(sys.exc_info()[0], sys.exc_info()[1]) if wait:
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
self._pack_trigger.pull_trigger()
else: else:
if wait: if wait:
self.message_output('RN.') self.message_output('RN.')
self._pack_trigger.pull_trigger()
else: else:
# Broadcast new size statistics # Broadcast new size statistics
self.__server.invalidate(0, self.__storage_id, (), self.__server.invalidate(0, self.__storage_id, (),
...@@ -469,7 +476,7 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -469,7 +476,7 @@ class ZEOConnection(SizedMessageAsyncConnection):
oids=self.__storage.undo(transaction_id) oids=self.__storage.undo(transaction_id)
if oids: if oids:
self.__server.invalidate( self.__server.invalidate(
self, self.__storage_id, map(lambda oid: (oid,None,''), oids) self, self.__storage_id, map(lambda oid: (oid,None), oids)
) )
return oids return oids
return () return ()
...@@ -571,7 +578,10 @@ if __name__=='__main__': ...@@ -571,7 +578,10 @@ if __name__=='__main__':
import ZODB.FileStorage import ZODB.FileStorage
name, port = sys.argv[1:3] name, port = sys.argv[1:3]
blather(name, port) blather(name, port)
try: port='',string.atoi(port) try:
except: pass port='', int(port)
except:
pass
StorageServer(port, ZODB.FileStorage.FileStorage(name)) StorageServer(port, ZODB.FileStorage.FileStorage(name))
asyncore.loop() asyncwrap.loop()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment