Commit 1dbb6eed authored by Jeremy Hylton's avatar Jeremy Hylton

Changes from the ZEO-ZRPC-Dev branch merge.

parent 59586e77
...@@ -144,18 +144,20 @@ file 0 and file 1. ...@@ -144,18 +144,20 @@ file 0 and file 1.
""" """
__version__ = "$Revision: 1.18 $"[11:-2] __version__ = "$Revision: 1.19 $"[11:-2]
import os, tempfile import os, tempfile
from struct import pack, unpack from struct import pack, unpack
from thread import allocate_lock from thread import allocate_lock
import zLOG
magic='ZEC0' import sys
import zLOG
def LOG(msg, level=zLOG.BLATHER): def log(msg, level=zLOG.INFO):
zLOG.LOG("ZEC", level, msg) zLOG.LOG("ZEC", level, msg)
magic='ZEC0'
class ClientCache: class ClientCache:
def __init__(self, storage='', size=20000000, client=None, var=None): def __init__(self, storage='', size=20000000, client=None, var=None):
...@@ -211,16 +213,14 @@ class ClientCache: ...@@ -211,16 +213,14 @@ class ClientCache:
f[0].write(magic) f[0].write(magic)
current=0 current=0
log("cache opened. current = %s" % current)
self._limit=size/2 self._limit=size/2
self._current=current self._current=current
def close(self):
try:
self._f[self._current].close()
except (os.error, ValueError):
pass
def open(self): def open(self):
# XXX open is overloaded to perform two tasks for
# optimization reasons
self._acquire() self._acquire()
try: try:
self._index=index={} self._index=index={}
...@@ -235,6 +235,19 @@ class ClientCache: ...@@ -235,6 +235,19 @@ class ClientCache:
return serial.items() return serial.items()
finally: self._release() finally: self._release()
def close(self):
for f in self._f:
if f is not None:
f.close()
def verify(self, verifyFunc):
"""Call the verifyFunc on every object in the cache.
verifyFunc(oid, serialno, version)
"""
for oid, (s, vs) in self.open():
verifyFunc(oid, s, vs)
def invalidate(self, oid, version): def invalidate(self, oid, version):
self._acquire() self._acquire()
try: try:
...@@ -373,8 +386,6 @@ class ClientCache: ...@@ -373,8 +386,6 @@ class ClientCache:
self._f[current]=open(self._p[current],'w+b') self._f[current]=open(self._p[current],'w+b')
else: else:
# Temporary cache file: # Temporary cache file:
if self._f[current] is not None:
self._f[current].close()
self._f[current] = tempfile.TemporaryFile(suffix='.zec') self._f[current] = tempfile.TemporaryFile(suffix='.zec')
self._f[current].write(magic) self._f[current].write(magic)
self._pos=pos=4 self._pos=pos=4
...@@ -383,55 +394,57 @@ class ClientCache: ...@@ -383,55 +394,57 @@ class ClientCache:
def store(self, oid, p, s, version, pv, sv): def store(self, oid, p, s, version, pv, sv):
self._acquire() self._acquire()
try: self._store(oid, p, s, version, pv, sv) try:
finally: self._release() self._store(oid, p, s, version, pv, sv)
finally:
self._release()
def _store(self, oid, p, s, version, pv, sv): def _store(self, oid, p, s, version, pv, sv):
if not s: if not s:
p='' p = ''
s='\0\0\0\0\0\0\0\0' s = '\0\0\0\0\0\0\0\0'
tlen=31+len(p) tlen = 31 + len(p)
if version: if version:
tlen=tlen+len(version)+12+len(pv) tlen = tlen + len(version) + 12 + len(pv)
vlen=len(version) vlen = len(version)
else: else:
vlen=0 vlen = 0
pos=self._pos stlen = pack(">I", tlen)
current=self._current # accumulate various data to write into a list
f=self._f[current] l = [oid, 'v', stlen, pack(">HI", vlen, len(p)), s]
f.seek(pos) if p:
stlen=pack(">I",tlen) l.append(p)
write=f.write
write(oid+'v'+stlen+pack(">HI", vlen, len(p))+s)
if p: write(p)
if version: if version:
write(version) l.extend([version,
write(pack(">I", len(pv))) pack(">I", len(pv)),
write(pv) pv, sv])
write(sv) l.append(stlen)
f = self._f[self._current]
write(stlen) f.seek(self._pos)
f.write("".join(l))
if current: self._index[oid]=-pos
else: self._index[oid]=pos if self._current:
self._index[oid] = - self._pos
else:
self._index[oid] = self._pos
self._pos=pos+tlen self._pos += tlen
def read_index(index, serial, f, current): def read_index(index, serial, f, current):
LOG("read_index(%s)" % f.name)
seek=f.seek seek=f.seek
read=f.read read=f.read
pos=4 pos=4
seek(0,2)
size=f.tell()
while 1: while 1:
seek(pos) f.seek(pos)
h=read(27) h=read(27)
if len(h)==27 and h[8] in 'vni': if len(h)==27 and h[8] in 'vni':
tlen, vlen, dlen = unpack(">iHi", h[9:19]) tlen, vlen, dlen = unpack(">iHi", h[9:19])
else: else: tlen=-1
break
if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen: if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
break break
...@@ -466,15 +479,3 @@ def read_index(index, serial, f, current): ...@@ -466,15 +479,3 @@ def read_index(index, serial, f, current):
except: pass except: pass
return pos return pos
def main(files):
for file in files:
print file
index = {}
serial = {}
read_index(index, serial, open(file), 0)
print index.keys()
if __name__ == "__main__":
import sys
main(sys.argv[1:])
...@@ -83,178 +83,167 @@ ...@@ -83,178 +83,167 @@
# #
############################################################################## ##############################################################################
"""Network ZODB storage client """Network ZODB storage client
"""
__version__='$Revision: 1.35 $'[11:-2] XXX support multiple outstanding requests up until the vote?
XXX is_connected() vis ClientDisconnected error
"""
__version__='$Revision: 1.36 $'[11:-2]
import cPickle
import os
import socket
import string
import struct
import sys
import tempfile
import thread
import threading
import time
from types import TupleType, StringType
from struct import pack, unpack
import struct, time, os, socket, string, Sync, zrpc, ClientCache import ExtensionClass, Sync, ThreadLock
import tempfile, Invalidator, ExtensionClass, thread import ClientCache
import ThreadedAsync import zrpc2
import ServerStub
from TransactionBuffer import TransactionBuffer
now=time.time from ZODB import POSException
from struct import pack, unpack
from ZODB import POSException, BaseStorage
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
from zLOG import LOG, PROBLEM, INFO from zLOG import LOG, PROBLEM, INFO, BLATHER
from Exceptions import Disconnected
try: from ZODB.ConflictResolution import ResolvedSerial def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
except: ResolvedSerial='rs' LOG(subsys, type, msg)
TupleType=type(()) try:
from ZODB.ConflictResolution import ResolvedSerial
except ImportError:
ResolvedSerial = 'rs'
class ClientStorageError(POSException.StorageError): class ClientStorageError(POSException.StorageError):
"""An error occured in the ZEO Client Storage""" """An error occured in the ZEO Client Storage"""
class UnrecognizedResult(ClientStorageError): class UnrecognizedResult(ClientStorageError):
"""A server call returned an unrecognized result """A server call returned an unrecognized result"""
"""
class ClientDisconnected(ClientStorageError):
"""The database storage is disconnected from the storage.
"""
class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
_connected=_async=0
__begin='tpc_begin_sync'
def __init__(self, connection, storage='1', cache_size=20000000,
name='', client='', debug=0, var=None,
min_disconnect_poll=5, max_disconnect_poll=300,
wait_for_server_on_startup=1):
# Decide whether to use non-temporary files class ClientDisconnected(ClientStorageError, Disconnected):
client=client or os.environ.get('ZEO_CLIENT','') """The database storage is disconnected from the storage."""
self._connection=connection def get_timestamp(prev_ts):
self._storage=storage t = time.time()
self._debug=debug t = apply(TimeStamp, (time.gmtime(t)[:5] + (t % 60,)))
self._wait_for_server_on_startup=wait_for_server_on_startup t = t.laterThan(prev_ts)
return t
self._info={'length': 0, 'size': 0, 'name': 'ZEO Client', class DisconnectedServerStub:
'supportsUndo':0, 'supportsVersions': 0, """Raise ClientDisconnected on all attribute access."""
}
self._call=zrpc.asyncRPC(connection, debug=debug,
tmin=min_disconnect_poll,
tmax=max_disconnect_poll)
name = name or str(connection) def __getattr__(self, attr):
raise ClientDisconnected()
self.closed = 0 disconnected_stub = DisconnectedServerStub()
self._tfile=tempfile.TemporaryFile()
self._oids=[]
self._serials=[]
self._seriald={}
ClientStorage.inheritedAttribute('__init__')(self, name) class ClientStorage:
self.__lock_acquire=self._lock_acquire def __init__(self, addr, storage='1', cache_size=20000000,
name='', client='', debug=0, var=None,
min_disconnect_poll=5, max_disconnect_poll=300,
wait_for_server_on_startup=0, read_only=0):
self._cache=ClientCache.ClientCache( self._server = disconnected_stub
storage, cache_size, client=client, var=var) self._is_read_only = read_only
self._storage = storage
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0}
ThreadedAsync.register_loop_callback(self.becomeAsync) self._tbuf = TransactionBuffer()
self._db = None
self._oids = []
# XXX It's confusing to have _serial, _serials, and _seriald.
self._serials = []
self._seriald = {}
# IMPORTANT: Note that we aren't fully "there" yet. self._basic_init(name or str(addr))
# In particular, we don't actually connect to the server
# until we have a controlling database set with registerDB
# below.
def registerDB(self, db, limit): # Decide whether to use non-temporary files
"""Register that the storage is controlled by the given DB. client = client or os.environ.get('ZEO_CLIENT', '')
""" self._cache = ClientCache.ClientCache(storage, cache_size,
client=client, var=var)
self._cache.open() # XXX
self._rpc_mgr = zrpc2.ConnectionManager(addr, self,
#debug=debug,
tmin=min_disconnect_poll,
tmax=max_disconnect_poll)
# XXX What if we can only get a read-only connection and we
# want a read-write connection? Looks like the current code
# will block forever.
# Among other things, we know that our data methods won't get if wait_for_server_on_startup:
# called until after this call. self._rpc_mgr.connect(sync=1)
else:
self.invalidator = Invalidator.Invalidator(db.invalidate, if not self._rpc_mgr.attempt_connect():
self._cache.invalidate) self._rpc_mgr.connect()
def out_of_band_hook( def _basic_init(self, name):
code, args, """Handle initialization activites of BaseStorage"""
get_hook={
'b': (self.invalidator.begin, 0), self.__name__ = name
'i': (self.invalidator.invalidate, 1),
'e': (self.invalidator.end, 0), # A ClientStorage only allows one client to commit at a time.
'I': (self.invalidator.Invalidate, 1), # A client enters the commit state by finding tpc_tid set to
'U': (self._commit_lock_release, 0), # None and updating it to the new transaction's id. The
's': (self._serials.append, 1), # tpc_tid variable is protected by tpc_cond.
'S': (self._info.update, 1), self.tpc_cond = threading.Condition()
}.get): self._transaction = None
hook = get_hook(code, None) # Prevent multiple new_oid calls from going out. The _oids
if hook is None: return # variable should only be modified while holding the
hook, flag = hook # oid_cond.
if flag: hook(args) self.oid_cond = threading.Condition()
else: hook()
commit_lock = thread.allocate_lock()
self._call.setOutOfBand(out_of_band_hook) self._commit_lock_acquire = commit_lock.acquire
self._commit_lock_release = commit_lock.release
# Now that we have our callback system in place, we can
# try to connect t = time.time()
t = self._ts = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
self._startup() self._serial = `t`
self._oid='\0\0\0\0\0\0\0\0'
def _startup(self):
if not self._call.connect(not self._wait_for_server_on_startup):
# If we can't connect right away, go ahead and open the cache
# and start a separate thread to try and reconnect.
LOG("ClientStorage", PROBLEM, "Failed to connect to storage")
self._cache.open()
thread.start_new_thread(self._call.connect,(0,))
# If the connect succeeds then this work will be done by
# notifyConnected
def notifyConnected(self, s):
LOG("ClientStorage", INFO, "Connected to storage")
self._lock_acquire()
try:
# We let the connection keep coming up now that
# we have the storage lock. This way, we know no calls
# will be made while in the process of coming up.
self._call.finishConnect(s)
if self.closed: def registerDB(self, db, limit):
return """Register that the storage is controlled by the given DB."""
log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit)))
self._connected=1 self._db = db
self._oids=[]
# we do synchronous commits until we are sure that
# we have and are ready for a main loop.
# Hm. This is a little silly. If self._async, then def is_connected(self):
# we will really never do a synchronous commit. if self._server is disconnected_stub:
# See below. return 0
self.__begin='tpc_begin_sync' else:
return 1
self._call.message_output(str(self._storage))
### This seems silly. We should get the info asynchronously. def notifyConnected(self, c):
# self._info.update(self._call('get_info')) log2(INFO, "Connected to storage")
stub = ServerStub.StorageServer(c)
cached=self._cache.open() self._oids = []
### This is a little expensive for large caches
if cached:
self._call.sendMessage('beginZeoVerify')
for oid, (s, vs) in cached:
self._call.sendMessage('zeoVerify', oid, s, vs)
self._call.sendMessage('endZeoVerify')
finally: self._lock_release() # XXX Why is this synchronous? If it were async, verification
# would start faster.
stub.register(str(self._storage), self._is_read_only)
self.verify_cache(stub)
if self._async: # Don't make the server available to clients until after
import asyncore # validating the cache
self.becomeAsync(asyncore.socket_map) self._server = stub
def verify_cache(self, server):
server.beginZeoVerify()
self._cache.verify(server.zeoVerify)
server.endZeoVerify()
### Is there a race condition between notifyConnected and ### Is there a race condition between notifyConnected and
### notifyDisconnected? In Particular, what if we get ### notifyDisconnected? In Particular, what if we get
...@@ -268,363 +257,345 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage): ...@@ -268,363 +257,345 @@ class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
### in the middle of notifyDisconnected, because *it's* ### in the middle of notifyDisconnected, because *it's*
### responsible for starting the thread that makes the connection. ### responsible for starting the thread that makes the connection.
def notifyDisconnected(self, ignored): def notifyDisconnected(self):
LOG("ClientStorage", PROBLEM, "Disconnected from storage") log2(PROBLEM, "Disconnected from storage")
self._connected=0 self._server = disconnected_stub
self._transaction=None if self._transaction:
thread.start_new_thread(self._call.connect,(0,)) self._transaction = None
if self._transaction is not None: self.tpc_cond.notifyAll()
try: self.tpc_cond.release()
self._commit_lock_release()
except: def __len__(self):
pass return self._info['length']
def becomeAsync(self, map): def getName(self):
self._lock_acquire() return "%s (%s)" % (self.__name__, "XXX")
def getSize(self):
return self._info['size']
def supportsUndo(self):
return self._info['supportsUndo']
def supportsVersions(self):
return self._info['supportsVersions']
def supportsTransactionalUndo(self):
try: try:
self._async=1 return self._info['supportsTransactionalUndo']
if self._connected: except KeyError:
self._call.setLoop(map, getWakeup()) return 0
self.__begin='tpc_begin'
finally: self._lock_release()
def __len__(self): return self._info['length'] def isReadOnly(self):
return self._is_read_only
def abortVersion(self, src, transaction): def _check_trans(self, trans, exc=None):
if transaction is not self._transaction: if self._transaction is not trans:
raise POSException.StorageTransactionError(self, transaction) if exc is None:
self._lock_acquire() return 0
else:
raise exc(self._transaction, trans)
return 1
def _check_tid(self, tid, exc=None):
# XXX Is all this locking unnecessary? The only way to
# begin a transaction is to call tpc_begin(). If we assume
# clients are single-threaded and well-behaved, i.e. they call
# tpc_begin() first, then there appears to be no need for
# locking. If _check_tid() is called and self.tpc_tid != tid,
# then there is no way it can be come equal during the call.
# Thus, there should be no race.
if self.tpc_tid != tid:
if exc is None:
return 0
else:
raise exc(self.tpc_tid, tid)
return 1
# XXX But I'm not sure
self.tpc_cond.acquire()
try: try:
oids=self._call('abortVersion', src, self._serial) if self.tpc_tid != tid:
vlen = pack(">H", len(src)) if exc is None:
for oid in oids: return 0
self._tfile.write("i%s%s%s" % (oid, vlen, src)) else:
return oids raise exc(self.tpc_tid, tid)
finally: self._lock_release() return 1
finally:
self.tpc_cond.release()
def abortVersion(self, src, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._check_trans(transaction,
POSException.StorageTransactionError)
oids = self._server.abortVersion(src, self._serial)
for oid in oids:
self._tbuf.invalidate(oid, src)
return oids
def close(self): def close(self):
self._lock_acquire() self._rpc_mgr.close()
try: if self._cache is not None:
LOG("ClientStorage", INFO, "close")
self._call.closeIntensionally()
try:
self._tfile.close()
except os.error:
# On Windows, this can fail if it is called more than
# once, because it tries to delete the file each
# time.
pass
self._cache.close() self._cache.close()
if self.invalidator is not None:
self.invalidator.close()
self.invalidator = None
self.closed = 1
finally: self._lock_release()
def commitVersion(self, src, dest, transaction): def commitVersion(self, src, dest, transaction):
if transaction is not self._transaction: if self._is_read_only:
raise POSException.StorageTransactionError(self, transaction) raise POSException.ReadOnlyError()
self._lock_acquire() self._check_trans(transaction,
try: POSException.StorageTransactionError)
oids=self._call('commitVersion', src, dest, self._serial) oids = self._server.commitVersion(src, dest, self._serial)
if dest: if dest:
vlen = pack(">H", len(src)) # just invalidate our version data
# just invalidate our version data for oid in oids:
for oid in oids: self._tbuf.invalidate(oid, src)
self._tfile.write("i%s%s%s" % (oid, vlen, src)) else:
else: # dest is '', so invalidate version and non-version
vlen = pack(">H", len(dest)) for oid in oids:
# dest is '', so invalidate version and non-version self._tbuf.invalidate(oid, dest)
for oid in oids: return oids
self._tfile.write("i%s%s%s" % (oid, vlen, dest))
return oids
finally: self._lock_release()
def getName(self):
return "%s (%s)" % (
self.__name__,
self._connected and 'connected' or 'disconnected')
def getSize(self): return self._info['size']
def history(self, oid, version, length=1): def history(self, oid, version, length=1):
self._lock_acquire() return self._server.history(oid, version, length)
try: return self._call('history', oid, version, length)
finally: self._lock_release()
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
self._lock_acquire() return self._server.loadSerial(oid, serial)
try: return self._call('loadSerial', oid, serial)
finally: self._lock_release()
def load(self, oid, version, _stuff=None): def load(self, oid, version, _stuff=None):
self._lock_acquire() p = self._cache.load(oid, version)
try: if p:
cache=self._cache return p
p = cache.load(oid, version) if self._server is None:
if p: return p raise ClientDisconnected()
p, s, v, pv, sv = self._call('zeoLoad', oid) p, s, v, pv, sv = self._server.zeoLoad(oid)
cache.checkSize(0) self._cache.checkSize(0)
cache.store(oid, p, s, v, pv, sv) self._cache.store(oid, p, s, v, pv, sv)
if not v or not version or version != v: if v and version and v == version:
if s: return p, s
raise KeyError, oid # no non-version data for this
return pv, sv return pv, sv
finally: self._lock_release() else:
if s:
return p, s
raise KeyError, oid # no non-version data for this
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
self._lock_acquire() v = self._cache.modifiedInVersion(oid)
try: if v is not None:
v=self._cache.modifiedInVersion(oid) return v
if v is not None: return v return self._server.modifiedInVersion(oid)
return self._call('modifiedInVersion', oid)
finally: self._lock_release()
def new_oid(self, last=None): def new_oid(self, last=None):
self._lock_acquire() if self._is_read_only:
try: raise POSException.ReadOnlyError()
oids=self._oids # We want to avoid a situation where multiple oid requests are
if not oids: # made at the same time.
oids[:]=self._call('new_oids') self.oid_cond.acquire()
oids.reverse() if not self._oids:
self._oids = self._server.new_oids()
return oids.pop() self._oids.reverse()
finally: self._lock_release() self.oid_cond.notifyAll()
oid = self._oids.pop()
self.oid_cond.release()
return oid
def pack(self, t=None, rf=None, wait=0, days=0): def pack(self, t=None, rf=None, wait=0, days=0):
if self._is_read_only:
raise POSException.ReadOnlyError()
# Note that we ignore the rf argument. The server # Note that we ignore the rf argument. The server
# will provide it's own implementation. # will provide it's own implementation.
if t is None: t=time.time() if t is None:
t=t-(days*86400) t = time.time()
self._lock_acquire() t = t - (days * 86400)
try: return self._call('pack', t, wait) return self._server.pack(t, wait)
finally: self._lock_release()
def _check_serials(self):
if self._serials:
l = len(self._serials)
r = self._serials[:l]
del self._serials[:l]
for oid, s in r:
if isinstance(s, Exception):
raise s
self._seriald[oid] = s
return r
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction: if self._is_read_only:
raise POSException.StorageTransactionError(self, transaction) raise POSException.ReadOnlyError()
self._lock_acquire() self._check_trans(transaction, POSException.StorageTransactionError)
try: self._server.storea(oid, serial, data, version, self._serial)
serial=self._call.sendMessage('storea', oid, serial, self._tbuf.store(oid, version, data)
data, version, self._serial) return self._check_serials()
write=self._tfile.write
buf = string.join(("s", oid,
pack(">HI", len(version), len(data)),
version, data), "")
write(buf)
if self._serials:
s=self._serials
l=len(s)
r=s[:l]
del s[:l]
d=self._seriald
for oid, s in r: d[oid]=s
return r
return serial
finally: self._lock_release()
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
self._lock_acquire() if transaction is not self._transaction:
try: return
if transaction is not self._transaction: self._server.vote(self._serial)
return return self._check_serials()
self._call('vote', self._serial)
if self._serials:
s=self._serials
l=len(s)
r=s[:l]
del s[:l]
d=self._seriald
for oid, s in r: d[oid]=s
return r
finally: self._lock_release()
def supportsUndo(self):
return self._info['supportsUndo']
def supportsVersions(self):
return self._info['supportsVersions']
def supportsTransactionalUndo(self):
try:
return self._info['supportsTransactionalUndo']
except KeyError:
return 0
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
self._lock_acquire() if transaction is not self._transaction:
try: return
if transaction is not self._transaction: return self._server.tpc_abort(self._serial)
self._call('tpc_abort', self._serial) self._tbuf.clear()
self._transaction=None self._seriald.clear()
self._tfile.seek(0) del self._serials[:]
self._seriald.clear() self._transaction = None
del self._serials[:] self.tpc_cond.notify()
self._commit_lock_release() self.tpc_cond.release()
finally: self._lock_release()
def tpc_begin(self, transaction): def tpc_begin(self, transaction):
self._lock_acquire() self.tpc_cond.acquire()
try: while self._transaction is not None:
if self._transaction is transaction: return if self._transaction == transaction:
self.tpc_cond.release()
user=transaction.user return
desc=transaction.description self.tpc_cond.wait()
ext=transaction._extension
if self._server is None:
while 1: self.tpc_cond.release()
self._lock_release() raise ClientDisconnected()
self._commit_lock_acquire()
self._lock_acquire()
# We've got the local commit lock. Now get
# a (tentative) transaction time stamp.
t=time.time()
t=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
self._ts=t=t.laterThan(self._ts)
id=`t`
try:
if not self._connected:
raise ClientDisconnected(
"This action is temporarily unavailable.<p>")
r=self._call(self.__begin, id, user, desc, ext)
except:
# XXX can't seem to guarantee that the lock is held here.
self._commit_lock_release()
raise
if r is None: break
# We have *BOTH* the local and distributed commit
# lock, now we can actually get ready to get started.
self._serial=id
self._tfile.seek(0)
self._seriald.clear()
del self._serials[:]
self._transaction=transaction
finally: self._lock_release() self._ts = get_timestamp(self._ts)
id = `self._ts`
self._transaction = transaction
def tpc_finish(self, transaction, f=None):
self._lock_acquire()
try: try:
if transaction is not self._transaction: return r = self._server.tpc_begin(id,
if f is not None: f() transaction.user,
transaction.description,
self._call('tpc_finish', self._serial, transaction._extension)
transaction.user, except:
transaction.description, # If _server is None, then the client disconnected during
transaction._extension) # the tpc_begin() and notifyDisconnected() will have
# released the lock.
seriald=self._seriald if self._server is not disconnected_stub:
if self._serials: self.tpc_cond.release()
s=self._serials raise
l=len(s)
r=s[:l] self._serial = id
del s[:l] self._seriald.clear()
for oid, s in r: seriald[oid]=s del self._serials[:]
tfile=self._tfile def tpc_finish(self, transaction, f=None):
seek=tfile.seek if transaction is not self._transaction:
read=tfile.read return
cache=self._cache if f is not None: # XXX what is f()?
size=tfile.tell() f()
cache.checkSize(size)
seek(0) self._server.tpc_finish(self._serial)
i=0
while i < size: r = self._check_serials()
opcode=read(1) assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
if opcode == "s":
oid=read(8) self._update_cache()
s=seriald[oid]
h=read(6) self._transaction = None
vlen, dlen = unpack(">HI", h) self.tpc_cond.notify()
if vlen: v=read(vlen) self.tpc_cond.release()
else: v=''
p=read(dlen) def _update_cache(self):
if len(p) != dlen: # Iterate over the objects in the transaction buffer and
raise ClientStorageError, ( # update or invalidate the cache.
"Unexpected end of file in client storage " self._cache.checkSize(self._tbuf.get_size())
"temporary file." self._tbuf.begin_iterate()
) while 1:
if s==ResolvedSerial: try:
self._cache.invalidate(oid, v) t = self._tbuf.next()
else: except ValueError, msg:
self._cache.update(oid, s, v, p) raise ClientStorageError, (
i=i+15+vlen+dlen "Unexpected error reading temporary file in "
elif opcode == "i": "client storage: %s" % msg)
oid=read(8) if t is None:
h=read(2) break
vlen=unpack(">H", h)[0] oid, v, p = t
v=read(vlen) if p is None: # an invalidation
self._cache.invalidate(oid, v) s = None
i=i+11+vlen else:
s = self._seriald[oid]
seek(0) if s == ResolvedSerial or s is None:
self._cache.invalidate(oid, v)
self._transaction=None else:
self._commit_lock_release() self._cache.update(oid, s, v, p)
finally: self._lock_release() self._tbuf.clear()
def transactionalUndo(self, trans_id, trans): def transactionalUndo(self, trans_id, trans):
self._lock_acquire() if self._is_read_only:
try: raise POSException.ReadOnlyError()
if trans is not self._transaction: self._check_trans(trans, POSException.StorageTransactionError)
raise POSException.StorageTransactionError(self, transaction) oids = self._server.transactionalUndo(trans_id, self._serial)
oids = self._call('transactionalUndo', trans_id, self._serial) for oid in oids:
for oid in oids: self._tbuf.invalidate(oid, '')
# write invalidation records with no version return oids
self._tfile.write("i%s\000\000" % oid)
return oids
finally: self._lock_release()
def undo(self, transaction_id): def undo(self, transaction_id):
self._lock_acquire() if self._is_read_only:
try: raise POSException.ReadOnlyError()
oids=self._call('undo', transaction_id) # XXX what are the sync issues here?
cinvalidate=self._cache.invalidate oids = self._server.undo(transaction_id)
for oid in oids: for oid in oids:
cinvalidate(oid,'') self._cache.invalidate(oid, '')
return oids return oids
finally: self._lock_release()
def undoInfo(self, first=0, last=-20, specification=None): def undoInfo(self, first=0, last=-20, specification=None):
self._lock_acquire() return self._server.undoInfo(first, last, specification)
try:
return self._call('undoInfo', first, last, specification)
finally: self._lock_release()
def undoLog(self, first, last, filter=None): def undoLog(self, first, last, filter=None):
if filter is not None: return () if filter is not None:
return () # XXX can't pass a filter to server
self._lock_acquire() return self._server.undoLog(first, last) # Eek!
try: return self._call('undoLog', first, last) # Eek!
finally: self._lock_release()
def versionEmpty(self, version): def versionEmpty(self, version):
self._lock_acquire() return self._server.versionEmpty(version)
try: return self._call('versionEmpty', version)
finally: self._lock_release()
def versions(self, max=None): def versions(self, max=None):
self._lock_acquire() return self._server.versions(max)
try: return self._call('versions', max)
finally: self._lock_release() # below are methods invoked by the StorageServer
def sync(self): self._call.sync() def serialno(self, arg):
self._serials.append(arg)
def getWakeup(_w=[]):
if _w: return _w[0] def info(self, dict):
import trigger self._info.update(dict)
t=trigger.trigger().pull_trigger
_w.append(t) def begin(self):
return t self._tfile = tempfile.TemporaryFile()
self._pickler = cPickle.Pickler(self._tfile, 1)
self._pickler.fast = 1 # Don't use the memo
def invalidate(self, args):
if self._pickler is None:
return
self._pickler.dump(args)
def end(self):
if self._pickler is None:
return
self._pickler.dump((0,0))
## self._pickler.dump = None
self._tfile.seek(0)
unpick = cPickle.Unpickler(self._tfile)
self._tfile = None
while 1:
oid, version = unpick.load()
if not oid:
break
self._cache.invalidate(oid, version=version)
self._db.invalidate(oid, version=version)
def Invalidate(self, args):
# XXX _db could be None
for oid, version in args:
self._cache.invalidate(oid, version=version)
try:
self._db.invalidate(oid, version=version)
except AttributeError, msg:
log2(PROBLEM,
"Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
repr(version),
msg))
"""Stub for interface exported by ClientStorage"""
class ClientStorage:
def __init__(self, rpc):
self.rpc = rpc
def beginVerify(self):
self.rpc.callAsync('begin')
# XXX what's the difference between these two?
def invalidate(self, args):
self.rpc.callAsync('invalidate', args)
def Invalidate(self, args):
self.rpc.callAsync('Invalidate', args)
def endVerify(self):
self.rpc.callAsync('end')
def serialno(self, arg):
self.rpc.callAsync('serialno', arg)
def info(self, arg):
self.rpc.callAsync('info', arg)
"""Exceptions for ZEO."""
class Disconnected(Exception):
"""Exception raised when a ZEO client is disconnected from the
ZEO server."""
"""Stub for interface exposed by StorageServer"""
class StorageServer:
    """RPC stub for the interface exposed by the ZEO StorageServer.

    Synchronous methods use self.rpc.call() and return the server's
    reply; asynchronous methods use self.rpc.callAsync() and return
    None without waiting.
    """

    def __init__(self, rpc):
        # The connection object used for all outgoing calls.
        self.rpc = rpc

    def register(self, storage_name, read_only):
        self.rpc.call('register', storage_name, read_only)

    def get_info(self):
        return self.rpc.call('get_info')

    def get_size_info(self):
        return self.rpc.call('get_size_info')

    def beginZeoVerify(self):
        self.rpc.callAsync('beginZeoVerify')

    def zeoVerify(self, oid, s, sv):
        self.rpc.callAsync('zeoVerify', oid, s, sv)

    def endZeoVerify(self):
        self.rpc.callAsync('endZeoVerify')

    def new_oids(self, n=None):
        # Only pass n when the caller supplied one, so the server-side
        # default applies otherwise.
        if n is None:
            return self.rpc.call('new_oids')
        else:
            return self.rpc.call('new_oids', n)

    def pack(self, t, wait=None):
        if wait is None:
            self.rpc.call('pack', t)
        else:
            self.rpc.call('pack', t, wait)

    def zeoLoad(self, oid):
        return self.rpc.call('zeoLoad', oid)

    def storea(self, oid, serial, data, version, id):
        self.rpc.callAsync('storea', oid, serial, data, version, id)

    def tpc_begin(self, id, user, descr, ext):
        return self.rpc.call('tpc_begin', id, user, descr, ext)

    def vote(self, trans_id):
        return self.rpc.call('vote', trans_id)

    def tpc_finish(self, id):
        return self.rpc.call('tpc_finish', id)

    def tpc_abort(self, id):
        self.rpc.callAsync('tpc_abort', id)

    def abortVersion(self, src, id):
        return self.rpc.call('abortVersion', src, id)

    def commitVersion(self, src, dest, id):
        return self.rpc.call('commitVersion', src, dest, id)

    def history(self, oid, version, length=None):
        # Bug fix: the original condition was inverted -- it omitted
        # the length argument exactly when the caller supplied one, and
        # passed length (i.e. None) when the caller did not.
        if length is None:
            return self.rpc.call('history', oid, version)
        else:
            return self.rpc.call('history', oid, version, length)

    def load(self, oid, version):
        return self.rpc.call('load', oid, version)

    def loadSerial(self, oid, serial):
        return self.rpc.call('loadSerial', oid, serial)

    def modifiedInVersion(self, oid):
        return self.rpc.call('modifiedInVersion', oid)

    def new_oid(self, last=None):
        if last is None:
            return self.rpc.call('new_oid')
        else:
            return self.rpc.call('new_oid', last)

    def store(self, oid, serial, data, version, trans):
        return self.rpc.call('store', oid, serial, data, version, trans)

    def transactionalUndo(self, trans_id, trans):
        return self.rpc.call('transactionalUndo', trans_id, trans)

    def undo(self, trans_id):
        return self.rpc.call('undo', trans_id)

    def undoLog(self, first, last):
        # XXX filter not allowed across RPC
        return self.rpc.call('undoLog', first, last)

    def undoInfo(self, first, last, spec):
        return self.rpc.call('undoInfo', first, last, spec)

    def versionEmpty(self, vers):
        return self.rpc.call('versionEmpty', vers)

    def versions(self, max=None):
        if max is None:
            return self.rpc.call('versions')
        else:
            return self.rpc.call('versions', max)
############################################################################# ##############################################################################
# #
# Zope Public License (ZPL) Version 1.0 # Zope Public License (ZPL) Version 1.0
# ------------------------------------- # -------------------------------------
...@@ -59,7 +59,7 @@ ...@@ -59,7 +59,7 @@
# labeled as unofficial distributions. Modifications which do not # labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they # carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above. # conform to all of the clauses above.
# #
# #
# Disclaimer # Disclaimer
# #
...@@ -82,527 +82,394 @@ ...@@ -82,527 +82,394 @@
# attributions are listed in the accompanying credits file. # attributions are listed in the accompanying credits file.
# #
############################################################################## ##############################################################################
"""Network ZODB storage server
__version__ = "$Revision: 1.32 $"[11:-2] This server acts as a front-end for one or more real storages, like
file storage or Berkeley storage.
import asyncore, socket, string, sys, os XXX Need some basic access control-- a declaration of the methods
from smac import SizedMessageAsyncConnection exported for invocation by the server.
from ZODB import POSException """
import asyncore
import cPickle import cPickle
from cPickle import Unpickler import os
from ZODB.POSException import TransactionError, UndoError, VersionCommitError import sys
from ZODB.Transaction import Transaction import threading
import traceback import types
from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
import ClientStub
import zrpc2
import zLOG
from zrpc2 import Dispatcher, Handler, ManagedServerConnection, Delay
from ZODB.POSException import StorageError, StorageTransactionError, \
TransactionError, ReadOnlyError
from ZODB.referencesf import referencesf from ZODB.referencesf import referencesf
from thread import start_new_thread from ZODB.Transaction import Transaction
from cStringIO import StringIO
from ZEO import trigger
from ZEO import asyncwrap
from types import StringType
class StorageServerError(POSException.StorageError): pass
max_blather=120
def blather(*args):
accum = []
total_len = 0
for arg in args:
if not isinstance(arg, StringType):
arg = str(arg)
accum.append(arg)
total_len = total_len + len(arg)
if total_len >= max_blather:
break
m = string.join(accum)
if len(m) > max_blather: m = m[:max_blather] + ' ...'
LOG('ZEO Server', TRACE, m)
# We create a special fast pickler! This allows us # We create a special fast pickler! This allows us
# to create slightly more efficient pickles and # to create slightly more efficient pickles and
# to create them a tad faster. # to create them a tad faster.
pickler=cPickle.Pickler() pickler = cPickle.Pickler()
pickler.fast=1 # Don't use the memo pickler.fast = 1 # Don't use the memo
dump=pickler.dump dump = pickler.dump
class StorageServer(asyncore.dispatcher): def log(message, level=zLOG.INFO, label="ZEO Server:%s" % os.getpid(),
error=None):
def __init__(self, connection, storages): zLOG.LOG(label, level, message, error=error)
class StorageServerError(StorageError):
pass
class StorageServer:
def __init__(self, addr, storages, read_only=0):
# XXX should read_only be a per-storage option? not yet...
self.addr = addr
self.storages = storages
self.read_only = read_only
self.connections = {}
for name, store in storages.items():
fixup_storage(store)
self.dispatcher = Dispatcher(addr, factory=self.newConnection,
reuse_addr=1)
def newConnection(self, sock, addr, nil):
c = ManagedServerConnection(sock, addr, None, self)
c.register_object(StorageProxy(self, c))
return c
self.__storages=storages def register(self, storage_id, proxy):
for n, s in storages.items(): """Register a connection's use with a particular storage.
init_storage(s)
This information is needed to handle invalidation.
self.__connections={} """
self.__get_connections=self.__connections.get l = self.connections.get(storage_id)
if l is None:
self._pack_trigger = trigger.trigger() l = self.connections[storage_id] = []
asyncore.dispatcher.__init__(self) # intialize waiting list
self.storages[storage_id]._StorageProxy__waiting = []
if type(connection) is type(''): l.append(proxy)
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
try: os.unlink(connection) def invalidate(self, conn, storage_id, invalidated=(), info=0):
except: pass for p in self.connections[storage_id]:
else: if invalidated and p is not conn:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) p.client.Invalidate(invalidated)
self.set_reuse_addr() else:
p.client.info(info)
LOG('ZEO Server', INFO, 'Listening on %s' % repr(connection))
self.bind(connection) def close_server(self):
self.listen(5) # Close the dispatcher so that there are no new connections.
self.dispatcher.close()
def register_connection(self, connection, storage_id): for storage in self.storages.values():
storage=self.__storages.get(storage_id, None) storage.close()
if storage is None: # Force the asyncore mainloop to exit by hackery, i.e. close
LOG('ZEO Server', ERROR, "Unknown storage_id: %s" % storage_id) # every socket in the map. loop() will return when the map is
connection.close() # empty.
return None, None for s in asyncore.socket_map.values():
try:
connections=self.__get_connections(storage_id, None) s.close()
if connections is None: except:
self.__connections[storage_id]=connections=[] pass
connections.append(connection)
return storage, storage_id def close(self, conn):
# XXX who calls this?
def unregister_connection(self, connection, storage_id): # called when conn is closed
# way too inefficient
connections=self.__get_connections(storage_id, None) removed = 0
if connections: for sid, cl in self.connections.items():
n=[] if conn.obj in cl:
for c in connections: cl.remove(conn.obj)
if c is not connection: removed = 1
n.append(c)
class StorageProxy(Handler):
self.__connections[storage_id]=n def __init__(self, server, conn):
self.server = server
def invalidate(self, connection, storage_id, invalidated=(), info=0, self.client = ClientStub.ClientStorage(conn)
dump=dump): self.__storage = None
for c in self.__connections[storage_id]: self.__invalidated = []
if invalidated and c is not connection: self._transaction = None
c.message_output('I'+dump(invalidated, 1))
if info: def __repr__(self):
c.message_output('S'+dump(info, 1)) tid = self._transaction and repr(self._transaction.id)
if self.__storage:
def writable(self): return 0 stid = self.__storage._transaction and \
repr(self.__storage._transaction.id)
def handle_read(self): pass
def readable(self): return 1
def handle_connect (self): pass
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error:
sys.stderr.write('warning: accept failed\n')
else: else:
ZEOConnection(self, sock, addr) stid = None
return "<StorageProxy %X trans=%s s_trans=%s>" % (id(self), tid,
def log_info(self, message, type='info'): stid)
if type=='error': type=ERROR
else: type=INFO def _log(self, msg, level=zLOG.INFO, error=None, pid=os.getpid()):
LOG('ZEO Server', type, message) zLOG.LOG("ZEO Server %s %X" % (pid, id(self)),
level, msg, error=error)
log=log_info
def setup_delegation(self):
storage_methods={} """Delegate several methods to the storage"""
for n in ( self.undoInfo = self.__storage.undoInfo
'get_info', 'abortVersion', 'commitVersion', self.undoLog = self.__storage.undoLog
'history', 'load', 'loadSerial', self.versionEmpty = self.__storage.versionEmpty
'modifiedInVersion', 'new_oid', 'new_oids', 'pack', 'store', self.versions = self.__storage.versions
'storea', 'tpc_abort', 'tpc_begin', 'tpc_begin_sync', self.history = self.__storage.history
'tpc_finish', 'undo', 'undoLog', 'undoInfo', 'versionEmpty', 'versions', self.load = self.__storage.load
'transactionalUndo', self.loadSerial = self.__storage.loadSerial
'vote', 'zeoLoad', 'zeoVerify', 'beginZeoVerify', 'endZeoVerify',
): def _check_tid(self, tid, exc=None):
storage_methods[n]=1 caller = sys._getframe().f_back.f_code.co_name
storage_method=storage_methods.has_key if self._transaction is None:
self._log("no current transaction: %s()" % caller,
def find_global(module, name, zLOG.PROBLEM)
global_dict=globals(), silly=('__doc__',)): if exc is not None:
try: m=__import__(module, global_dict, global_dict, silly) raise exc(None, tid)
except:
raise StorageServerError, (
"Couldn\'t import global module %s" % module)
try: r=getattr(m, name)
except:
raise StorageServerError, (
"Couldn\'t find global %s in module %s" % (name, module))
safe=getattr(r, '__no_side_effects__', 0)
if safe: return r
raise StorageServerError, 'Unsafe global, %s.%s' % (module, name)
_noreturn=[]
class ZEOConnection(SizedMessageAsyncConnection):
_transaction=None
__storage=__storage_id=None
def __init__(self, server, sock, addr):
self.__server=server
self.__invalidated=[]
self.__closed=None
if __debug__: debug='ZEO Server'
else: debug=0
SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug)
LOG('ZEO Server', INFO, 'Connect %s %s' % (id(self), `addr`))
def close(self):
t=self._transaction
if (t is not None and self.__storage is not None and
self.__storage._transaction is t):
self.tpc_abort(t.id)
else:
self._transaction=None
self.__invalidated=[]
self.__server.unregister_connection(self, self.__storage_id)
self.__closed=1
SizedMessageAsyncConnection.close(self)
LOG('ZEO Server', INFO, 'Close %s' % id(self))
def message_input(self, message,
dump=dump, Unpickler=Unpickler, StringIO=StringIO,
None=None):
if __debug__:
if len(message) > max_blather:
tmp = `message[:max_blather]`
else: else:
tmp = `message` return 0
blather('message_input', id(self), tmp) if self._transaction.id != tid:
self._log("%s(%s) invalid; current transaction = %s" % \
if self.__storage is None: (caller, repr(tid), repr(self._transaction.id)),
# This is the first communication from the client zLOG.PROBLEM)
self.__storage, self.__storage_id = ( if exc is not None:
self.__server.register_connection(self, message)) raise exc(self._transaction.id, tid)
# Send info back asynchronously, so client need not ask
self.message_output('S'+dump(self.get_info(), 1))
return
try:
# Unpickle carefully.
unpickler=Unpickler(StringIO(message))
unpickler.find_global=find_global
args=unpickler.load()
name, args = args[0], args[1:]
if __debug__:
apply(blather,
("call", id(self), ":", name,) + args)
if not storage_method(name):
raise 'Invalid Method Name', name
if hasattr(self, name):
r=apply(getattr(self, name), args)
else: else:
r=apply(getattr(self.__storage, name), args) return 0
if r is _noreturn: return return 1
except (UndoError, VersionCommitError):
# These are normal usage errors. No need to leg them
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
except:
LOG('ZEO Server', ERROR, 'error', error=sys.exc_info())
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
if __debug__: def register(self, storage_id, read_only):
blather("%s R: %s" % (id(self), `r`)) """Select the storage that this client will use
r=dump(r,1)
self.message_output('R'+r)
def return_error(self, err_type, err_value, type=type, dump=dump): This method must be the first one called by the client.
if type(err_value) is not type(self): """
err_value = err_type, err_value storage = self.server.storages.get(storage_id)
if storage is None:
self._log("unknown storage_id: %s" % storage_id)
raise ValueError, "unknown storage: %s" % storage_id
if __debug__: if not read_only and (self.server.read_only or storage.isReadOnly()):
blather("%s E: %s" % (id(self), `err_value`)) raise ReadOnlyError()
try: r=dump(err_value, 1)
except:
# Ugh, must be an unpicklable exception
r=StorageServerError("Couldn't pickle error %s" % `r`)
dump('',1) # clear pickler
r=dump(r,1)
self.message_output('E'+r) self.__storage_id = storage_id
self.__storage = storage
self.setup_delegation()
self.server.register(storage_id, self)
self._log("registered storage %s: %s" % (storage_id, storage))
def get_info(self): def get_info(self):
storage=self.__storage return {'length': len(self.__storage),
info = { 'size': self.__storage.getSize(),
'length': len(storage), 'name': self.__storage.getName(),
'size': storage.getSize(), 'supportsUndo': self.__storage.supportsUndo(),
'name': storage.getName(), 'supportsVersions': self.__storage.supportsVersions(),
} 'supportsTransactionalUndo':
for feature in ('supportsUndo', self.__storage.supportsTransactionalUndo(),
'supportsVersions', }
'supportsTransactionalUndo',):
if hasattr(storage, feature):
info[feature] = getattr(storage, feature)()
else:
info[feature] = 0
return info
def get_size_info(self): def get_size_info(self):
storage=self.__storage return {'length': len(self.__storage),
return { 'size': self.__storage.getSize(),
'length': len(storage), }
'size': storage.getSize(),
}
def zeoLoad(self, oid): def zeoLoad(self, oid):
storage=self.__storage v = self.__storage.modifiedInVersion(oid)
v=storage.modifiedInVersion(oid) if v:
if v: pv, sv = storage.load(oid, v) pv, sv = self.__storage.load(oid, v)
else: pv=sv=None else:
pv = sv = None
try: try:
p, s = storage.load(oid,'') p, s = self.__storage.load(oid, '')
except KeyError: except KeyError:
if sv: if sv:
# Created in version, no non-version data # Created in version, no non-version data
p=s=None p = s = None
else: else:
raise raise
return p, s, v, pv, sv return p, s, v, pv, sv
def beginZeoVerify(self): def beginZeoVerify(self):
self.message_output('bN.') self.client.beginVerify()
return _noreturn
def zeoVerify(self, oid, s, sv):
def zeoVerify(self, oid, s, sv, try:
dump=dump): p, os, v, pv, osv = self.zeoLoad(oid)
try: p, os, v, pv, osv = self.zeoLoad(oid) except: # except what?
except: return _noreturn return None
p=pv=None # free the pickles
if os != s: if os != s:
self.message_output('i'+dump((oid, ''),1)) self.client.invalidate((oid, ''))
elif osv != sv: elif osv != sv:
self.message_output('i'+dump((oid, v),1)) self.client.invalidate((oid, v))
return _noreturn
def endZeoVerify(self): def endZeoVerify(self):
self.message_output('eN.') self.client.endVerify()
return _noreturn
def new_oids(self, n=100):
new_oid=self.__storage.new_oid
if n < 0: n=1
r=range(n)
for i in r: r[i]=new_oid()
return r
def pack(self, t, wait=0): def pack(self, t, wait=0):
start_new_thread(self._pack, (t,wait)) t = threading.Thread(target=self._pack, args=(t, wait))
if wait: return _noreturn t.start()
def _pack(self, t, wait=0): def _pack(self, t, wait=0):
try: try:
LOG('ZEO Server', BLATHER, 'pack begin')
self.__storage.pack(t, referencesf) self.__storage.pack(t, referencesf)
LOG('ZEO Server', BLATHER, 'pack end')
except: except:
LOG('ZEO Server', ERROR, self._log('ZEO Server', zLOG.ERROR,
'Pack failed for %s' % self.__storage_id, 'Pack failed for %s' % self.__storage_id,
error=sys.exc_info()) error=sys.exc_info())
if wait: if wait:
self.return_error(sys.exc_info()[0], sys.exc_info()[1]) raise
self.__server._pack_trigger.pull_trigger()
else: else:
if wait: if not wait:
self.message_output('RN.')
self.__server._pack_trigger.pull_trigger()
else:
# Broadcast new size statistics # Broadcast new size statistics
self.__server.invalidate(0, self.__storage_id, (), self.server.invalidate(0, self.__storage_id, (),
self.get_size_info()) self.get_size_info())
def abortVersion(self, src, id): def abortVersion(self, src, id):
t=self._transaction self._check_tid(id, exc=StorageTransactionError)
if t is None or id != t.id: oids = self.__storage.abortVersion(src, self._transaction)
raise POSException.StorageTransactionError(self, id) for oid in oids:
oids=self.__storage.abortVersion(src, t) self.__invalidated.append((oid, src))
a=self.__invalidated.append
for oid in oids: a((oid,src))
return oids return oids
def commitVersion(self, src, dest, id): def commitVersion(self, src, dest, id):
t=self._transaction self._check_tid(id, exc=StorageTransactionError)
if t is None or id != t.id: oids = self.__storage.commitVersion(src, dest, self._transaction)
raise POSException.StorageTransactionError(self, id)
oids=self.__storage.commitVersion(src, dest, t)
a=self.__invalidated.append
for oid in oids: for oid in oids:
a((oid,dest)) self.__invalidated.append((oid, dest))
if dest: a((oid,src)) if dest:
self.__invalidated.append((oid, src))
return oids return oids
def storea(self, oid, serial, data, version, id, def storea(self, oid, serial, data, version, id):
dump=dump): self._check_tid(id, exc=StorageTransactionError)
try: try:
t=self._transaction # XXX does this stmt need to be in the try/except?
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
newserial=self.__storage.store(oid, serial, data, version, t) newserial = self.__storage.store(oid, serial, data, version,
self._transaction)
except TransactionError, v: except TransactionError, v:
# This is a normal transaction errorm such as a conflict error # This is a normal transaction error such as a conflict error
# or a version lock or conflict error. It doen't need to be # or a version lock or conflict error. It doesn't need to be
# logged. # logged.
newserial=v self._log("transaction error: %s" % repr(v))
newserial = v
except: except:
# all errors need to be serialized to prevent unexpected # all errors need to be serialized to prevent unexpected
# returns, which would screw up the return handling. # returns, which would screw up the return handling.
# IOW, Anything that ends up here is evil enough to be logged. # IOW, Anything that ends up here is evil enough to be logged.
LOG('ZEO Server', ERROR, 'store error', error=sys.exc_info()) error = sys.exc_info()
newserial=sys.exc_info()[1] self._log('store error: %s: %s' % (error[0], error[1]),
zLOG.ERROR, error=error)
newserial = sys.exc_info()[1]
else: else:
if serial != '\0\0\0\0\0\0\0\0': if serial != '\0\0\0\0\0\0\0\0':
self.__invalidated.append((oid, version)) self.__invalidated.append((oid, version))
try: r=dump((oid,newserial), 1) try:
nil = dump(newserial, 1)
except: except:
# We got a pickling error, must be because the self._log("couldn't pickle newserial: %s" % repr(newserial),
# newserial is an unpicklable exception. zLOG.ERROR)
r=StorageServerError("Couldn't pickle exception %s" % `newserial`) dump('', 1) # clear pickler
dump('',1) # clear pickler r = StorageServerError("Couldn't pickle exception %s" % \
r=dump((oid, r),1) `newserial`)
newserial = r
self.message_output('s'+r) self.client.serialno((oid, newserial))
return _noreturn
def vote(self, id): def vote(self, id):
t=self._transaction self._check_tid(id, exc=StorageTransactionError)
if t is None or id != t.id: self.__storage.tpc_vote(self._transaction)
raise POSException.StorageTransactionError(self, id)
return self.__storage.tpc_vote(t)
def transactionalUndo(self, trans_id, id): def transactionalUndo(self, trans_id, id):
t=self._transaction self._check_tid(id, exc=StorageTransactionError)
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
return self.__storage.transactionalUndo(trans_id, self._transaction) return self.__storage.transactionalUndo(trans_id, self._transaction)
def undo(self, transaction_id): def undo(self, transaction_id):
oids=self.__storage.undo(transaction_id) oids = self.__storage.undo(transaction_id)
if oids: if oids:
self.__server.invalidate( self.server.invalidate(self, self.__storage_id,
self, self.__storage_id, map(lambda oid: (oid,None), oids) map(lambda oid: (oid, None, ''), oids))
)
return oids return oids
return () return ()
def tpc_abort(self, id): # When multiple clients are using a single storage, there are several
t=self._transaction # different _transaction attributes to keep track of. Each
if t is None or id != t.id: return # StorageProxy object has a single _transaction that refers to its
r=self.__storage.tpc_abort(t) # current transaction. The storage (self.__storage) has another
# _transaction that is used for the *real* transaction.
storage=self.__storage
try: waiting=storage.__waiting # The real trick comes with the __waiting queue for a storage.
except: waiting=storage.__waiting=[] # When a StorageProxy pulls a new transaction from the queue, it
while waiting: # must inform the new transaction's proxy. (The two proxies may
f, args = waiting.pop(0) # be the same.) The new transaction's proxy sets its _transaction
if apply(f,args): break # and continues from there.
self._transaction=None
self.__invalidated=[]
def unlock(self):
if self.__closed: return
self.message_output('UN.')
def tpc_begin(self, id, user, description, ext): def tpc_begin(self, id, user, description, ext):
t=self._transaction if self._transaction is not None:
if t is not None: if self._transaction.id == id:
if id == t.id: return self._log("duplicate tpc_begin(%s)" % repr(id))
return
else: else:
raise StorageServerError( raise StorageTransactionError("Multiple simultaneous tpc_begin"
"Multiple simultaneous tpc_begin requests from the same " " requests from one client.")
"client."
) t = Transaction()
storage=self.__storage t.id = id
if storage._transaction is not None: t.user = user
try: waiting=storage.__waiting t.description = description
except: waiting=storage.__waiting=[] t._extension = ext
waiting.append((self.unlock, ()))
return 1 # Return a flag indicating a lock condition. if self.__storage._transaction is not None:
d = zrpc2.Delay()
self._transaction=t=Transaction() self.__storage.__waiting.append((d, self, t))
t.id=id return d
t.user=user
t.description=description self._transaction = t
t._extension=ext self.__storage.tpc_begin(t)
storage.tpc_begin(t) self.__invalidated = []
self.__invalidated=[]
def tpc_finish(self, id):
def tpc_begin_sync(self, id, user, description, ext): if not self._check_tid(id):
if self.__closed: return return
t=self._transaction
if t is not None and id == t.id: return
storage=self.__storage
if storage._transaction is None:
self.try_again_sync(id, user, description, ext)
else:
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
waiting.append((self.try_again_sync, (id, user, description, ext)))
return _noreturn r = self.__storage.tpc_finish(self._transaction)
assert self.__storage._transaction is None
def try_again_sync(self, id, user, description, ext):
storage=self.__storage
if storage._transaction is None:
self._transaction=t=Transaction()
t.id=id
t.user=user
t.description=description
storage.tpc_begin(t)
self.__invalidated=[]
self.message_output('RN.')
return 1
def tpc_finish(self, id, user, description, ext): if self.__invalidated:
t=self._transaction self.server.invalidate(self, self.__storage_id,
if id != t.id: return self.__invalidated,
self.get_size_info())
storage=self.__storage if not self._handle_waiting():
r=storage.tpc_finish(t) self._transaction = None
self.__invalidated = []
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
while waiting:
f, args = waiting.pop(0)
if apply(f,args): break
self._transaction=None def tpc_abort(self, id):
if self.__invalidated: if not self._check_tid(id):
self.__server.invalidate(self, self.__storage_id, return
self.__invalidated, r = self.__storage.tpc_abort(self._transaction)
self.get_size_info()) assert self.__storage._transaction is None
self.__invalidated=[]
if not self._handle_waiting():
def init_storage(storage): self._transaction = None
if not hasattr(storage,'tpc_vote'): storage.tpc_vote=lambda *args: None self.__invalidated = []
if __name__=='__main__': def _restart_delayed_transaction(self, delay, trans):
import ZODB.FileStorage self._transaction = trans
name, port = sys.argv[1:3] self.__storage.tpc_begin(trans)
blather(name, port) self.__invalidated = []
try: assert self._transaction.id == self.__storage._transaction.id
port='', int(port) delay.reply(None)
except:
pass def _handle_waiting(self):
if self.__storage.__waiting:
d = {'1': ZODB.FileStorage.FileStorage(name)} delay, proxy, trans = self.__storage.__waiting.pop(0)
StorageServer(port, d) proxy._restart_delayed_transaction(delay, trans)
asyncwrap.loop() if self is proxy:
return 1
def new_oids(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100"""
if n < 0:
n = 1
return [self.__storage.new_oid() for i in range(n)]
def fixup_storage(storage):
# backwards compatibility hack
if not hasattr(storage,'tpc_vote'):
storage.tpc_vote = lambda *args: None
"""A TransactionBuffer store transaction updates until commit or abort.
A transaction may generate enough data that it is not practical to
always hold pending updates in memory. Instead, a TransactionBuffer
is used to store the data until a commit or abort.
"""
# XXX Figure out what a sensible storage format is
# XXX A faster implementation might store trans data in memory until
# it reaches a certain size.
import tempfile
import cPickle
class TransactionBuffer:
    """Spool (oid, version, data) records to a temp file until commit/abort.

    Records are pickled to an anonymous temporary file so that a large
    transaction need not be held entirely in memory.
    """

    def __init__(self):
        self.file = tempfile.TemporaryFile()
        self.count = 0          # number of records not yet read back
        self.size = 0           # rough byte-size estimate of stored data
        # It's safe to use a fast pickler because the only objects
        # stored are builtin types -- strings or None.
        self.pickler = cPickle.Pickler(self.file, 1)
        self.pickler.fast = 1

    def store(self, oid, version, data):
        """Store oid, version, data for later retrieval"""
        self.pickler.dump((oid, version, data))
        self.count = self.count + 1
        # Estimate per-record cache size
        overhead = 27 + 12
        if version:
            overhead = overhead + len(version) + 4
        self.size = self.size + len(data) + overhead

    def invalidate(self, oid, version):
        # An invalidation is recorded as a store whose data is None.
        self.pickler.dump((oid, version, None))
        self.count = self.count + 1

    def clear(self):
        """Mark the buffer as empty"""
        self.file.seek(0)
        self.count = 0
        self.size = 0

    # XXX unchecked constraints:
    # 1. can't call store() after begin_iterate()
    # 2. must call clear() after iteration finishes

    def begin_iterate(self):
        """Move the file pointer in advance of iteration"""
        self.file.flush()
        self.file.seek(0)
        self.unpickler = cPickle.Unpickler(self.file)

    def next(self):
        """Return next tuple of data or None if EOF"""
        if not self.count:
            del self.unpickler
            return None
        record = self.unpickler.load()
        self.count = self.count - 1
        return record

    def get_size(self):
        """Return size of data stored in buffer (just a hint)."""
        return self.size
...@@ -85,11 +85,14 @@ ...@@ -85,11 +85,14 @@
"""Sized message async connections """Sized message async connections
""" """
__version__ = "$Revision: 1.11 $"[11:-2] __version__ = "$Revision: 1.12 $"[11:-2]
import asyncore, struct
from Exceptions import Disconnected
from zLOG import LOG, TRACE, ERROR, INFO, BLATHER
from types import StringType
import asyncore, string, struct, zLOG, sys, Acquisition
import socket, errno import socket, errno
from zLOG import LOG, TRACE, ERROR, INFO
# Use the dictionary to make sure we get the minimum number of errno # Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems -- # entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
...@@ -109,81 +112,101 @@ tmp_dict = {errno.EAGAIN: 0, ...@@ -109,81 +112,101 @@ tmp_dict = {errno.EAGAIN: 0,
expected_socket_write_errors = tuple(tmp_dict.keys()) expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict del tmp_dict
class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher): class SizedMessageAsyncConnection(asyncore.dispatcher):
__super_init = asyncore.dispatcher.__init__
__super_close = asyncore.dispatcher.close
__closed = 1 # Marker indicating that we're closed
__append=None # Marker indicating that we're closed socket = None # to outwit Sam's getattr
socket=None # to outwit Sam's getattr READ_SIZE = 8096
def __init__(self, sock, addr, map=None, debug=None): def __init__(self, sock, addr, map=None, debug=None):
SizedMessageAsyncConnection.inheritedAttribute( self.__super_init(sock, map)
'__init__')(self, sock, map) self.addr = addr
self.addr=addr
if debug is not None: if debug is not None:
self._debug=debug self._debug = debug
elif not hasattr(self, '_debug'): elif not hasattr(self, '_debug'):
self._debug=__debug__ and 'smac' self._debug = __debug__ and 'smac'
self.__state=None self.__state = None
self.__inp=None self.__inp = None # None, a single String, or a list
self.__inpl=0 self.__input_len = 0
self.__l=4 self.__msg_size = 4
self.__output=output=[] self.__output = []
self.__append=output.append self.__closed = None
self.__pop=output.pop
# XXX avoid expensive getattr calls?
def handle_read(self, def __nonzero__(self):
join=string.join, StringType=type(''), _type=type, return 1
_None=None):
def handle_read(self):
# Use a single __inp buffer and integer indexes to make this
# fast.
try: try:
d=self.recv(8096) d=self.recv(8096)
except socket.error, err: except socket.error, err:
if err[0] in expected_socket_read_errors: if err[0] in expected_socket_read_errors:
return return
raise raise
if not d: return if not d:
return
inp=self.__inp
if inp is _None: input_len = self.__input_len + len(d)
inp=d msg_size = self.__msg_size
elif _type(inp) is StringType: state = self.__state
inp=[inp,d]
inp = self.__inp
if msg_size > input_len:
if inp is None:
self.__inp = d
elif type(self.__inp) is StringType:
self.__inp = [self.__inp, d]
else:
self.__inp.append(d)
self.__input_len = input_len
return # keep waiting for more input
# load all previous input and d into single string inp
if isinstance(inp, StringType):
inp = inp + d
elif inp is None:
inp = d
else: else:
inp.append(d) inp.append(d)
inp = "".join(inp)
inpl=self.__inpl+len(d)
l=self.__l offset = 0
while (offset + msg_size) <= input_len:
while 1: msg = inp[offset:offset + msg_size]
offset = offset + msg_size
if l <= inpl: if state is None:
# Woo hoo, we have enough data # waiting for message
if _type(inp) is not StringType: inp=join(inp,'') msg_size = struct.unpack(">i", msg)[0]
d=inp[:l] state = 1
inp=inp[l:]
inpl=inpl-l
if self.__state is _None:
# waiting for message
l=struct.unpack(">i",d)[0]
self.__state=1
else:
l=4
self.__state=_None
self.message_input(d)
else: else:
break # not enough data msg_size = 4
state = None
self.__l=l self.message_input(msg)
self.__inp=inp
self.__inpl=inpl
def readable(self): return 1 self.__state = state
def writable(self): return not not self.__output self.__msg_size = msg_size
self.__inp = inp[offset:]
self.__input_len = input_len - offset
def readable(self):
return 1
def writable(self):
if len(self.__output) == 0:
return 0
else:
return 1
def handle_write(self): def handle_write(self):
output=self.__output output = self.__output
while output: while output:
v=output[0] v = output[0]
try: try:
n=self.send(v) n=self.send(v)
except socket.error, err: except socket.error, err:
...@@ -191,42 +214,33 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher): ...@@ -191,42 +214,33 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
break # we couldn't write anything break # we couldn't write anything
raise raise
if n < len(v): if n < len(v):
output[0]=v[n:] output[0] = v[n:]
break # we can't write any more break # we can't write any more
else: else:
del output[0] del output[0]
#break # waaa
def handle_close(self): def handle_close(self):
self.close() self.close()
def message_output(self, message, def message_output(self, message):
pack=struct.pack, len=len): if __debug__:
if self._debug: if self._debug:
if len(message) > 40: m=message[:40]+' ...' if len(message) > 40:
else: m=message m = message[:40]+' ...'
LOG(self._debug, TRACE, 'message_output %s' % `m`) else:
m = message
append=self.__append LOG(self._debug, TRACE, 'message_output %s' % `m`)
if append is None:
raise Disconnected("This action is temporarily unavailable.<p>") if self.__closed is not None:
raise Disconnected, (
append(pack(">i",len(message))+message) "This action is temporarily unavailable."
"<p>"
def log_info(self, message, type='info'): )
if type=='error': type=ERROR # do two separate appends to avoid copying the message string
else: type=INFO self.__output.append(struct.pack(">i", len(message)))
LOG('ZEO', type, message) self.__output.append(message)
log=log_info
def close(self): def close(self):
if self.__append is not None: if self.__closed is None:
self.__append=None self.__closed = 1
SizedMessageAsyncConnection.inheritedAttribute('close')(self) self.__super_close()
class Disconnected(Exception):
"""The client has become disconnected from the server
"""
...@@ -86,10 +86,13 @@ ...@@ -86,10 +86,13 @@
"""Start the server storage. """Start the server storage.
""" """
__version__ = "$Revision: 1.26 $"[11:-2] __version__ = "$Revision: 1.27 $"[11:-2]
import sys, os, getopt, string import sys, os, getopt, string
import StorageServer
import asyncore
def directory(p, n=1): def directory(p, n=1):
d=p d=p
while n: while n:
...@@ -115,9 +118,11 @@ def get_storage(m, n, cache={}): ...@@ -115,9 +118,11 @@ def get_storage(m, n, cache={}):
def main(argv): def main(argv):
me=argv[0] me=argv[0]
sys.path[:]==filter(None, sys.path)
sys.path.insert(0, directory(me, 2)) sys.path.insert(0, directory(me, 2))
# XXX hack for profiling support
global unix, storages, zeo_pid, asyncore
args=[] args=[]
last='' last=''
for a in argv[1:]: for a in argv[1:]:
...@@ -130,25 +135,13 @@ def main(argv): ...@@ -130,25 +135,13 @@ def main(argv):
args.append(a) args.append(a)
last=a last=a
if os.environ.has_key('INSTANCE_HOME'): INSTANCE_HOME=os.environ.get('INSTANCE_HOME', directory(me, 4))
INSTANCE_HOME=os.environ['INSTANCE_HOME']
elif os.path.isdir(os.path.join(directory(me, 4),'var')):
INSTANCE_HOME=directory(me, 4)
else:
INSTANCE_HOME=os.getcwd()
if os.path.isdir(os.path.join(INSTANCE_HOME, 'var')):
var=os.path.join(INSTANCE_HOME, 'var')
else:
var=INSTANCE_HOME
zeo_pid=os.environ.get('ZEO_SERVER_PID', zeo_pid=os.environ.get('ZEO_SERVER_PID',
os.path.join(var, 'ZEO_SERVER.pid') os.path.join(INSTANCE_HOME, 'var', 'ZEO_SERVER.pid')
) )
opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:') fs=os.path.join(INSTANCE_HOME, 'var', 'Data.fs')
fs=os.path.join(var, 'Data.fs')
usage="""%s [options] [filename] usage="""%s [options] [filename]
...@@ -156,17 +149,14 @@ def main(argv): ...@@ -156,17 +149,14 @@ def main(argv):
-D -- Run in debug mode -D -- Run in debug mode
-d -- Generate detailed debug logging without running
in the foreground.
-U -- Unix-domain socket file to listen on -U -- Unix-domain socket file to listen on
-u username or uid number -u username or uid number
The username to run the ZEO server as. You may want to run The username to run the ZEO server as. You may want to run
the ZEO server as 'nobody' or some other user with limited the ZEO server as 'nobody' or some other user with limited
resouces. The only works under Unix, and if the storage resouces. The only works under Unix, and if ZServer is
server is started by root. started by root.
-p port -- port to listen on -p port -- port to listen on
...@@ -189,23 +179,42 @@ def main(argv): ...@@ -189,23 +179,42 @@ def main(argv):
attr_name -- This is the name to which the storage object attr_name -- This is the name to which the storage object
is assigned in the module. is assigned in the module.
-P file -- Run under profile and dump output to file. Implies the
-s flag.
if no file name is specified, then %s is used. if no file name is specified, then %s is used.
""" % (me, fs) """ % (me, fs)
try:
opts, args = getopt.getopt(args, 'p:Dh:U:sS:u:P:')
except getopt.error, msg:
print usage
print msg
sys.exit(1)
port=None port=None
debug=detailed=0 debug=0
host='' host=''
unix=None unix=None
Z=1 Z=1
UID='nobody' UID='nobody'
prof = None
for o, v in opts: for o, v in opts:
if o=='-p': port=string.atoi(v) if o=='-p': port=string.atoi(v)
elif o=='-h': host=v elif o=='-h': host=v
elif o=='-U': unix=v elif o=='-U': unix=v
elif o=='-u': UID=v elif o=='-u': UID=v
elif o=='-D': debug=1 elif o=='-D': debug=1
elif o=='-d': detailed=1
elif o=='-s': Z=0 elif o=='-s': Z=0
elif o=='-P': prof = v
if prof:
Z = 0
try:
from ZServer.medusa import asyncore
sys.modules['asyncore']=asyncore
except: pass
if port is None and unix is None: if port is None and unix is None:
print usage print usage
...@@ -219,10 +228,9 @@ def main(argv): ...@@ -219,10 +228,9 @@ def main(argv):
sys.exit(1) sys.exit(1)
fs=args[0] fs=args[0]
__builtins__.__debug__=debug
if debug: os.environ['Z_DEBUG_MODE']='1' if debug: os.environ['Z_DEBUG_MODE']='1'
if detailed: os.environ['STUPID_LOG_SEVERITY']='-99999'
from zLOG import LOG, INFO, ERROR from zLOG import LOG, INFO, ERROR
# Try to set uid to "-u" -provided uid. # Try to set uid to "-u" -provided uid.
...@@ -263,71 +271,54 @@ def main(argv): ...@@ -263,71 +271,54 @@ def main(argv):
import zdaemon import zdaemon
zdaemon.run(sys.argv, '') zdaemon.run(sys.argv, '')
try: storages={}
for o, v in opts:
import ZEO.StorageServer, asyncore if o=='-S':
n, m = string.split(v,'=')
storages={} if string.find(m,':'):
for o, v in opts: # we got an attribute name
if o=='-S': m, a = string.split(m,':')
n, m = string.split(v,'=') else:
if string.find(m,':'): # attribute name must be same as storage name
# we got an attribute name a=n
m, a = string.split(m,':') storages[n]=get_storage(m,a)
else:
# attribute name must be same as storage name
a=n
storages[n]=get_storage(m,a)
if not storages:
import ZODB.FileStorage
storages['1']=ZODB.FileStorage.FileStorage(fs)
# Try to set up a signal handler
try:
import signal
signal.signal(signal.SIGTERM,
lambda sig, frame, s=storages: shutdown(s)
)
signal.signal(signal.SIGINT,
lambda sig, frame, s=storages: shutdown(s, 0)
)
try: signal.signal(signal.SIGHUP, rotate_logs_handler)
except: pass
except: pass
items=storages.items()
items.sort()
for kv in items:
LOG('ZEO Server', INFO, 'Serving %s:\t%s' % kv)
if not unix: unix=host, port
ZEO.StorageServer.StorageServer(unix, storages)
try: ppid, pid = os.getppid(), os.getpid()
except: pass # getpid not supported
else: open(zeo_pid,'w').write("%s %s" % (ppid, pid))
except:
# Log startup exception and tell zdaemon not to restart us.
info=sys.exc_info()
try:
import zLOG
zLOG.LOG("z2", zLOG.PANIC, "Startup exception",
error=info)
except:
pass
import traceback
apply(traceback.print_exception, info)
sys.exit(0)
asyncore.loop() if not storages:
import ZODB.FileStorage
storages['1']=ZODB.FileStorage.FileStorage(fs)
# Try to set up a signal handler
try:
import signal
signal.signal(signal.SIGTERM,
lambda sig, frame, s=storages: shutdown(s)
)
signal.signal(signal.SIGINT,
lambda sig, frame, s=storages: shutdown(s, 0)
)
signal.signal(signal.SIGHUP, rotate_logs_handler)
finally: pass
items=storages.items()
items.sort()
for kv in items:
LOG('ZEO Server', INFO, 'Serving %s:\t%s' % kv)
if not unix: unix=host, port
if prof:
cmds = \
"StorageServer.StorageServer(unix, storages);" \
'open(zeo_pid,"w").write("%s %s" % (os.getppid(), os.getpid()));' \
"asyncore.loop()"
import profile
profile.run(cmds, prof)
else:
StorageServer.StorageServer(unix, storages)
open(zeo_pid,'w').write("%s %s" % (os.getppid(), os.getpid()))
asyncore.loop()
def rotate_logs(): def rotate_logs():
import zLOG import zLOG
...@@ -335,10 +326,7 @@ def rotate_logs(): ...@@ -335,10 +326,7 @@ def rotate_logs():
zLOG.log_write.reinitialize() zLOG.log_write.reinitialize()
else: else:
# Hm, lets at least try to take care of the stupid logger: # Hm, lets at least try to take care of the stupid logger:
if hasattr(zLOG, '_set_stupid_dest'): zLOG._stupid_dest=None
zLOG._set_stupid_dest(None)
else:
zLOG._stupid_dest = None
def rotate_logs_handler(signum, frame): def rotate_logs_handler(signum, frame):
rotate_logs() rotate_logs()
...@@ -359,7 +347,7 @@ def shutdown(storages, die=1): ...@@ -359,7 +347,7 @@ def shutdown(storages, die=1):
for storage in storages.values(): for storage in storages.values():
try: storage.close() try: storage.close()
except: pass finally: pass
try: try:
from zLOG import LOG, INFO from zLOG import LOG, INFO
......
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL). A copy of the ZPL should accompany this
# distribution. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL
# EXPRESS OR IMPLIED WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST
# INFRINGEMENT, AND FITNESS FOR A PARTICULAR PURPOSE.
"""Library for forking storage server and connecting client storage""" """Library for forking storage server and connecting client storage"""
import asyncore import asyncore
import os import os
import profile
import random import random
import socket import socket
import sys import sys
import traceback
import types import types
import ZEO.ClientStorage, ZEO.StorageServer import ZEO.ClientStorage, ZEO.StorageServer
# Change value of PROFILE to enable server-side profiling
PROFILE = 0 PROFILE = 0
if PROFILE:
import hotshot
def get_port(): def get_port():
"""Return a port that is not in use. """Return a port that is not in use.
...@@ -66,9 +78,11 @@ else: ...@@ -66,9 +78,11 @@ else:
buf = self.recv(4) buf = self.recv(4)
if buf: if buf:
assert buf == "done" assert buf == "done"
server.close_server()
asyncore.socket_map.clear() asyncore.socket_map.clear()
def handle_close(self): def handle_close(self):
server.close_server()
asyncore.socket_map.clear() asyncore.socket_map.clear()
class ZEOClientExit: class ZEOClientExit:
...@@ -77,20 +91,27 @@ else: ...@@ -77,20 +91,27 @@ else:
self.pipe = pipe self.pipe = pipe
def close(self): def close(self):
os.write(self.pipe, "done") try:
os.close(self.pipe) os.write(self.pipe, "done")
os.close(self.pipe)
except os.error:
pass
def start_zeo_server(storage, addr): def start_zeo_server(storage, addr):
rd, wr = os.pipe() rd, wr = os.pipe()
pid = os.fork() pid = os.fork()
if pid == 0: if pid == 0:
if PROFILE: try:
p = profile.Profile() if PROFILE:
p.runctx("run_server(storage, addr, rd, wr)", globals(), p = hotshot.Profile("stats.s.%d" % os.getpid())
locals()) p.runctx("run_server(storage, addr, rd, wr)",
p.dump_stats("stats.s.%d" % os.getpid()) globals(), locals())
else: p.close()
run_server(storage, addr, rd, wr) else:
run_server(storage, addr, rd, wr)
except:
print "Exception in ZEO server process"
traceback.print_exc()
os._exit(0) os._exit(0)
else: else:
os.close(rd) os.close(rd)
...@@ -98,11 +119,11 @@ else: ...@@ -98,11 +119,11 @@ else:
def run_server(storage, addr, rd, wr): def run_server(storage, addr, rd, wr):
# in the child, run the storage server # in the child, run the storage server
global server
os.close(wr) os.close(wr)
ZEOServerExit(rd) ZEOServerExit(rd)
serv = ZEO.StorageServer.StorageServer(addr, {'1':storage}) server = ZEO.StorageServer.StorageServer(addr, {'1':storage})
asyncore.loop() asyncore.loop()
os.close(rd)
storage.close() storage.close()
if isinstance(addr, types.StringType): if isinstance(addr, types.StringType):
os.unlink(addr) os.unlink(addr)
...@@ -128,6 +149,7 @@ else: ...@@ -128,6 +149,7 @@ else:
s = ZEO.ClientStorage.ClientStorage(addr, storage_id, s = ZEO.ClientStorage.ClientStorage(addr, storage_id,
debug=1, client=cache, debug=1, client=cache,
cache_size=cache_size, cache_size=cache_size,
min_disconnect_poll=0.5) min_disconnect_poll=0.5,
wait_for_server_on_startup=1)
return s, exit, pid return s, exit, pid
import random
import unittest
from ZEO.TransactionBuffer import TransactionBuffer
def random_string(size):
    """Build a string of *size* characters drawn uniformly from chr(0)-chr(255)."""
    chars = []
    for _ in range(size):
        chars.append(chr(random.randrange(256)))
    return "".join(chars)
def new_store_data():
    """Return an arbitrary (oid, version, data) triple for the store() method."""
    oid = random_string(8)
    data = random_string(random.randrange(1000))
    return oid, '', data
def new_invalidate_data():
    """Return an arbitrary (oid, version) pair for the invalidate() method."""
    return (random_string(8), '')
class TransBufTests(unittest.TestCase):
    """Exercise TransactionBuffer: ordering, iteration, and reuse."""

    def checkTypicalUsage(self):
        """One full store/invalidate/iterate/clear cycle must complete."""
        tbuf = TransactionBuffer()
        tbuf.store(*new_store_data())
        tbuf.invalidate(*new_invalidate_data())
        tbuf.begin_iterate()
        entry = tbuf.next()
        while entry is not None:
            entry = tbuf.next()
        tbuf.clear()

    def doUpdates(self, tbuf):
        """Feed ten store/invalidate pairs into tbuf, then verify that
        iteration replays them in insertion order."""
        expected = []
        for _ in range(10):
            sdata = new_store_data()
            tbuf.store(*sdata)
            expected.append(sdata)
            idata = new_invalidate_data()
            tbuf.invalidate(*idata)
            expected.append(idata)

        tbuf.begin_iterate()
        for want in expected:
            got = tbuf.next()
            if got[2] is None:
                # the tbuf adds a dummy None to invalidates
                got = got[:2]
            self.assertEqual(got, want)

    def checkOrderPreserved(self):
        tbuf = TransactionBuffer()
        self.doUpdates(tbuf)

    def checkReusable(self):
        """A cleared buffer must accept and replay a fresh batch, twice."""
        tbuf = TransactionBuffer()
        self.doUpdates(tbuf)
        tbuf.clear()
        self.doUpdates(tbuf)
        tbuf.clear()
        self.doUpdates(tbuf)
def test_suite():
    """Collect every check* method of TransBufTests into a suite."""
    suite = unittest.makeSuite(TransBufTests, 'check')
    return suite
...@@ -85,11 +85,14 @@ ...@@ -85,11 +85,14 @@
"""Sized message async connections """Sized message async connections
""" """
__version__ = "$Revision: 1.11 $"[11:-2] __version__ = "$Revision: 1.12 $"[11:-2]
import asyncore, struct
from Exceptions import Disconnected
from zLOG import LOG, TRACE, ERROR, INFO, BLATHER
from types import StringType
import asyncore, string, struct, zLOG, sys, Acquisition
import socket, errno import socket, errno
from zLOG import LOG, TRACE, ERROR, INFO
# Use the dictionary to make sure we get the minimum number of errno # Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems -- # entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
...@@ -109,81 +112,101 @@ tmp_dict = {errno.EAGAIN: 0, ...@@ -109,81 +112,101 @@ tmp_dict = {errno.EAGAIN: 0,
expected_socket_write_errors = tuple(tmp_dict.keys()) expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict del tmp_dict
class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher): class SizedMessageAsyncConnection(asyncore.dispatcher):
__super_init = asyncore.dispatcher.__init__
__super_close = asyncore.dispatcher.close
__closed = 1 # Marker indicating that we're closed
__append=None # Marker indicating that we're closed socket = None # to outwit Sam's getattr
socket=None # to outwit Sam's getattr READ_SIZE = 8096
def __init__(self, sock, addr, map=None, debug=None): def __init__(self, sock, addr, map=None, debug=None):
SizedMessageAsyncConnection.inheritedAttribute( self.__super_init(sock, map)
'__init__')(self, sock, map) self.addr = addr
self.addr=addr
if debug is not None: if debug is not None:
self._debug=debug self._debug = debug
elif not hasattr(self, '_debug'): elif not hasattr(self, '_debug'):
self._debug=__debug__ and 'smac' self._debug = __debug__ and 'smac'
self.__state=None self.__state = None
self.__inp=None self.__inp = None # None, a single String, or a list
self.__inpl=0 self.__input_len = 0
self.__l=4 self.__msg_size = 4
self.__output=output=[] self.__output = []
self.__append=output.append self.__closed = None
self.__pop=output.pop
# XXX avoid expensive getattr calls?
def handle_read(self, def __nonzero__(self):
join=string.join, StringType=type(''), _type=type, return 1
_None=None):
def handle_read(self):
# Use a single __inp buffer and integer indexes to make this
# fast.
try: try:
d=self.recv(8096) d=self.recv(8096)
except socket.error, err: except socket.error, err:
if err[0] in expected_socket_read_errors: if err[0] in expected_socket_read_errors:
return return
raise raise
if not d: return if not d:
return
inp=self.__inp
if inp is _None: input_len = self.__input_len + len(d)
inp=d msg_size = self.__msg_size
elif _type(inp) is StringType: state = self.__state
inp=[inp,d]
inp = self.__inp
if msg_size > input_len:
if inp is None:
self.__inp = d
elif type(self.__inp) is StringType:
self.__inp = [self.__inp, d]
else:
self.__inp.append(d)
self.__input_len = input_len
return # keep waiting for more input
# load all previous input and d into single string inp
if isinstance(inp, StringType):
inp = inp + d
elif inp is None:
inp = d
else: else:
inp.append(d) inp.append(d)
inp = "".join(inp)
inpl=self.__inpl+len(d)
l=self.__l offset = 0
while (offset + msg_size) <= input_len:
while 1: msg = inp[offset:offset + msg_size]
offset = offset + msg_size
if l <= inpl: if state is None:
# Woo hoo, we have enough data # waiting for message
if _type(inp) is not StringType: inp=join(inp,'') msg_size = struct.unpack(">i", msg)[0]
d=inp[:l] state = 1
inp=inp[l:]
inpl=inpl-l
if self.__state is _None:
# waiting for message
l=struct.unpack(">i",d)[0]
self.__state=1
else:
l=4
self.__state=_None
self.message_input(d)
else: else:
break # not enough data msg_size = 4
state = None
self.__l=l self.message_input(msg)
self.__inp=inp
self.__inpl=inpl
def readable(self): return 1 self.__state = state
def writable(self): return not not self.__output self.__msg_size = msg_size
self.__inp = inp[offset:]
self.__input_len = input_len - offset
def readable(self):
return 1
def writable(self):
if len(self.__output) == 0:
return 0
else:
return 1
def handle_write(self): def handle_write(self):
output=self.__output output = self.__output
while output: while output:
v=output[0] v = output[0]
try: try:
n=self.send(v) n=self.send(v)
except socket.error, err: except socket.error, err:
...@@ -191,42 +214,33 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher): ...@@ -191,42 +214,33 @@ class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
break # we couldn't write anything break # we couldn't write anything
raise raise
if n < len(v): if n < len(v):
output[0]=v[n:] output[0] = v[n:]
break # we can't write any more break # we can't write any more
else: else:
del output[0] del output[0]
#break # waaa
def handle_close(self): def handle_close(self):
self.close() self.close()
def message_output(self, message, def message_output(self, message):
pack=struct.pack, len=len): if __debug__:
if self._debug: if self._debug:
if len(message) > 40: m=message[:40]+' ...' if len(message) > 40:
else: m=message m = message[:40]+' ...'
LOG(self._debug, TRACE, 'message_output %s' % `m`) else:
m = message
append=self.__append LOG(self._debug, TRACE, 'message_output %s' % `m`)
if append is None:
raise Disconnected("This action is temporarily unavailable.<p>") if self.__closed is not None:
raise Disconnected, (
append(pack(">i",len(message))+message) "This action is temporarily unavailable."
"<p>"
def log_info(self, message, type='info'): )
if type=='error': type=ERROR # do two separate appends to avoid copying the message string
else: type=INFO self.__output.append(struct.pack(">i", len(message)))
LOG('ZEO', type, message) self.__output.append(message)
log=log_info
def close(self): def close(self):
if self.__append is not None: if self.__closed is None:
self.__append=None self.__closed = 1
SizedMessageAsyncConnection.inheritedAttribute('close')(self) self.__super_close()
class Disconnected(Exception):
"""The client has become disconnected from the server
"""
"""RPC protocol for ZEO based on asyncore
The basic protocol is as follows:
a pickled tuple containing: msgid, flags, method, args
msgid is an integer.
flags is an integer.
The only currently defined flag is ASYNC (0x1), which means
the client does not expect a reply.
method is a string specifying the method to invoke.
For a reply, the method is ".reply".
args is a tuple of the argument to pass to method.
XXX need to specify a version number that describes the protocol, to
allow for future revision.
XXX support multiple outstanding calls
XXX factor out common pattern of deciding what protocol to use based
on whether address is tuple or string
"""
import asyncore
import errno
import cPickle
import os
import select
import socket
import sys
import threading
import thread
import time
import traceback
import types
from cStringIO import StringIO
from ZODB import POSException
from ZEO import smac, trigger
from Exceptions import Disconnected
import zLOG
import ThreadedAsync
from Exceptions import Disconnected
REPLY = ".reply" # message name used for replies
ASYNC = 1        # flag bit: the caller does not expect a reply

# Log label; embeds the pid so lines from different processes can be
# told apart.  new_label() recomputes it (presumably after a fork).
_label = "zrpc:%s" % os.getpid()
def new_label():
    """Recompute the module-level log label from the current pid.

    NOTE(review): presumably intended for use after os.fork(), so the
    child logs under its own pid -- confirm with callers.
    """
    global _label
    _label = "zrpc:" + str(os.getpid())
def log(message, level=zLOG.BLATHER, label=None, error=None):
    """Write *message* to zLOG under *label* (default: the module label)."""
    which = label or _label
    zLOG.LOG(which, level, message, error=error)
class ZRPCError(POSException.StorageError):
    """Base class for exceptions raised by the zrpc layer."""
    pass
class DecodingError(ZRPCError):
    """A ZRPC message could not be decoded.

    Raised by Marshaller.decode() when unpickling a message fails.
    """
class DisconnectedError(ZRPCError, Disconnected):
    """The database storage is disconnected from the storage server.

    Raised when a call is attempted on a closed Connection.
    """
# Export the mainloop function from asyncore to zrpc clients
loop = asyncore.loop
def connect(addr, client=None):
    """Open a socket to *addr* and wrap it in a Connection.

    addr -- either a (host, port) 2-tuple for an INET connection, or a
            string naming a Unix-domain socket.
    client -- passed to the Connection as the object incoming calls are
            dispatched to (may be None).

    Blocks until the connect completes; raises socket.error on failure.
    """
    # isinstance() (rather than an exact type comparison) also accepts
    # tuple subclasses; behavior is otherwise unchanged.
    if isinstance(addr, tuple):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    else:
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.connect(addr)
    c = Connection(s, addr, client)
    return c
class Marshaller:
    """Marshal requests and replies to send across the network."""

    # It's okay to share a single Pickler as long as it's in fast
    # mode, which means that it doesn't have a memo.
    # NOTE(review): sharing still assumes encode() is never entered
    # concurrently from two threads -- confirm callers serialize access.
    pickler = cPickle.Pickler()
    pickler.fast = 1
    pickle = pickler.dump

    # Exceptions that can escape from pickling; callers catch
    # self.marshal.errors when encoding a reply.
    errors = (cPickle.UnpickleableError,
              cPickle.UnpicklingError,
              cPickle.PickleError,
              cPickle.PicklingError)

    def encode(self, msgid, flags, name, args):
        """Returns an encoded message: a binary pickle of the 4-tuple."""
        return self.pickle((msgid, flags, name, args), 1)

    def decode(self, msg):
        """Decodes msg and returns its parts: (msgid, flags, name, args).

        Raises DecodingError if the message cannot be unpickled.
        """
        unpickler = cPickle.Unpickler(StringIO(msg))
        # find_global is the Unpickler hook that resolves global (class)
        # names during unpickling.  NOTE(review): find_global is not
        # defined in the code visible here -- confirm it exists elsewhere
        # in this module.
        unpickler.find_global = find_global
        try:
            return unpickler.load() # msgid, flags, name, args
        except (cPickle.UnpicklingError, IndexError), err_msg:
            log("can't decode %s" % repr(msg), level=zLOG.ERROR)
            raise DecodingError(msg)
class Delay:
    """Placeholder that postpones the reply to a synchronous call.

    When a synchronous call is made and the original handler returns
    without handling the call, it returns a Delay object that prevents
    the mainloop from sending a response.
    """

    def set_sender(self, msgid, send_reply):
        """Record the message id to answer and the callable that sends it."""
        self.msgid, self.send_reply = msgid, send_reply

    def reply(self, obj):
        """Deliver *obj* as the reply to the deferred call."""
        self.send_reply(self.msgid, obj)
class Connection(smac.SizedMessageAsyncConnection):
    """Dispatcher for RPC on object

    The connection supports synchronous calls, which expect a return,
    and asynchronous calls that do not.

    It uses the Marshaller class to handle encoding and decoding of
    method calls and arguments.

    A Connection is designed for use in a multithreaded application,
    where a synchronous call must block until a response is ready.

    The current design only allows a single synchronous call to be
    outstanding.
    """

    # Name-mangled aliases for bound base-class methods.
    __super_init = smac.SizedMessageAsyncConnection.__init__
    __super_close = smac.SizedMessageAsyncConnection.close
    __super_writable = smac.SizedMessageAsyncConnection.writable

    def __init__(self, sock, addr, obj=None):
        # obj is the local object that incoming requests are dispatched to.
        self.msgid = 0  # id for the next outgoing message
        self.obj = obj
        self.marshal = Marshaller()
        self.closed = 0
        self.async = 0
        # The reply lock is used to block when a synchronous call is
        # waiting for a response.  It starts out acquired; handle_reply()
        # releases it when the answer arrives.
        self.__super_init(sock, addr)
        self._map = {self._fileno: self}
        self._prepare_async()
        # Serializes outgoing calls: only one call/callAsync at a time.
        self.__call_lock = thread.allocate_lock()
        self.__reply_lock = thread.allocate_lock()
        self.__reply_lock.acquire()
        # NOTE(review): Handler is not defined in the code visible here;
        # presumably instances of it want set_caller()/clear_caller()
        # notifications around each dispatched request -- confirm.
        if isinstance(obj, Handler):
            self.set_caller = 1
        else:
            self.set_caller = 0

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.addr)

    def close(self):
        # Idempotent: only the first call reaches the superclass close.
        if self.closed:
            return
        self.closed = 1
        self.__super_close()

    def register_object(self, obj):
        """Register obj as the true object to invoke methods on"""
        self.obj = obj

    def message_input(self, message):
        """Decode an incoming message and dispatch it"""
        # XXX Not sure what to do with errors that reach this level.
        # Need to catch ZRPCErrors in handle_reply() and
        # handle_request() so that they get back to the client.
        try:
            msgid, flags, name, args = self.marshal.decode(message)
        except DecodingError, msg:
            # flags=None tells return_error() the request is unusable.
            return self.return_error(None, None, sys.exc_info()[0],
                                     sys.exc_info()[1])

        if __debug__:
            log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
                                              repr(args)[:40]),
                level=zLOG.DEBUG)
        if name == REPLY:
            self.handle_reply(msgid, flags, args)
        else:
            self.handle_request(msgid, flags, name, args)

    def handle_reply(self, msgid, flags, args):
        """Stash the reply and wake the thread blocked in _call()."""
        if __debug__:
            log("recv reply: %s, %s, %s" % (msgid, flags, str(args)[:40]),
                level=zLOG.DEBUG)
        self.__reply = msgid, flags, args
        self.__reply_lock.release() # will fail if lock is unlocked

    def handle_request(self, msgid, flags, name, args):
        """Invoke name(*args) on self.obj and send back the result."""
        if __debug__:
            log("call %s%s on %s" % (name, repr(args)[:40], repr(self.obj)),
                zLOG.DEBUG)
        if not self.check_method(name):
            raise ZRPCError("Invalid method name: %s on %s" % (name,
                                                               `self.obj`))
        meth = getattr(self.obj, name)
        try:
            if self.set_caller:
                # Tell the handler which connection issued the call for
                # the duration of the method.
                self.obj.set_caller(self)
                try:
                    ret = meth(*args)
                finally:
                    self.obj.clear_caller()
            else:
                ret = meth(*args)
        except (POSException.UndoError,
                POSException.VersionCommitError), msg:
            # Expected storage errors are reported back to the caller.
            error = sys.exc_info()
            log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
            return self.return_error(msgid, flags, error[0], error[1])
        except Exception, msg:
            error = sys.exc_info()
            log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
            return self.return_error(msgid, flags, error[0], error[1])

        if flags & ASYNC:
            # Async calls must not produce a value; nobody is waiting.
            if ret is not None:
                log("async method %s returned value %s" % (name, repr(ret)),
                    zLOG.ERROR)
                raise ZRPCError("async method returned value")
        else:
            if __debug__:
                log("%s return %s" % (name, repr(ret)[:40]), zLOG.DEBUG)
            if isinstance(ret, Delay):
                # The handler deferred the reply; it will call
                # ret.reply() later with the real result.
                ret.set_sender(msgid, self.send_reply)
            else:
                self.send_reply(msgid, ret)

    def handle_error(self):
        # asyncore callback: log the active exception and drop the
        # connection.
        self.log_error()
        self.close()

    def log_error(self, msg="No error message supplied"):
        """Log msg together with the currently active exception."""
        error = sys.exc_info()
        log(msg, zLOG.ERROR, error=error)
        del error  # avoid keeping the traceback alive

    def check_method(self, name):
        # XXX minimal security check should go here: Is name exported?
        return hasattr(self.obj, name)

    def send_reply(self, msgid, ret):
        """Marshal ret as the reply to message msgid and queue it."""
        msg = self.marshal.encode(msgid, 0, REPLY, ret)
        self.message_output(msg)

    def return_error(self, msgid, flags, err_type, err_value):
        """Report an exception back to the caller as a reply message."""
        if flags is None:
            # The request could not even be decoded; nothing to reply to.
            self.log_error("Exception raised during decoding")
            return
        if flags & ASYNC:
            # No one is waiting for a reply, so just log the failure.
            self.log_error("Asynchronous call raised exception: %s" % self)
            return
        if type(err_value) is not types.InstanceType:
            # Normalize non-instance errors into a (type, value) pair.
            err_value = err_type, err_value

        try:
            msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
        except self.marshal.errors:
            # The original exception isn't picklable; substitute one
            # that is.
            err = ZRPCError("Couldn't pickle error %s" % `err_value`)
            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
        self.message_output(msg)
        self._do_io()

    # The next two methods are used by clients to invoke methods on
    # remote objects

    # XXX Should revise design to allow multiple outstanding
    # synchronous calls

    def call(self, method, *args):
        """Synchronously invoke method(*args) on the peer.

        Returns the peer's result, or raises the exception the peer
        raised.  Blocks until the reply arrives.
        """
        self.__call_lock.acquire()
        try:
            return self._call(method, args)
        finally:
            self.__call_lock.release()

    def _call(self, method, args):
        # Caller must hold __call_lock.
        if self.closed:
            raise DisconnectedError("This action is temporarily unavailable")
        msgid = self.msgid
        self.msgid = self.msgid + 1
        if __debug__:
            log("send msg: %d, 0, %s, ..." % (msgid, method))
        self.message_output(self.marshal.encode(msgid, 0, method, args))

        self.__reply = None
        # lock is currently held
        self._do_io(wait=1)
        # lock is held again...
        r_msgid, r_flags, r_args = self.__reply
        self.__reply_lock.acquire()
        assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
        # A (class, instance) pair where class is an Exception subclass
        # means the server raised; re-raise it locally.
        if type(r_args) == types.TupleType \
           and type(r_args[0]) == types.ClassType \
           and issubclass(r_args[0], Exception):
            raise r_args[1] # error raised by server
        return r_args

    def callAsync(self, method, *args):
        """Invoke method(*args) on the peer without waiting for a reply."""
        self.__call_lock.acquire()
        try:
            self._callAsync(method, args)
        finally:
            self.__call_lock.release()

    def _callAsync(self, method, args):
        # Caller must hold __call_lock.
        if self.closed:
            raise DisconnectedError("This action is temporarily unavailable")
        msgid = self.msgid
        self.msgid += 1
        if __debug__:
            log("send msg: %d, %d, %s, ..." % (msgid, ASYNC, method))
        self.message_output(self.marshal.encode(msgid, ASYNC, method, args))
        self._do_io()

    # handle IO, possibly in async mode

    def sync(self):
        pass # XXX what is this supposed to do?

    def _prepare_async(self):
        # Start out synchronous; set_async() is invoked as a callback
        # once an async mainloop is running.
        self._async = 0
        ThreadedAsync.register_loop_callback(self.set_async)
        # XXX If we are not in async mode, this will cause dead
        # Connections to be leaked.

    def set_async(self, map):
        # XXX do we need a lock around this?  I'm not sure there is
        # any harm to a race with _do_io().
        self._async = 1
        # The trigger wakes the async mainloop so queued output is sent.
        self.trigger = trigger.trigger()

    def is_async(self):
        """Return true when an async mainloop is handling our I/O."""
        return self._async

    def _do_io(self, wait=0): # XXX need better name
        """Push network I/O forward; with wait=1, block for a reply.

        XXX invariant? lock must be held when calling with wait==1
        otherwise, in non-async mode, there will be no poll
        """
        if __debug__:
            log("_do_io(wait=%d), async=%d" % (wait, self.is_async()),
                level=zLOG.DEBUG)
        if self.is_async():
            # The mainloop thread does the actual I/O; just wake it up.
            self.trigger.pull_trigger()
            if wait:
                # Block until handle_reply() releases the reply lock.
                self.__reply_lock.acquire()
                # wait until reply...
                self.__reply_lock.release()
        else:
            if wait:
                # do loop only if lock is already acquired
                # Non-blocking acquire(0) probes for the reply between
                # polls so a close can be detected.
                while not self.__reply_lock.acquire(0):
                    asyncore.poll(10.0, self._map)
                    if self.closed:
                        raise Disconnected()
                self.__reply_lock.release()
            else:
                asyncore.poll(0.0, self._map)

        # XXX it seems that we need to release before returning if
        # called with wait==1.  perhaps the caller need not acquire
        # upon return...
class ServerConnection(Connection):
    """Server-side Connection: the main asyncore loop performs all I/O.

    XXX this is a hack
    """
    def _do_io(self, wait=0):
        """If this is a server, there is no explicit IO to do"""
        pass
class ConnectionManager:
"""Keeps a connection up over time"""
# XXX requires that obj implement notifyConnected and
# notifyDisconnected. make this optional?
    def __init__(self, addr, obj=None, debug=1, tmin=1, tmax=180):
        """addr -- server address(es); see set_addr() for accepted forms.
        obj -- object notified of connect/disconnect events.
        tmin, tmax -- minimum/maximum delay in seconds between
        reconnect attempts (exponential backoff; see __m_connect).
        """
        self.set_addr(addr)
        self.obj = obj
        self.tmin = tmin
        self.tmax = tmax
        self.debug = debug
        self.connected = 0
        self.connection = None
        # If _thread is not None, then there is a helper thread
        # attempting to connect.  _thread is protected by _connect_lock.
        self._thread = None
        self._connect_lock = threading.Lock()
        self.trigger = None
        self.async = 0
        self.closed = 0
        ThreadedAsync.register_loop_callback(self.set_async)
def __repr__(self):
return "<%s for %s>" % (self.__class__.__name__, self.addr)
    def set_addr(self, addr):
        "Set one or more addresses to use for server."

        # For backwards compatibility (and simplicity?) the
        # constructor accepts a single address in the addr argument --
        # a string for a Unix domain socket or a 2-tuple with a
        # hostname and port.  It can also accept a list of such addresses.

        # The result stored in self.addr is always a list of
        # (address_family, address) pairs.
        addr_type = self._guess_type(addr)
        if addr_type is not None:
            self.addr = [(addr_type, addr)]
        else:
            self.addr = []
            for a in addr:
                addr_type = self._guess_type(a)
                if addr_type is None:
                    raise ValueError, "unknown address in list: %s" % repr(a)
                self.addr.append((addr_type, a))
def _guess_type(self, addr):
if isinstance(addr, types.StringType):
return socket.AF_UNIX
if (len(addr) == 2
and isinstance(addr[0], types.StringType)
and isinstance(addr[1], types.IntType)):
return socket.AF_INET
# not anything I know about
return None
def close(self):
"""Prevent ConnectionManager from opening new connections"""
self.closed = 1
self._connect_lock.acquire()
try:
if self._thread is not None:
self._thread.join()
finally:
self._connect_lock.release()
if self.connection:
self.connection.close()
def register_object(self, obj):
self.obj = obj
def set_async(self, map):
# XXX need each connection started with async==0 to have a callback
self.async = 1 # XXX needs to be set on the Connection
self.trigger = trigger.trigger()
def connect(self, sync=0):
if self.connected == 1:
return
self._connect_lock.acquire()
try:
if self._thread is None:
zLOG.LOG(_label, zLOG.BLATHER,
"starting thread to connect to server")
self._thread = threading.Thread(target=self.__m_connect)
self._thread.start()
if sync:
try:
self._thread.join()
except AttributeError:
# probably means the thread exited quickly
pass
finally:
self._connect_lock.release()
def attempt_connect(self):
# XXX will _attempt_connects() take too long? think select().
self._attempt_connects()
return self.connected
def notify_closed(self, conn):
self.connected = 0
self.connection = None
self.obj.notifyDisconnected()
if not self.closed:
self.connect()
class Connected(Exception):
def __init__(self, sock):
self.sock = sock
def __m_connect(self):
# a new __connect that handles multiple addresses
try:
delay = self.tmin
while not (self.closed or self._attempt_connects()):
time.sleep(delay)
delay *= 2
if delay > self.tmax:
delay = self.tmax
finally:
self._thread = None
def _attempt_connects(self):
"Return true if any connect attempt succeeds."
sockets = {}
zLOG.LOG(_label, zLOG.BLATHER,
"attempting connection on %d sockets" % len(self.addr))
try:
for domain, addr in self.addr:
if __debug__:
zLOG.LOG(_label, zLOG.DEBUG,
"attempt connection to %s" % repr(addr))
s = socket.socket(domain, socket.SOCK_STREAM)
s.setblocking(0)
# XXX can still block for a while if addr requires DNS
e = self._connect_ex(s, addr)
if e is not None:
sockets[s] = addr
# next wait until the actually connect
while sockets:
if self.closed:
for s in sockets.keys():
s.close()
return 0
try:
r, w, x = select.select([], sockets.keys(), [], 1.0)
except select.error:
continue
for s in w:
e = self._connect_ex(s, sockets[s])
if e is None:
del sockets[s]
except self.Connected, container:
s = container.sock
del sockets[s]
# close all the other sockets
for s in sockets.keys():
s.close()
return 1
return 0
def _connect_ex(self, s, addr):
"""Call s.connect_ex(addr) and return true if loop should continue.
We have to handle several possible return values from
connect_ex(). If the socket is connected and the initial ZEO
setup works, we're done. Report success by raising an
exception. Yes, the is odd, but we need to bail out of the
select() loop in the caller and an exception is a principled
way to do the abort.
If the socket sonnects and the initial ZEO setup fails or the
connect_ex() returns an error, we close the socket and ignore it.
If connect_ex() returns EINPROGRESS, we need to try again later.
"""
e = s.connect_ex(addr)
if e == errno.EINPROGRESS:
return 1
elif e == 0:
c = self._test_connection(s, addr)
zLOG.LOG(_label, zLOG.DEBUG, "connected to %s" % repr(addr))
if c:
self.connected = 1
raise self.Connected(s)
else:
if __debug__:
zLOG.LOG(_label, zLOG.DEBUG,
"error connecting to %s: %s" % (addr,
errno.errorcode[e]))
s.close()
def _test_connection(self, s, addr):
c = ManagedConnection(s, addr, self.obj, self)
try:
self.obj.notifyConnected(c)
self.connection = c
return 1
except:
# XXX zLOG the error
c.close()
return 0
class ManagedServerConnection(ServerConnection):
    """A connection that notifies its ConnectionManager of closing"""

    # Bind Connection's implementations directly, exactly as the
    # original wiring does.
    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        # Remember the manager first so close() can always reach it.
        self.__manager = mgr
        self.__super_init(sock, addr, obj)

    def close(self):
        self.__super_close()
        self.__manager.close(self)
class ManagedConnection(Connection):
"""A connection that notifies its ConnectionManager of closing.
A managed connection also defers the ThreadedAsync work to its
manager.
"""
__super_init = Connection.__init__
__super_close = Connection.close
def __init__(self, sock, addr, obj, mgr):
self.__mgr = mgr
if self.__mgr.async:
self.__async = 1
self.trigger = self.__mgr.trigger
else:
self.__async = None
self.__super_init(sock, addr, obj)
def _prepare_async(self):
# Don't do the register_loop_callback that the superclass does
pass
def is_async(self):
if self.__async:
return 1
async = self.__mgr.async
if async:
self.__async = 1
self.trigger = self.__mgr.trigger
return async
def close(self):
self.__super_close()
self.__mgr.notify_closed(self)
class Dispatcher(asyncore.dispatcher):
"""A server that accepts incoming RPC connections"""
__super_init = asyncore.dispatcher.__init__
reuse_addr = 1
def __init__(self, addr, obj=None, factory=Connection, reuse_addr=None):
self.__super_init()
self.addr = addr
self.obj = obj
self.factory = factory
self.clients = []
if reuse_addr is not None:
self.reuse_addr = reuse_addr
self._open_socket()
def _open_socket(self):
if type(self.addr) == types.TupleType:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(self.addr)
self.listen(5)
def writable(self):
return 0
def readable(self):
return 1
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error, msg:
log("accepted failed: %s" % msg)
return
c = self.factory(sock, addr, self.obj)
log("connect from %s: %s" % (repr(addr), c))
self.clients.append(c)
class Handler:
    """Base class used to handle RPC caller discovery"""

    def set_caller(self, addr):
        """Record addr as the address of the current caller."""
        self.__addr = addr

    def get_caller(self):
        """Return the address recorded by the last set_caller()."""
        return self.__addr

    def clear_caller(self):
        """Forget the current caller."""
        self.__addr = None
# Arguments for __import__: _silly is a non-empty fromlist so that
# __import__ returns the named (sub)module itself, not the package.
_globals = globals()
_silly = ('__doc__',)

def find_global(module, name):
    """Helper for message unpickler

    Return the global `name` from `module` if it is safe to use during
    unpickling; otherwise raise ZRPCError.  A global is considered safe
    when it carries a true __no_side_effects__ attribute, or when it is
    an Exception subclass.
    """
    try:
        m = __import__(module, _globals, _globals, _silly)
    except ImportError, msg:
        raise ZRPCError("import error %s: %s" % (module, msg))

    try:
        r = getattr(m, name)
    except AttributeError:
        raise ZRPCError("module %s has no global %s" % (module, name))

    safe = getattr(r, '__no_side_effects__', 0)
    if safe:
        return r

    # NOTE(review): types.ClassType matches only old-style classes, so
    # exceptions defined as new-style classes would be rejected here --
    # confirm this is the intended behavior for this Python version.
    if type(r) == types.ClassType and issubclass(r, Exception):
        return r

    raise ZRPCError("Unsafe global: %s.%s" % (module, name))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment