Commit bcfc3e36 authored by Jim Fulton's avatar Jim Fulton

The storage server is now multi-threaded.

parent 3c0e51bd
......@@ -1346,7 +1346,7 @@ class ClientStub:
self.rpc.callAsync('endVerify')
def invalidateTransaction(self, tid, args):
self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
self.rpc.callAsync('invalidateTransaction', tid, args)
def serialnos(self, arg):
self.rpc.callAsyncNoPoll('serialnos', arg)
......@@ -1372,11 +1372,11 @@ class ClientStub:
class ClientStub308(ClientStub):
def invalidateTransaction(self, tid, args):
self.rpc.callAsyncNoPoll(
'invalidateTransaction', tid, [(arg, '') for arg in args])
ClientStub.invalidateTransaction(
self, tid, [(arg, '') for arg in args])
def invalidateVerify(self, oid):
self.rpc.callAsync('invalidateVerify', (oid, ''))
ClientStub.invalidateVerify(self, (oid, ''))
class ZEOStorage308Adapter:
......
......@@ -560,14 +560,17 @@ class ManagedServerConnection(Connection):
# Exception types that should not be logged:
unlogged_exception_types = (ZODB.POSException.POSKeyError, )
# Servers use a shared server trigger that uses the asyncore socket map
trigger = ZEO.zrpc.trigger.trigger()
call_from_thread = trigger.pull_trigger
def __init__(self, sock, addr, obj, mgr):
self.mgr = mgr
Connection.__init__(self, sock, addr, obj, 'S')
map = {}
Connection.__init__(self, sock, addr, obj, 'S', map=map)
self.marshal = ServerMarshaller()
self.trigger = ZEO.zrpc.trigger.trigger(map)
self.call_from_thread = self.trigger.pull_trigger
t = threading.Thread(target=server_loop, args=(map,))
t.setDaemon(True)
t.start()
def handshake(self):
# Send the server's preferred protocol to the client.
......@@ -601,6 +604,13 @@ class ManagedServerConnection(Connection):
poll = smac.SizedMessageAsyncConnection.handle_write
def server_loop(map):
while len(map) > 1:
asyncore.poll(30.0, map)
for o in map.values():
o.close()
class ManagedClientConnection(Connection):
"""Client-side Connection subclass."""
__super_init = Connection.__init__
......@@ -714,10 +724,6 @@ class ManagedClientConnection(Connection):
self.trigger.pull_trigger()
# Delay used when we call asyncore.poll() directly.
# Start with a 1 msec delay, double until 1 sec.
delay = 0.001
self.replies_cond.acquire()
try:
while 1:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment