Commit f96776b3 authored by Jim Fulton's avatar Jim Fulton

Changed load_before and preftech to use generators rather than callbacks

parent db9d3da5
...@@ -8,6 +8,7 @@ else: ...@@ -8,6 +8,7 @@ else:
from ZEO.Exceptions import ClientDisconnected from ZEO.Exceptions import ClientDisconnected
from ZODB.ConflictResolution import ResolvedSerial from ZODB.ConflictResolution import ResolvedSerial
import concurrent.futures import concurrent.futures
import functools
import logging import logging
import random import random
import threading import threading
...@@ -27,6 +28,37 @@ Fallback = object() ...@@ -27,6 +28,37 @@ Fallback = object()
local_random = random.Random() # use separate generator to facilitate tests local_random = random.Random() # use separate generator to facilitate tests
def future_generator(func):
"""Decorates a generator that generates futures
"""
@functools.wraps(func)
def call_generator(*args, **kw):
gen = func(*args, **kw)
try:
f = next(gen)
except StopIteration:
gen.close()
else:
def store(gen, future):
@future.add_done_callback
def _(future):
try:
try:
result = future.result()
except Exception as exc:
f = gen.throw(exc)
else:
f = gen.send(result)
except StopIteration:
gen.close()
else:
store(gen, f)
store(gen, f)
return call_generator
class Protocol(base.Protocol): class Protocol(base.Protocol):
"""asyncio low-level ZEO client interface """asyncio low-level ZEO client interface
""" """
...@@ -249,7 +281,7 @@ class Protocol(base.Protocol): ...@@ -249,7 +281,7 @@ class Protocol(base.Protocol):
self.futures[message_id] = future self.futures[message_id] = future
self._write( self._write(
self.encode(message_id, False, 'loadBefore', (oid, tid))) self.encode(message_id, False, 'loadBefore', (oid, tid)))
return future.add_done_callback return future
# Methods called by the server. # Methods called by the server.
# WARNING WARNING we can't call methods that call back to us # WARNING WARNING we can't call methods that call back to us
...@@ -525,35 +557,33 @@ class Client(object): ...@@ -525,35 +557,33 @@ class Client(object):
# Special methods because they update the cache. # Special methods because they update the cache.
@future_generator
def load_before_threadsafe(self, future, oid, tid): def load_before_threadsafe(self, future, oid, tid):
data = self.cache.loadBefore(oid, tid) data = self.cache.loadBefore(oid, tid)
if data is not None: if data is not None:
future.set_result(data) future.set_result(data)
elif self.ready: elif self.ready:
@self.protocol.load_before(oid, tid)
def load_before(load_future):
try:
data = load_future.result()
future.set_result(data)
if data:
data, start, end = data
self.cache.store(oid, start, end, data)
except Exception as exc:
future.set_exception(exc)
else:
self._when_ready(self.load_before_threadsafe, future, oid, tid)
def _prefetch(self, oid, tid):
@self.protocol.load_before(oid, tid)
def load_before(load_future):
try: try:
data = load_future.result() data = yield self.protocol.load_before(oid, tid)
except Exception as exc:
future.set_exception(exc)
else:
future.set_result(data)
if data: if data:
data, start, end = data data, start, end = data
self.cache.store(oid, start, end, data) self.cache.store(oid, start, end, data)
except Exception: else:
logger.exception("prefetch %r %r" % (oid, tid)) self._when_ready(self.load_before_threadsafe, future, oid, tid)
@future_generator
def _prefetch(self, oid, tid):
try:
data = yield self.protocol.load_before(oid, tid)
if data:
data, start, end = data
self.cache.store(oid, start, end, data)
except Exception:
logger.exception("prefetch %r %r" % (oid, tid))
def prefetch(self, future, oids, tid): def prefetch(self, future, oids, tid):
if self.ready: if self.ready:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment