Commit 27eb6c44 authored by Jim Fulton's avatar Jim Fulton

Got it basically working

parent d7dd0efd
......@@ -47,12 +47,13 @@
##############################################################################
"""Network ZODB storage client
"""
__version__='$Revision: 1.1 $'[11:-2]
__version__='$Revision: 1.2 $'[11:-2]
import struct, time, os, socket, cPickle, string, Sync, zrpc
now=time.time
from struct import pack, unpack
from ZODB import POSException, BaseStorage
from ZODB.TimeStamp import TimeStamp
TupleType=type(())
......@@ -64,9 +65,19 @@ class ClientStorage(BaseStorage.BaseStorage):
def __init__(self, connection, async=0):
if async: self._call=zrpc.async(connection)
else: self._call=zrpc.sync(connection)
if async:
import asyncore
def loop(timeout=30.0, use_poll=0,
self=self, asyncore=asyncore, loop=asyncore.loop):
self.becomeAsync()
asyncore.loop=loop
loop(timeout, use_poll)
asyncore.loop=loop
self._call=zrpc.sync(connection)
self.__begin='tpc_begin_sync'
self._call._write('1')
info=self._call('get_info')
self._len=info.get('length',0)
self._size=info.get('size',0)
......@@ -78,6 +89,10 @@ class ClientStorage(BaseStorage.BaseStorage):
info.get('name', str(connection)),
)
def becomeAsync(self):
self._call=zrpc.async(self._call)
self.__begin='tpc_begin'
def registerDB(self, db, limit):
def invalidate(code, args,
......@@ -100,7 +115,7 @@ class ClientStorage(BaseStorage.BaseStorage):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try: return self._call('abortVersion', src, transaction.id)
try: return self._call('abortVersion', src, self._serial)
finally: self._lock_release()
def close(self):
......@@ -112,7 +127,7 @@ class ClientStorage(BaseStorage.BaseStorage):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try: return self._call('commitVersion', src, dest, transaction.id)
try: return self._call('commitVersion', src, dest, self._serial)
finally: self._lock_release()
def getName(self): return self.__name__
......@@ -151,7 +166,7 @@ class ClientStorage(BaseStorage.BaseStorage):
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try: return self._call('store', oid, serial,
data, version, transaction.id)
data, version, self._serial)
finally: self._lock_release()
def supportsUndo(self): return self._supportsUndo
......@@ -161,7 +176,7 @@ class ClientStorage(BaseStorage.BaseStorage):
self._lock_acquire()
try:
if transaction is not self._transaction: return
self._call('tpc_abort', id)
self._call('tpc_abort', self._serial)
self._transaction=None
self._commit_lock_release()
finally: self._lock_release()
......@@ -171,59 +186,43 @@ class ClientStorage(BaseStorage.BaseStorage):
try:
if self._transaction is transaction: return
while 1:
self._lock_release()
self._commit_lock_acquire()
self._lock_acquire()
if self._call('tpc_begin', id, user, desc, ext) is None:
break
self._transaction=transaction
self._clear_temp()
user=transaction.user
desc=transaction.description
ext=transaction._extension
if ext: ext=dumps(ext,1)
else: ext=""
self._ude=user, desc, ext
t=time.time()
t=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
self._ts=t=t.laterThan(self._ts)
self._serial=`t`
self._serial=id=`t`
while 1:
self._lock_release()
self._commit_lock_acquire()
self._lock_acquire()
if self._call(self.__begin, id, user, desc, ext) is None:
break
self._begin(self._serial, user, desc, ext)
self._transaction=transaction
finally: self._lock_release()
def tpc_finish(self, transaction, f=None):
self._lock_acquire()
try:
if transaction is not self._transaction: return
if f is not None: f()
u,d,e=self._ude
self._finish(self._serial, u, d, e)
self._call('tpc_finish', self._serial,
transaction.user,
transaction.description,
transaction._extension)
self._clear_temp()
self._ude=None
self._transaction=None
self._commit_lock_release()
finally: self._lock_release()
def _finish(self, tid, u, d, e):
pass
def _finish(self, id, user, desc, ext):
return self._call('tpc_finish', id, user, desc, ext)
def undo(self, transaction_id):
return self._call('undo', transaction_id)
self._lock_acquire()
try: return self._call('undo', transaction_id)
finally: self._lock_release()
def undoLog(self, version, first, last, filter=None):
......@@ -248,5 +247,4 @@ class ClientStorage(BaseStorage.BaseStorage):
self._lock_acquire()
try: return self._call('versionEmpty', max)
finally: self._lock_release()
Zope Enterprize Option, iteration 1
Put this package in your Zope lib/python.
To start the storage server, go to your Zope install directory and::
lib/python/ZEO/start.py -p port_number
(Run start without arguments to see options.)
To get Zope to use the server, create a custom_zodb module that
uses a ClientStorage::
import ZEO.ClientStorage
Storage=ZEO.ClientStorage.ClientStorage(('',port_number), async=1)
You can specify a host name (rather than '') if you want.
The port number is, of course, the port number used to start the
storage server. The async switch tells the client to switch
itself to async mode (if and) when the asyncore main loop is called.
......@@ -5,10 +5,10 @@ from ZODB import POSException
from ZODB.Transaction import Transaction
import traceback
class StorageServerError(POSException.ServerError): pass
class StorageServerError(POSException.StorageError): pass
class Server(asyncore.dispatcher):
class StorageServer(asyncore.dispatcher):
def __init__(self, connection, storages):
......@@ -36,6 +36,18 @@ class Server(asyncore.dispatcher):
if connections is None:
self.__connections[storage_id]=connections=[]
connections.append(connection)
return storage, storage_id
def unregister_connection(self, connection, storage_id):
connections=self.__get_connections(storage_id, None)
if connections:
n=[]
for c in connections:
if c is not connection:
n.append(c)
self.__connections[storage_id]=n
def invalidate(self, connection, storage_id, invalidated,
dumps=cPickle.dumps):
......@@ -62,12 +74,13 @@ class Server(asyncore.dispatcher):
storage_methods={}
for n in ('get_info', 'abortVersion', 'commitVersion', 'history',
'load', 'modifiedInVersion', 'new_oid', 'pack', 'store',
'tpc_abort', 'tpc_begin', 'tpc_finish', 'undo', 'undoLog',
'tpc_abort', 'tpc_begin', 'tpc_begin_sync', 'tpc_finish', 'undo',
'undoLog',
'versionEmpty'):
storage_methods[n]=1
storage_method=storage_methods.has_key
_noreturn=[]
class Connection(smac):
_transaction=None
......@@ -76,14 +89,25 @@ class Connection(smac):
def __init__(self, server, sock, addr):
smac.__init__(self, sock, addr)
self.__server=server
self.__storage=server.storage
self.__invalidated=[]
self.__closed=None
def close(self):
t=self._transaction
if (t is not None and self.__storage is not None and
self.__storage._transaction is t):
self.tpc_abort(t.id)
self.__server.unregister_connection(self, self.__storage_id)
self.__closed=1
smac.close(self)
def message_input(self, message):
if __debug__:
m=`message`
if len(m) > 60: m=m[:60]+' ...'
print 'message_input', m
if self.__storage is None:
self.__storage, self.__storage_id = (
self.__server.register_connection(self, message))
......@@ -93,18 +117,29 @@ class Connection(smac):
try:
args=cPickle.loads(message)
name, args = args[0], args[1:]
if __debug__:
m=`tuple(args)`
if len(m) > 60: m=m[:60]+' ...'
print 'call: %s%s' % (name, m)
if not storage_method(name):
raise 'Invalid Method Name', name
if hasattr(self, name):
r=apply(getattr(self, name), args)
else:
r=apply(getattr(self.__storage, name), args)
if r is _noreturn: return
except:
traceback.print_exc()
t, r = sys.exc_info()[:2]
if type(r) is not type(self): r=t,r
rt='E'
if __debug__:
m=`r`
if len(m) > 60: m=m[:60]+' ...'
print '%s: %s' % (rt, m)
r=cPickle.dumps(r,1)
self.message_output(rt+r)
......@@ -122,47 +157,90 @@ class Connection(smac):
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
newserial=self.__storage.store(oid, data, serial, version, t)
newserial=self.__storage.store(oid, serial, data, version, t)
if serial != '\0\0\0\0\0\0\0\0':
self.__invalidated.append(oid, serial, version)
return newserial
def unlock(self):
self.message_output('UN')
def tpc_abort(self, id):
t=self._transaction
if t is None or id != t.id: return
r=self.__storage.tpc_abort(t)
for c in self.__storage.__waiting: c.unlock()
storage=self.__storage
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
while waiting:
f, args = waiting.pop(0)
if apply(f,args): break
self._transaction=None
self.__invalidated=[]
def unlock(self):
if self.__closed: return
self.message_output('UN.')
def tpc_begin(self, id, user, description, ext):
t=self._transaction
if t is not None and id == t.id: return
storage=self.__storage
if storage._transaction is not None:
storage.__waiting.append(self)
return 1
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
waiting.append(self.unlock, ())
return 1 # Return a flag indicating a lock condition.
self._transaction=t=Transaction()
t.id=id
t.user=user
t.description=description
storage.tpc_begin(t)
storage.__waiting=[]
self.__invalidated=[]
def tpc_begin_sync(self, id, user, description, ext):
if self.__closed: return
t=self._transaction
if t is not None and id == t.id: return
storage=self.__storage
if storage._transaction is None:
self.try_again_sync(id, user, description, ext)
else:
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
waiting.append(self.try_again_sync, (id, user, description, ext))
return _noreturn
def try_again_sync(self, id, user, description, ext):
storage=self.__storage
if storage._transaction is None:
self._transaction=t=Transaction()
t.id=id
t.user=user
t.description=description
storage.tpc_begin(t)
self.__invalidated=[]
self.message_output('RN.')
return 1
def tpc_finish(self, id, user, description, ext):
t=self._transaction
if id != t.id: return
t.user=user
t.description=description
r=self.__storage.tpc_finish(t)
for c in self.__storage.__waiting: c.unlock()
t.ext=ext
storage=self.__storage
r=storage.tpc_finish(t)
try: waiting=storage.__waiting
except: waiting=storage.__waiting=[]
while waiting:
f, args = waiting.pop(0)
if apply(f,args): break
self._transaction=None
self.__server.invalidate(self, self.__storage_id, self.__invalidated)
self.__invalidated=[]
......@@ -171,7 +249,8 @@ class Connection(smac):
if __name__=='__main__':
import ZODB.FileStorage
name, port = sys.argv[1:3]
print name, port
try: port='',string.atoi(port)
except: pass
Server(port, ZODB.FileStorage.FileStorage(name))
StorageServer(port, ZODB.FileStorage.FileStorage(name))
asyncore.loop()
......@@ -61,4 +61,8 @@ class smac(asyncore.dispatcher):
def message_output(self, message,
pack=struct.pack, len=len):
if __debug__:
if len(message) > 40: m=message[:40]+' ...'
else: m=message
print 'message_output', `m`
self.__append(pack(">i",len(message))+message)
# Start the server storage
import sys, os, getopt, string
def directory(p):
d=os.path.split(p)[0]
if not d or d=='.': return os.getcwd()
return d
def main(argv):
me=argv[0]
sys.path[:]==filter(None, sys.path)
sys.path.insert(0, directory(directory(me)))
args=[]
for a in argv[1:]:
if string.find(a, '=') > 0:
a=string.split(a,'=')
os.environ[a[0]]=string.join(a[1:],'=')
continue
args.append(a)
opts, args = getopt.getopt(args, 'p:Dh:')
fs=directory(directory(directory(directory(me))))+'/var/Data.fs'
usage="""%s -p port [options] [filename]
where options are:
-D -- Run in debug mode
-h -- host address to listen on
if no file name is specified, then %s is used.
""" % (me, fs)
port=None
debug=0
host=''
for o, v in opts:
if o=='-p': port=string.atoi(v)
elif o=='-h': host=v
elif o=='-D': debug=1
if port is None:
print usage
print 'No port specified.'
sys.exit(1)
if args:
if len(args) > 1:
print usage
print 'Unrecognizd arguments: ', string.join(args[1:])
sys.exit(1)
fs=args[0]
__builtins__.__debug__=debug
import ZEO.StorageServer, ZODB.FileStorage, asyncore
print 'Serving', fs
ZEO.StorageServer.StorageServer(
(host,port),
{
'1': ZODB.FileStorage.FileStorage(fs)
},
)
asyncore.loop()
if __name__=='__main__': main(sys.argv)
......@@ -2,10 +2,14 @@
"""
from cPickle import dumps, loads
from ThreadLock import allocate_lock
import socket, smac, string, struct
from thread import allocate_lock
from smac import smac
import socket, string, struct
TupleType=type(())
Wakeup=None
class sync:
"""Synchronous rpc"""
......@@ -15,10 +19,12 @@ class sync:
host, port = connection
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(host, port)
self.__s=s
self._sync__s=s
self._outOfBand=outOfBand
def setOutOfBand(self, f): self._outOfBand=f
def close(self): self._sync__s.close()
def __call__(self, *args):
args=dumps(args,1)
......@@ -39,7 +45,7 @@ class sync:
raise UnrecognizedResult, r
def _write(self, data, pack=struct.pack):
send=self.__s.send
send=self._sync__s.send
h=pack(">i", len(data))
l=len(h)
while l > 0:
......@@ -53,7 +59,7 @@ class sync:
l=l-sent
def _read(self, _st=type(''), join=string.join, unpack=struct.unpack):
recv=self.__s.recv
recv=self._sync__s.recv
l=4
......@@ -78,24 +84,45 @@ class sync:
if type(data) is not _st: data=join(data,'')
return data
class async(smac.smac, sync):
class async(smac, sync):
def __init__(self, connection, outOfBand=None):
host, port = connection
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(host, port)
self._outOfBand=outOfBand
try:
host, port = connection
except:
s=connection._sync__s
self._outOfBand=connection._outOfBand
else:
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(host, port)
self._outOfBand=outOfBand
l=allocate_lock()
self.__la=l.acquire
self.__lr=l.release
self.__r=None
l.acquire()
smac.__init__(self, s, None)
global Wakeup
if Wakeup is None:
import ZServer.PubCore.ZEvent
Wakeup=ZServer.PubCore.ZEvent.Wakeup
def _write(self, data): self.message_output(data)
def _write(self, data):
self.message_output(data)
Wakeup() # You dumb bastard
def message_input(self, m):
if __debug__:
md=`m`
if len(m) > 60: md=md[:60]+' ...'
print 'message_input', md
c=m[:1]
if c in 'RE':
self.__r=m
......
......@@ -61,4 +61,8 @@ class smac(asyncore.dispatcher):
def message_output(self, message,
pack=struct.pack, len=len):
if __debug__:
if len(message) > 40: m=message[:40]+' ...'
else: m=message
print 'message_output', `m`
self.__append(pack(">i",len(message))+message)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment