Commit 941ec59a authored by Jeremy Hylton's avatar Jeremy Hylton

Merge changes from zeo-1_0-branch to the trunk.

The trunk now has the same code ZEO 1.0b5 plus a few minor changes.
parent 95a9fded
...@@ -144,7 +144,7 @@ file 0 and file 1. ...@@ -144,7 +144,7 @@ file 0 and file 1.
""" """
__version__ = "$Revision: 1.17 $"[11:-2] __version__ = "$Revision: 1.18 $"[11:-2]
import os, tempfile import os, tempfile
from struct import pack, unpack from struct import pack, unpack
...@@ -217,7 +217,7 @@ class ClientCache: ...@@ -217,7 +217,7 @@ class ClientCache:
def close(self): def close(self):
try: try:
self._f[self._current].close() self._f[self._current].close()
except OSError: except (os.error, ValueError):
pass pass
def open(self): def open(self):
...@@ -373,6 +373,8 @@ class ClientCache: ...@@ -373,6 +373,8 @@ class ClientCache:
self._f[current]=open(self._p[current],'w+b') self._f[current]=open(self._p[current],'w+b')
else: else:
# Temporary cache file: # Temporary cache file:
if self._f[current] is not None:
self._f[current].close()
self._f[current] = tempfile.TemporaryFile(suffix='.zec') self._f[current] = tempfile.TemporaryFile(suffix='.zec')
self._f[current].write(magic) self._f[current].write(magic)
self._pos=pos=4 self._pos=pos=4
......
...@@ -84,7 +84,8 @@ ...@@ -84,7 +84,8 @@
############################################################################## ##############################################################################
"""Network ZODB storage client """Network ZODB storage client
""" """
__version__='$Revision: 1.34 $'[11:-2]
__version__='$Revision: 1.35 $'[11:-2]
import struct, time, os, socket, string, Sync, zrpc, ClientCache import struct, time, os, socket, string, Sync, zrpc, ClientCache
import tempfile, Invalidator, ExtensionClass, thread import tempfile, Invalidator, ExtensionClass, thread
...@@ -168,17 +169,16 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -168,17 +169,16 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
# Among other things, we know that our data methods won't get # Among other things, we know that our data methods won't get
# called until after this call. # called until after this call.
invalidator=Invalidator.Invalidator( self.invalidator = Invalidator.Invalidator(db.invalidate,
db.invalidate, self._cache.invalidate)
self._cache.invalidate)
def out_of_band_hook( def out_of_band_hook(
code, args, code, args,
get_hook={ get_hook={
'b': (invalidator.begin, 0), 'b': (self.invalidator.begin, 0),
'i': (invalidator.invalidate, 1), 'i': (self.invalidator.invalidate, 1),
'e': (invalidator.end, 0), 'e': (self.invalidator.end, 0),
'I': (invalidator.Invalidate, 1), 'I': (self.invalidator.Invalidate, 1),
'U': (self._commit_lock_release, 0), 'U': (self._commit_lock_release, 0),
's': (self._serials.append, 1), 's': (self._serials.append, 1),
'S': (self._info.update, 1), 'S': (self._info.update, 1),
...@@ -307,8 +307,18 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -307,8 +307,18 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
try: try:
LOG("ClientStorage", INFO, "close") LOG("ClientStorage", INFO, "close")
self._call.closeIntensionally() self._call.closeIntensionally()
try:
self._tfile.close()
except os.error:
# On Windows, this can fail if it is called more than
# once, because it tries to delete the file each
# time.
pass
self._cache.close() self._cache.close()
self.closed = 1 if self.invalidator is not None:
self.invalidator.close()
self.invalidator = None
self.closed = 1
finally: self._lock_release() finally: self._lock_release()
def commitVersion(self, src, dest, transaction): def commitVersion(self, src, dest, transaction):
...@@ -317,7 +327,6 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -317,7 +327,6 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
self._lock_acquire() self._lock_acquire()
try: try:
oids=self._call('commitVersion', src, dest, self._serial) oids=self._call('commitVersion', src, dest, self._serial)
invalidate=self._cache.invalidate
if dest: if dest:
vlen = pack(">H", len(src)) vlen = pack(">H", len(src))
# just invalidate our version data # just invalidate our version data
...@@ -436,12 +445,17 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -436,12 +445,17 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
finally: self._lock_release() finally: self._lock_release()
def supportsUndo(self):
return self._info['supportsUndo']
def supportsUndo(self): return self._info['supportsUndo']
def supportsVersions(self): return self._info['supportsVersions'] def supportsVersions(self):
return self._info['supportsVersions']
def supportsTransactionalUndo(self): def supportsTransactionalUndo(self):
return self._info['supportsTransactionalUndo'] try:
return self._info['supportsTransactionalUndo']
except KeyError:
return 0
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
self._lock_acquire() self._lock_acquire()
...@@ -522,7 +536,6 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -522,7 +536,6 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
seek=tfile.seek seek=tfile.seek
read=tfile.read read=tfile.read
cache=self._cache cache=self._cache
update=cache.update
size=tfile.tell() size=tfile.tell()
cache.checkSize(size) cache.checkSize(size)
seek(0) seek(0)
...@@ -543,9 +556,9 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -543,9 +556,9 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
"temporary file." "temporary file."
) )
if s==ResolvedSerial: if s==ResolvedSerial:
cache.invalidate(oid, v) self._cache.invalidate(oid, v)
else: else:
update(oid, s, v, p) self._cache.update(oid, s, v, p)
i=i+15+vlen+dlen i=i+15+vlen+dlen
elif opcode == "i": elif opcode == "i":
oid=read(8) oid=read(8)
...@@ -578,7 +591,8 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -578,7 +591,8 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
try: try:
oids=self._call('undo', transaction_id) oids=self._call('undo', transaction_id)
cinvalidate=self._cache.invalidate cinvalidate=self._cache.invalidate
for oid in oids: cinvalidate(oid,'') for oid in oids:
cinvalidate(oid,'')
return oids return oids
finally: self._lock_release() finally: self._lock_release()
......
...@@ -98,6 +98,10 @@ class Invalidator: ...@@ -98,6 +98,10 @@ class Invalidator:
self.dinvalidate=dinvalidate self.dinvalidate=dinvalidate
self.cinvalidate=cinvalidate self.cinvalidate=cinvalidate
def close(self):
self.dinvalidate = None
self.cinvalidate = None
def begin(self): def begin(self):
self._tfile=tempfile.TemporaryFile() self._tfile=tempfile.TemporaryFile()
pickler=cPickle.Pickler(self._tfile, 1) pickler=cPickle.Pickler(self._tfile, 1)
......
...@@ -83,7 +83,7 @@ ...@@ -83,7 +83,7 @@
# #
############################################################################## ##############################################################################
__version__ = "$Revision: 1.29 $"[11:-2] __version__ = "$Revision: 1.30 $"[11:-2]
import asyncore, socket, string, sys, os import asyncore, socket, string, sys, os
from smac import SizedMessageAsyncConnection from smac import SizedMessageAsyncConnection
...@@ -104,8 +104,17 @@ class StorageServerError(POSException.StorageError): pass ...@@ -104,8 +104,17 @@ class StorageServerError(POSException.StorageError): pass
max_blather=120 max_blather=120
def blather(*args): def blather(*args):
m=string.join(map(str,args)) accum = []
if len(m) > max_blather: m=m[:max_blather]+' ...' total_len = 0
for arg in args:
if not isinstance(arg, StringType):
arg = str(arg)
accum.append(arg)
total_len = total_len + len(arg)
if total_len >= max_blather:
break
m = string.join(accum)
if len(m) > max_blather: m = m[:max_blather] + ' ...'
LOG('ZEO Server', TRACE, m) LOG('ZEO Server', TRACE, m)
...@@ -121,11 +130,13 @@ class StorageServer(asyncore.dispatcher): ...@@ -121,11 +130,13 @@ class StorageServer(asyncore.dispatcher):
def __init__(self, connection, storages): def __init__(self, connection, storages):
self.__storages=storages self.__storages=storages
for n, s in storages.items(): init_storage(s) for n, s in storages.items():
init_storage(s)
self.__connections={} self.__connections={}
self.__get_connections=self.__connections.get self.__get_connections=self.__connections.get
self._pack_trigger = trigger.trigger()
asyncore.dispatcher.__init__(self) asyncore.dispatcher.__init__(self)
if type(connection) is type(''): if type(connection) is type(''):
...@@ -258,7 +269,12 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -258,7 +269,12 @@ class ZEOConnection(SizedMessageAsyncConnection):
def message_input(self, message, def message_input(self, message,
dump=dump, Unpickler=Unpickler, StringIO=StringIO, dump=dump, Unpickler=Unpickler, StringIO=StringIO,
None=None): None=None):
if __debug__: blather('message_input', id(self), `message`) if __debug__:
if len(message) > max_blather:
tmp = `message[:max_blather]`
else:
tmp = `message`
blather('message_input', id(self), tmp)
if self.__storage is None: if self.__storage is None:
# This is the first communication from the client # This is the first communication from the client
...@@ -276,7 +292,9 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -276,7 +292,9 @@ class ZEOConnection(SizedMessageAsyncConnection):
args=unpickler.load() args=unpickler.load()
name, args = args[0], args[1:] name, args = args[0], args[1:]
if __debug__: blather('call %s: %s%s' % (id(self), name, `args`)) if __debug__:
apply(blather,
("call", id(self), ":", name,) + args)
if not storage_method(name): if not storage_method(name):
raise 'Invalid Method Name', name raise 'Invalid Method Name', name
...@@ -294,7 +312,8 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -294,7 +312,8 @@ class ZEOConnection(SizedMessageAsyncConnection):
self.return_error(sys.exc_info()[0], sys.exc_info()[1]) self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return return
if __debug__: blather("%s R: %s" % (id(self), `r`)) if __debug__:
blather("%s R: %s" % (id(self), `r`))
r=dump(r,1) r=dump(r,1)
self.message_output('R'+r) self.message_output('R'+r)
...@@ -303,7 +322,8 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -303,7 +322,8 @@ class ZEOConnection(SizedMessageAsyncConnection):
if type(err_value) is not type(self): if type(err_value) is not type(self):
err_value = err_type, err_value err_value = err_type, err_value
if __debug__: blather("%s E: %s" % (id(self), `err_value`)) if __debug__:
blather("%s E: %s" % (id(self), `err_value`))
try: r=dump(err_value, 1) try: r=dump(err_value, 1)
except: except:
...@@ -396,11 +416,12 @@ class ZEOConnection(SizedMessageAsyncConnection): ...@@ -396,11 +416,12 @@ class ZEOConnection(SizedMessageAsyncConnection):
error=sys.exc_info()) error=sys.exc_info())
if wait: if wait:
self.return_error(sys.exc_info()[0], sys.exc_info()[1]) self.return_error(sys.exc_info()[0], sys.exc_info()[1])
self._pack_trigger.pull_trigger() self.__server._pack_trigger.pull_trigger()
else: else:
if wait: if wait:
self.message_output('RN.') self.message_output('RN.')
self._pack_trigger.pull_trigger() self.__server._pack_trigger.pull_trigger()
else: else:
# Broadcast new size statistics # Broadcast new size statistics
self.__server.invalidate(0, self.__storage_id, (), self.__server.invalidate(0, self.__storage_id, (),
...@@ -582,6 +603,8 @@ if __name__=='__main__': ...@@ -582,6 +603,8 @@ if __name__=='__main__':
port='', int(port) port='', int(port)
except: except:
pass pass
StorageServer(port, ZODB.FileStorage.FileStorage(name)) d = {'1': ZODB.FileStorage.FileStorage(name)}
StorageServer(port, d)
asyncwrap.loop() asyncwrap.loop()
...@@ -86,7 +86,7 @@ ...@@ -86,7 +86,7 @@
"""Start the server storage. """Start the server storage.
""" """
__version__ = "$Revision: 1.25 $"[11:-2] __version__ = "$Revision: 1.26 $"[11:-2]
import sys, os, getopt, string import sys, os, getopt, string
...@@ -359,7 +359,7 @@ def shutdown(storages, die=1): ...@@ -359,7 +359,7 @@ def shutdown(storages, die=1):
for storage in storages.values(): for storage in storages.values():
try: storage.close() try: storage.close()
finally: pass except: pass
try: try:
from zLOG import LOG, INFO from zLOG import LOG, INFO
......
...@@ -13,7 +13,17 @@ class TransUndoStorageWithCache: ...@@ -13,7 +13,17 @@ class TransUndoStorageWithCache:
revid = self._dostore(oid, revid=revid, data=MinPO(25)) revid = self._dostore(oid, revid=revid, data=MinPO(25))
info = self._storage.undoInfo() info = self._storage.undoInfo()
if not info:
# XXX perhaps we have an old storage implementation that
# does do the negative nonsense
info = self._storage.undoInfo(0, 20)
tid = info[0]['id'] tid = info[0]['id']
# We may need to bail at this point if the storage doesn't
# support transactional undo
if not self._storage.supportsTransactionalUndo():
return
# Now start an undo transaction # Now start an undo transaction
self._transaction.note('undo1') self._transaction.note('undo1')
self._storage.tpc_begin(self._transaction) self._storage.tpc_begin(self._transaction)
......
...@@ -23,10 +23,13 @@ def get_port(): ...@@ -23,10 +23,13 @@ def get_port():
port = random.randrange(20000, 30000) port = random.randrange(20000, 30000)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: try:
s.connect(('localhost', port)) try:
except socket.error: s.connect(('localhost', port))
# XXX check value of error? except socket.error:
return port # XXX check value of error?
return port
finally:
s.close()
raise RuntimeError, "Can't find port" raise RuntimeError, "Can't find port"
if os.name == "nt": if os.name == "nt":
...@@ -37,14 +40,15 @@ if os.name == "nt": ...@@ -37,14 +40,15 @@ if os.name == "nt":
Returns the ZEO port, the test server port, and the pid. Returns the ZEO port, the test server port, and the pid.
""" """
import ZEO.tests.winserver import ZEO.tests.winserver
port = get_port() if port is None:
port = get_port()
script = ZEO.tests.winserver.__file__ script = ZEO.tests.winserver.__file__
if script.endswith('.pyc'): if script.endswith('.pyc'):
script = script[:-1] script = script[:-1]
args = (sys.executable, script, str(port), storage_name) + args args = (sys.executable, script, str(port), storage_name) + args
d = os.environ.copy() d = os.environ.copy()
d['PYTHONPATH'] = os.pathsep.join(sys.path) d['PYTHONPATH'] = os.pathsep.join(sys.path)
pid = os.spawnve(os.P_NOWAIT, sys.executable, args, d) pid = os.spawnve(os.P_NOWAIT, sys.executable, args, os.environ)
return ('localhost', port), ('localhost', port + 1), pid return ('localhost', port), ('localhost', port + 1), pid
else: else:
...@@ -74,6 +78,7 @@ else: ...@@ -74,6 +78,7 @@ else:
def close(self): def close(self):
os.write(self.pipe, "done") os.write(self.pipe, "done")
os.close(self.pipe)
def start_zeo_server(storage, addr): def start_zeo_server(storage, addr):
rd, wr = os.pipe() rd, wr = os.pipe()
...@@ -97,6 +102,7 @@ else: ...@@ -97,6 +102,7 @@ else:
ZEOServerExit(rd) ZEOServerExit(rd)
serv = ZEO.StorageServer.StorageServer(addr, {'1':storage}) serv = ZEO.StorageServer.StorageServer(addr, {'1':storage})
asyncore.loop() asyncore.loop()
os.close(rd)
storage.close() storage.close()
if isinstance(addr, types.StringType): if isinstance(addr, types.StringType):
os.unlink(addr) os.unlink(addr)
......
"""A ZEO client-server stress test to look for leaks.
The stress test should run in an infinite loop and should involve
multiple connections.
"""
from __future__ import nested_scopes
import ZODB
from ZEO.ClientStorage import ClientStorage
from ZODB.MappingStorage import MappingStorage
from ZEO.tests import forker
from ZODB.tests import MinPO
import zLOG
import os
import random
import sys
import types
NUM_TRANSACTIONS_PER_CONN = 10
NUM_CONNECTIONS = 10
NUM_ROOTS = 20
MAX_DEPTH = 20
MIN_OBJSIZE = 128
MAX_OBJSIZE = 2048
def an_object():
"""Return an object suitable for a PersistentMapping key"""
size = random.randrange(MIN_OBJSIZE, MAX_OBJSIZE)
if os.path.exists("/dev/urandom"):
f = open("/dev/urandom")
buf = f.read(size)
f.close()
return buf
else:
f = open(MinPO.__file__)
l = list(f.read(size))
f.close()
random.shuffle(l)
return "".join(l)
def setup(cn):
"""Initialize the database with some objects"""
root = cn.root()
for i in range(NUM_ROOTS):
prev = an_object()
for j in range(random.randrange(1, MAX_DEPTH)):
o = MinPO.MinPO(prev)
prev = o
root[an_object()] = o
get_transaction().commit()
cn.close()
def work(cn):
"""Do some work with a transaction"""
cn.sync()
root = cn.root()
obj = random.choice(root.values())
# walk down to the bottom
while not isinstance(obj.value, types.StringType):
obj = obj.value
obj.value = an_object()
get_transaction().commit()
def main():
# Yuck! Need to cleanup forker so that the API is consistent
# across Unix and Windows, at least if that's possible.
if os.name == "nt":
zaddr, tport, pid = forker.start_zeo_server('MappingStorage', ())
def exitserver():
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(tport)
s.close()
else:
zaddr = '', random.randrange(20000, 30000)
pid, exitobj = forker.start_zeo_server(MappingStorage(), zaddr)
def exitserver():
exitobj.close()
while 1:
pid = start_child(zaddr)
print "started", pid
os.waitpid(pid, 0)
exitserver()
def start_child(zaddr):
pid = os.fork()
if pid != 0:
return pid
storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5)
db = ZODB.DB(storage, pool_size=NUM_CONNECTIONS)
setup(db.open())
conns = []
conn_count = 0
for i in range(NUM_CONNECTIONS):
c = db.open()
c.__count = 0
conns.append(c)
conn_count += 1
while conn_count < 25:
c = random.choice(conns)
if c.__count > NUM_TRANSACTIONS_PER_CONN:
conns.remove(c)
c.close()
conn_count += 1
c = db.open()
c.__count = 0
conns.append(c)
else:
c.__count += 1
work(c)
os._exit(0)
if __name__ == "__main__":
main()
...@@ -98,8 +98,26 @@ class ZEOTestBase(StorageTestBase.StorageTestBase): ...@@ -98,8 +98,26 @@ class ZEOTestBase(StorageTestBase.StorageTestBase):
raise serial raise serial
d[oid] = serial d[oid] = serial
return d return d
# Some of the ZEO tests depend on the version of FileStorage available
# for the tests. If we run these tests using Zope 2.3, FileStorage
# doesn't support TransactionalUndo.
if hasattr(FileStorage, 'supportsTransactionalUndo'):
# XXX Assume that a FileStorage that supports transactional undo
# also supports conflict resolution.
class VersionDependentTests(
TransactionalUndoStorage.TransactionalUndoStorage,
TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage):
pass
else:
class VersionDependentTests:
pass
class GenericTests(ZEOTestBase, class GenericTests(ZEOTestBase,
VersionDependentTests,
Cache.StorageWithCache, Cache.StorageWithCache,
Cache.TransUndoStorageWithCache, Cache.TransUndoStorageWithCache,
BasicStorage.BasicStorage, BasicStorage.BasicStorage,
...@@ -107,10 +125,6 @@ class GenericTests(ZEOTestBase, ...@@ -107,10 +125,6 @@ class GenericTests(ZEOTestBase,
RevisionStorage.RevisionStorage, RevisionStorage.RevisionStorage,
PackableStorage.PackableStorage, PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage, Synchronization.SynchronizedStorage,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage,
TransactionalUndoStorage.TransactionalUndoStorage,
TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
): ):
"""An abstract base class for ZEO tests """An abstract base class for ZEO tests
...@@ -187,7 +201,7 @@ class WindowsGenericTests(GenericTests): ...@@ -187,7 +201,7 @@ class WindowsGenericTests(GenericTests):
zeo_addr, self.test_addr, self.test_pid = \ zeo_addr, self.test_addr, self.test_pid = \
forker.start_zeo_server(name, args) forker.start_zeo_server(name, args)
storage = ZEO.ClientStorage.ClientStorage(zeo_addr, debug=1, storage = ZEO.ClientStorage.ClientStorage(zeo_addr, debug=1,
min_disconnect_poll=0.5) min_disconnect_poll=0.1)
self._storage = PackWaitWrapper(storage) self._storage = PackWaitWrapper(storage)
storage.registerDB(DummyDB(), None) storage.registerDB(DummyDB(), None)
...@@ -195,6 +209,7 @@ class WindowsGenericTests(GenericTests): ...@@ -195,6 +209,7 @@ class WindowsGenericTests(GenericTests):
self._storage.close() self._storage.close()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(self.test_addr) s.connect(self.test_addr)
s.close()
# the connection should cause the storage server to die # the connection should cause the storage server to die
## os.waitpid(self.test_pid, 0) ## os.waitpid(self.test_pid, 0)
time.sleep(0.5) time.sleep(0.5)
......
...@@ -130,12 +130,16 @@ if os.name == 'posix': ...@@ -130,12 +130,16 @@ if os.name == 'posix':
# the main thread is trying to remove some] # the main thread is trying to remove some]
def __init__ (self): def __init__ (self):
r, w = os.pipe() r, w = self._fds = os.pipe()
self.trigger = w self.trigger = w
asyncore.file_dispatcher.__init__ (self, r) asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock() self.lock = thread.allocate_lock()
self.thunks = [] self.thunks = []
def __del__(self):
os.close(self._fds[0])
os.close(self._fds[1])
def __repr__ (self): def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self) return '<select-trigger (pipe) at %x>' % id(self)
......
...@@ -85,7 +85,7 @@ ...@@ -85,7 +85,7 @@
"""Simple rpc mechanisms """Simple rpc mechanisms
""" """
__version__ = "$Revision: 1.19 $"[11:-2] __version__ = "$Revision: 1.20 $"[11:-2]
from cPickle import loads from cPickle import loads
import cPickle import cPickle
...@@ -166,8 +166,10 @@ class asyncRPC(SizedMessageAsyncConnection): ...@@ -166,8 +166,10 @@ class asyncRPC(SizedMessageAsyncConnection):
return 1 return 1
def finishConnect(self, s): def finishConnect(self, s):
if self.__haveMainLoop: map=None # use the main loop map if self.__haveMainLoop:
else: map = {} # provide a dummy map map = None # use the main loop map
else:
map = {} # provide a dummy map
SizedMessageAsyncConnection.__init__(self, s, '', map) SizedMessageAsyncConnection.__init__(self, s, '', map)
# we are our own socket map! # we are our own socket map!
...@@ -221,12 +223,21 @@ class asyncRPC(SizedMessageAsyncConnection): ...@@ -221,12 +223,21 @@ class asyncRPC(SizedMessageAsyncConnection):
if c=='R': if c=='R':
if r=='RN.': return None # Common case! if r=='RN.': return None # Common case!
return loads(r[1:]) return loads(r[1:])
# If c == 'E', an error occured on the server. In
# this case, the return value is a pickled exception.
# Unpickle it and raise it on the client side. The
# traceback for this exception ends at this method,
# but the real error occurred somewhere in the server
# code. To diagnose the error, look for the real
# traceback in the server's zLOG output.
if c=='E': if c=='E':
try: r=loads(r[1:]) try: r=loads(r[1:])
except: except:
raise UnUnPickleableError(r[1:]) raise UnUnPickleableError(r[1:])
if type(r) is TupleType: raise r[0], r[1] if type(r) is TupleType:
raise r raise r[0], r[1] # see server log for real traceback
raise r
oob=self._outOfBand oob=self._outOfBand
if oob is not None: if oob is not None:
r=r[1:] r=r[1:]
...@@ -260,8 +271,10 @@ class asyncRPC(SizedMessageAsyncConnection): ...@@ -260,8 +271,10 @@ class asyncRPC(SizedMessageAsyncConnection):
def message_input(self, m): def message_input(self, m):
if self._debug: if self._debug:
md=`m` if len(m) > 60:
if len(m) > 60: md=md[:60]+' ...' md = repr(m[:60]) + ' ...'
else:
md = repr(m)
LOG(self._debug, TRACE, 'message_input %s' % md) LOG(self._debug, TRACE, 'message_input %s' % md)
c=m[:1] c=m[:1]
...@@ -292,6 +305,7 @@ class asyncRPC(SizedMessageAsyncConnection): ...@@ -292,6 +305,7 @@ class asyncRPC(SizedMessageAsyncConnection):
self.__Wakeup(lambda self=self: self.close()) self.__Wakeup(lambda self=self: self.close())
else: else:
self.close() self.close()
self._outOfBand = None
self.__closed = 1 self.__closed = 1
def close(self): def close(self):
...@@ -299,6 +313,8 @@ class asyncRPC(SizedMessageAsyncConnection): ...@@ -299,6 +313,8 @@ class asyncRPC(SizedMessageAsyncConnection):
self.aq_parent.notifyDisconnected(self) self.aq_parent.notifyDisconnected(self)
# causes read call to raise last exception, which should be # causes read call to raise last exception, which should be
# the socket error that caused asyncore to close the socket. # the socket error that caused asyncore to close the socket.
self.__r='E'+dump(sys.exc_info()[:2], 1) self.__r = 'E' + dump(sys.exc_info()[:2], 1)
try: self.__lr() try:
except: pass self.__lr()
except:
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment