Commit c8e6d938 authored by Jeremy Hylton's avatar Jeremy Hylton

Implement synchronous pack() invocation with new SlowMethodThread class.

parent c205d437
......@@ -240,32 +240,20 @@ class ZEOStorage:
def endZeoVerify(self):
self.client.endVerify()
def pack(self, t, wait=None):
def pack(self, time, wait=None):
if wait is not None:
wait = MTDelay()
t = threading.Thread(target=self._pack, args=(t, wait))
t.start()
if wait is not None:
return wait
return run_in_thread(self._pack, time)
else:
# If the client isn't waiting for a reply, start a thread
# and forget about it.
t = threading.Thread(target=self._pack, args=(time,))
t.start()
return None
def _pack(self, t, delay):
try:
self.__storage.pack(t, referencesf)
except:
self._log('Pack failed for %s' % self.__storage_id,
zLOG.ERROR,
error=sys.exc_info())
if delay is not None:
raise
else:
if delay is None:
# Broadcast new size statistics
self.server.invalidate(0, self.__storage_id, (),
self.get_size_info())
else:
delay.reply(None)
def _pack(self, time):
self.__storage.pack(time, referencesf)
# Broadcast new size statistics
self.server.invalidate(0, self.__storage_id, (), self.get_size_info())
def new_oids(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100"""
......@@ -583,3 +571,35 @@ class DelayedCommitStrategy:
new_strategy.store(oid, serial, data, version)
meth = getattr(new_strategy, self.name)
return meth(*self.args)
def run_in_thread(method, *args):
t = SlowMethodThread(method, args)
t.start()
return t.delay
class SlowMethodThread(threading.Thread):
"""Thread to run potentially slow storage methods.
Clients can use the delay attribute to access the MTDelay object
used to send a zrpc response at the right time.
"""
# Some storage methods can take a long time to complete. If we
# run these methods via a standard asyncore read handler, they
# will block all other server activity until they complete. To
# avoid blocking, we spawn a separate thread, return an MTDelay()
# object, and have the thread reply() when it finishes.
def __init__(self, method, args):
threading.Thread.__init__(self)
self._method = method
self._args = args
self.delay = MTDelay()
def run(self):
try:
result = self._method(*self._args)
except Exception:
self.delay.error(sys.exc_info())
else:
self.delay.reply(result)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment