Commit 0bde3265 authored by Jim Fulton's avatar Jim Fulton

Merge pull request #23 from zopefoundation/load-uses-loadBefore

Load uses load before
parents 50692664 14adf5ab
...@@ -4,6 +4,15 @@ Changelog ...@@ -4,6 +4,15 @@ Changelog
4.2.0 (unreleased) 4.2.0 (unreleased)
------------------ ------------------
- Changed loadBefore to operate more like load behaved, especially
with regard to the load lock. This allowes ZEO to work with the
upcoming ZODB 5, which used loadbefore rather than load.
Reimplemented load using loadBefore, thus testing loadBefore
extensively via existing tests.
- Fixed: the ZEO cache loadBefore method failed to utilize current data.
- Drop support for Python 2.6 and 3.2. - Drop support for Python 2.6 and 3.2.
4.2.0b1 (2015-06-05) 4.2.0b1 (2015-06-05)
......
...@@ -53,6 +53,9 @@ from ZODB import utils ...@@ -53,6 +53,9 @@ from ZODB import utils
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# max signed 64-bit value ~ infinity :) Signed cuz LBTree and TimeStamp
m64 = b'\x7f\xff\xff\xff\xff\xff\xff\xff'
try: try:
from ZODB.ConflictResolution import ResolvedSerial from ZODB.ConflictResolution import ResolvedSerial
except ImportError: except ImportError:
...@@ -819,75 +822,36 @@ class ClientStorage(object): ...@@ -819,75 +822,36 @@ class ClientStorage(object):
otherwise a KeyError is raised. otherwise a KeyError is raised.
""" """
self._lock.acquire() # for atomic processing of invalidations result = self.loadBefore(oid, m64)
try: if result is None:
t = self._cache.load(oid) raise POSException.POSKeyError(oid)
if t: return result[:2]
return t
finally: def loadBefore(self, oid, tid):
self._lock.release() """Load the object data written before a transaction id
"""
with self._lock: # for atomic processing of invalidations
result = self._cache.loadBefore(oid, tid)
if result:
return result
if self._server is None: if self._server is None:
raise ClientDisconnected() raise ClientDisconnected()
self._load_lock.acquire() with self._load_lock:
try: with self._lock:
self._lock.acquire()
try:
self._load_oid = oid self._load_oid = oid
self._load_status = 1 self._load_status = 1
finally:
self._lock.release()
data, tid = self._server.loadEx(oid) result = self._server.loadBefore(oid, tid)
self._lock.acquire() # for atomic processing of invalidations with self._lock: # for atomic processing of invalidations
try: if result and self._load_status:
if self._load_status: data, tid, end = result
self._cache.store(oid, tid, None, data) self._cache.store(oid, tid, end, data)
self._load_oid = None self._load_oid = None
finally:
self._lock.release()
finally:
self._load_lock.release()
return data, tid
def loadBefore(self, oid, tid):
self._lock.acquire()
try:
t = self._cache.loadBefore(oid, tid)
if t is not None:
return t
finally:
self._lock.release()
t = self._server.loadBefore(oid, tid)
if t is None:
return None
data, start, end = t
if end is None:
# This method should not be used to get current data. It
# doesn't use the _load_lock, so it is possble to overlap
# this load with an invalidation for the same object.
# If we call again, we're guaranteed to get the
# post-invalidation data. But if the data is still
# current, we'll still get end == None.
# Maybe the best thing to do is to re-run the test with
# the load lock in the case. That's slow performance, but
# I don't think real application code will ever care about
# it.
return data, start, end
self._lock.acquire()
try:
self._cache.store(oid, start, end, data)
finally:
self._lock.release()
return data, start, end return result
def new_oid(self): def new_oid(self):
"""Storage API: return a new object identifier.""" """Storage API: return a new object identifier."""
......
...@@ -494,7 +494,7 @@ class ClientCache(object): ...@@ -494,7 +494,7 @@ class ClientCache(object):
# @defreturn 3-tuple: (string, string, string) # @defreturn 3-tuple: (string, string, string)
@locked @locked
def load(self, oid): def load(self, oid, before_tid=None):
ofs = self.current.get(oid) ofs = self.current.get(oid)
if ofs is None: if ofs is None:
self._trace(0x20, oid) self._trace(0x20, oid)
...@@ -509,6 +509,9 @@ class ClientCache(object): ...@@ -509,6 +509,9 @@ class ClientCache(object):
assert end_tid == z64, (ofs, self.f.tell(), oid, tid, end_tid) assert end_tid == z64, (ofs, self.f.tell(), oid, tid, end_tid)
assert lver == 0, "Versions aren't supported" assert lver == 0, "Versions aren't supported"
if before_tid and tid >= before_tid:
return None
data = read(ldata) data = read(ldata)
assert len(data) == ldata, (ofs, self.f.tell(), oid, len(data), ldata) assert len(data) == ldata, (ofs, self.f.tell(), oid, len(data), ldata)
...@@ -550,13 +553,22 @@ class ClientCache(object): ...@@ -550,13 +553,22 @@ class ClientCache(object):
def loadBefore(self, oid, before_tid): def loadBefore(self, oid, before_tid):
noncurrent_for_oid = self.noncurrent.get(u64(oid)) noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid is None: if noncurrent_for_oid is None:
result = self.load(oid, before_tid)
if result:
return result[0], result[1], None
else:
self._trace(0x24, oid, "", before_tid) self._trace(0x24, oid, "", before_tid)
return None return result
items = noncurrent_for_oid.items(None, u64(before_tid)-1) items = noncurrent_for_oid.items(None, u64(before_tid)-1)
if not items: if not items:
result = self.load(oid, before_tid)
if result:
return result[0], result[1], None
else:
self._trace(0x24, oid, "", before_tid) self._trace(0x24, oid, "", before_tid)
return None return result
tid, ofs = items[-1] tid, ofs = items[-1]
self.f.seek(ofs) self.f.seek(ofs)
...@@ -577,8 +589,12 @@ class ClientCache(object): ...@@ -577,8 +589,12 @@ class ClientCache(object):
assert read(8) == oid, (ofs, self.f.tell(), oid) assert read(8) == oid, (ofs, self.f.tell(), oid)
if end_tid < before_tid: if end_tid < before_tid:
result = self.load(oid, before_tid)
if result:
return result[0], result[1], None
else:
self._trace(0x24, oid, "", before_tid) self._trace(0x24, oid, "", before_tid)
return None return result
self._n_accesses += 1 self._n_accesses += 1
self._trace(0x26, oid, "", saved_tid) self._trace(0x26, oid, "", saved_tid)
......
...@@ -314,7 +314,9 @@ class CacheTests(ZODB.tests.util.TestCase): ...@@ -314,7 +314,9 @@ class CacheTests(ZODB.tests.util.TestCase):
# We use large-2 for the same reason we used small-1 above. # We use large-2 for the same reason we used small-1 above.
expected_len = large-2 expected_len = large-2
self.assertEquals(len(cache), expected_len) self.assertEquals(len(cache), expected_len)
expected_oids = set(list(range(11, 50))+list(range(106, 110))+list(range(200, 305))) expected_oids = set(list(range(11, 50)) +
list(range(106, 110)) +
list(range(200, 305)))
self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()), self.assertEquals(set(u64(oid) for (oid, tid) in cache.contents()),
expected_oids) expected_oids)
...@@ -336,6 +338,21 @@ class CacheTests(ZODB.tests.util.TestCase): ...@@ -336,6 +338,21 @@ class CacheTests(ZODB.tests.util.TestCase):
self.cache.setLastTid(p64(3)) self.cache.setLastTid(p64(3))
self.cache.setLastTid(p64(4)) self.cache.setLastTid(p64(4))
def test_loadBefore_doesnt_miss_current(self):
# Make sure that loadBefore get's current data if there
# isn't non-current data
cache = self.cache
oid = n1
cache.store(oid, n1, None, b'first')
self.assertEqual(cache.loadBefore(oid, n1), None)
self.assertEqual(cache.loadBefore(oid, n2), (b'first', n1, None))
self.cache.invalidate(oid, n2)
cache.store(oid, n2, None, b'second')
self.assertEqual(cache.loadBefore(oid, n1), None)
self.assertEqual(cache.loadBefore(oid, n2), (b'first', n1, n2))
self.assertEqual(cache.loadBefore(oid, n3), (b'second', n2, None))
def kill_does_not_cause_cache_corruption(): def kill_does_not_cause_cache_corruption():
r""" r"""
......
...@@ -242,19 +242,24 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -242,19 +242,24 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# Undone oid info returned by vote. # Undone oid info returned by vote.
# #
# Z3101 -- checkCurrentSerialInTransaction # Z3101 -- checkCurrentSerialInTransaction
#
# Z4 -- checkCurrentSerialInTransaction
# No-longer call load.
# Protocol variables: # Protocol variables:
# Our preferred protocol. # Our preferred protocol.
current_protocol = b"Z3101" current_protocol = b"Z4"
# If we're a client, an exhaustive list of the server protocols we # If we're a client, an exhaustive list of the server protocols we
# can accept. # can accept.
servers_we_can_talk_to = [b"Z308", b"Z309", b"Z310", current_protocol] servers_we_can_talk_to = [b"Z308", b"Z309", b"Z310", b"Z3101",
current_protocol]
# If we're a server, an exhaustive list of the client protocols we # If we're a server, an exhaustive list of the client protocols we
# can accept. # can accept.
clients_we_can_talk_to = [ clients_we_can_talk_to = [
b"Z200", b"Z201", b"Z303", b"Z308", b"Z309", b"Z310", current_protocol] b"Z200", b"Z201", b"Z303", b"Z308", b"Z309", b"Z310", b"Z3101",
current_protocol]
# This is pretty excruciating. Details: # This is pretty excruciating. Details:
# #
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment