Commit 1a38d378 authored by Jim Fulton's avatar Jim Fulton

Added a prefetch method.

parent 37c66789
...@@ -504,8 +504,15 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage): ...@@ -504,8 +504,15 @@ class ClientStorage(ZODB.ConflictResolution.ConflictResolvingStorage):
return result[:2] return result[:2]
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
result = self._cache.loadBefore(oid, tid)
if result:
return result
return self._server.load_before(oid, tid) return self._server.load_before(oid, tid)
def prefetch(self, oids, tid):
self._server.prefetch(oids, tid)
def new_oid(self): def new_oid(self):
"""Storage API: return a new object identifier. """Storage API: return a new object identifier.
""" """
......
...@@ -544,6 +544,27 @@ class Client(object): ...@@ -544,6 +544,27 @@ class Client(object):
else: else:
self._when_ready(self.load_before_threadsafe, future, oid, tid) self._when_ready(self.load_before_threadsafe, future, oid, tid)
def _prefetch(self, oid, tid):
@self.protocol.load_before(oid, tid)
def load_before(load_future):
try:
data = load_future.result()
if data:
data, start, end = data
self.cache.store(oid, start, end, data)
except Exception:
logger.exception("prefetch %r %r" % (oid, tid))
def prefetch(self, future, oids, tid):
if self.ready:
for oid in oids:
if self.cache.loadBefore(oid, tid) is None:
self._prefetch(oid, tid)
future.set_result(None)
else:
future.set_exception(ClientDisconnected())
def tpc_finish_threadsafe(self, future, tid, updates, f): def tpc_finish_threadsafe(self, future, tid, updates, f):
if self.ready: if self.ready:
@self.protocol.promise('tpc_finish', tid) @self.protocol.promise('tpc_finish', tid)
...@@ -662,6 +683,9 @@ class ClientRunner(object): ...@@ -662,6 +683,9 @@ class ClientRunner(object):
def async_iter(self, it): def async_iter(self, it):
return self.__call(self.client.call_async_iter_threadsafe, it) return self.__call(self.client.call_async_iter_threadsafe, it)
def prefetch(self, oids, tid):
return self.__call(self.client.prefetch, oids, tid)
def load_before(self, oid, tid): def load_before(self, oid, tid):
return self.__call(self.client.load_before_threadsafe, oid, tid) return self.__call(self.client.load_before_threadsafe, oid, tid)
......
...@@ -513,6 +513,50 @@ ZEOStorage as closed and see if trying to get a lock cleans it up: ...@@ -513,6 +513,50 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
>>> logging.getLogger('ZEO').removeHandler(handler) >>> logging.getLogger('ZEO').removeHandler(handler)
""" """
def test_prefetch(self):
"""The client storage prefetch method pre-fetches from the server
>>> count = 99
>>> import ZEO
>>> addr, stop = ZEO.server()
>>> conn = ZEO.connection(addr)
>>> root = conn.root()
>>> cls = root.__class__
>>> for i in range(count):
... root[i] = cls()
>>> conn.transaction_manager.commit()
>>> oids = [root[i]._p_oid for i in range(count)]
>>> conn.close()
>>> conn = ZEO.connection(addr)
>>> storage = conn.db().storage
>>> len(storage._cache)
1
>>> storage.prefetch(oids, conn._storage._start)
The prefetch returns before the cache is filled:
>>> len(storage._cache) < count
True
But it is filled eventually:
>>> from zope.testing.wait import wait
>>> wait(lambda : len(storage._cache) > count)
>>> loads = storage.server_status()['loads']
Now if we reload the data, it will be satisfied from the cache:
>>> for oid in oids:
... _ = conn._storage.load(oid)
>>> storage.server_status()['loads'] == loads
True
>>> conn.close()
>>> stop()
"""
def test_suite(): def test_suite():
return unittest.TestSuite(( return unittest.TestSuite((
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment