Commit 4a6b0283 authored by Julien Muchembled's avatar Julien Muchembled

mvccadapter: check if the last TID changed without invalidation

Since commit b5895a5c ("mvccadapter:
fix race with invalidations when starting a new transaction"),
a ZEO test fails as follows:

    File "src/ZEO/tests/drop_cache_rather_than_verify.txt", line 114, in drop_cache_rather_than_verify.txt
    Failed example:
        conn.root()[1].x
    Expected:
        6
    Got:
        1

Earlier in the test, the ZEO server is restarted and then another
client commits. When disconnected, the first client does not receive
invalidations anymore and the connection gets stuck in the past until
there's a new commit after it reconnected. It was possible to make the
test pass with the following patch:

--- a/src/ZEO/ClientStorage.py
+++ b/src/ZEO/ClientStorage.py
@@ -357,6 +357,7 @@ def notify_connected(self, conn, info):

         # invalidate our db cache
         if self._db is not None:
+            self._db.invalidate(self.lastTransaction(), ())
             self._db.invalidateCache()

         logger.info("%s %s to storage: %s",

Other implementations like NEO are probably affected the same way.

Rather than changing interfaces in a backward-incompatible way,
this commit revert to the original behaviour, and all the changes
that were done in existing tests are reverted.

However, the interfaces are clarified about the fact that storage
implementations must update at a precise moment the value that is
returned by lastTransaction(): just after invalidate() or
tpc_finish callback.
parent 5ce50c38
...@@ -113,15 +113,15 @@ def pack_with_repeated_blob_records(): ...@@ -113,15 +113,15 @@ def pack_with_repeated_blob_records():
fixed by the time you read this, but there might still be fixed by the time you read this, but there might still be
transactions in the wild that have duplicate records. transactions in the wild that have duplicate records.
>>> db = ZODB.DB(ZODB.FileStorage.FileStorage('t', blob_dir='bobs')) >>> fs = ZODB.FileStorage.FileStorage('t', blob_dir='bobs')
>>> db = ZODB.DB(fs)
>>> conn = db.open() >>> conn = db.open()
>>> conn.root()[1] = ZODB.blob.Blob() >>> conn.root()[1] = ZODB.blob.Blob()
>>> transaction.commit() >>> transaction.commit()
>>> tm = transaction.TransactionManager() >>> tm = transaction.TransactionManager()
>>> oid = conn.root()[1]._p_oid >>> oid = conn.root()[1]._p_oid
>>> fs = db._mvcc_storage.new_instance() >>> from ZODB.utils import load_current
>>> _ = fs.poll_invalidations() >>> blob_record, oldserial = load_current(fs, oid)
>>> blob_record, oldserial = fs.load(oid)
Now, create a transaction with multiple saves: Now, create a transaction with multiple saves:
......
...@@ -685,6 +685,14 @@ class IStorage(Interface): ...@@ -685,6 +685,14 @@ class IStorage(Interface):
def lastTransaction(): def lastTransaction():
"""Return the id of the last committed transaction. """Return the id of the last committed transaction.
For proper MVCC operation, the return value is the id of the last
transaction for which invalidation notifications are completed.
In particular for client-server implementations, lastTransaction
should return a cached value (rather than querying the server).
A preliminary call to sync() can be done to get the actual last
TID at the wanted time.
If no transactions have been committed, return a string of 8 If no transactions have been committed, return a string of 8
null (0) characters. null (0) characters.
""" """
......
...@@ -49,7 +49,6 @@ class MVCCAdapter(Base): ...@@ -49,7 +49,6 @@ class MVCCAdapter(Base):
instance = MVCCAdapterInstance(self) instance = MVCCAdapterInstance(self)
with self._lock: with self._lock:
self._instances.add(instance) self._instances.add(instance)
instance._lastTransaction()
return instance return instance
def before_instance(self, before=None): def before_instance(self, before=None):
...@@ -100,7 +99,7 @@ class MVCCAdapterInstance(Base): ...@@ -100,7 +99,7 @@ class MVCCAdapterInstance(Base):
) )
_start = None # Transaction start time _start = None # Transaction start time
_ltid = None # Last storage transaction id _ltid = b'' # Last storage transaction id
def __init__(self, base): def __init__(self, base):
self._base = base self._base = base
...@@ -109,16 +108,6 @@ class MVCCAdapterInstance(Base): ...@@ -109,16 +108,6 @@ class MVCCAdapterInstance(Base):
self._invalidations = set() self._invalidations = set()
self._sync = getattr(self._storage, 'sync', lambda : None) self._sync = getattr(self._storage, 'sync', lambda : None)
def _lastTransaction(self):
ltid = self._storage.lastTransaction()
# At this precise moment, a transaction may be
# committed and we have already received the new tid.
with self._lock:
# So make sure we won't override with a smaller value.
if self._ltid is None:
# Calling lastTransaction() here could result in a deadlock.
self._ltid = ltid
def release(self): def release(self):
self._base._release(self) self._base._release(self)
...@@ -142,8 +131,15 @@ class MVCCAdapterInstance(Base): ...@@ -142,8 +131,15 @@ class MVCCAdapterInstance(Base):
self._sync() self._sync()
def poll_invalidations(self): def poll_invalidations(self):
# Storage implementations don't always call invalidate() when
# the last TID changes, e.g. after network reconnection,
# so we still have to poll.
ltid = self._storage.lastTransaction()
# But at this precise moment, a transaction may be committed and
# we have already received the new tid, along with invalidations.
with self._lock: with self._lock:
self._start = p64(u64(self._ltid) + 1) # So we must pick the greatest value.
self._start = p64(u64(max(ltid, self._ltid)) + 1)
if self._invalidations is None: if self._invalidations is None:
self._invalidations = set() self._invalidations = set()
return None return None
......
...@@ -536,13 +536,13 @@ class InvalidationTests(unittest.TestCase): ...@@ -536,13 +536,13 @@ class InvalidationTests(unittest.TestCase):
>>> mvcc_storage.invalidate(p64(1), {p1._p_oid: 1}) >>> mvcc_storage.invalidate(p64(1), {p1._p_oid: 1})
Transaction start times are based on storage's last transaction, Transaction start times are based on storage's last
which is known from invalidations. (Previousely, they were transaction. (Previousely, they were based on the first
based on the first invalidation seen in a transaction.) invalidation seen in a transaction.)
>>> mvcc_instance.poll_invalidations() == [p1._p_oid] >>> mvcc_instance.poll_invalidations() == [p1._p_oid]
True True
>>> mvcc_instance._start == p64(2) >>> mvcc_instance._start == p64(u64(db.storage.lastTransaction()) + 1)
True True
>>> mvcc_storage.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1}) >>> mvcc_storage.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1})
......
...@@ -85,14 +85,13 @@ storage has seen. ...@@ -85,14 +85,13 @@ storage has seen.
>>> cn = db.open() >>> cn = db.open()
>>> ltid = u64(st.lastTransaction()) >>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
>>> cn._storage._start == p64(ltid + 1)
True True
>>> cn.db()._mvcc_storage.invalidate(p64(ltid+100), dict.fromkeys([1, 2])) >>> cn.db()._mvcc_storage.invalidate(p64(100), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(ltid + 1) >>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True True
>>> cn.db()._mvcc_storage.invalidate(p64(ltid+200), dict.fromkeys([1, 2])) >>> cn.db()._mvcc_storage.invalidate(p64(200), dict.fromkeys([1, 2]))
>>> cn._storage._start == p64(ltid + 1) >>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True True
A connection's high-water mark is set to the transaction id taken from A connection's high-water mark is set to the transaction id taken from
...@@ -106,7 +105,7 @@ but that doesn't work unless an object is modified. sync() will abort ...@@ -106,7 +105,7 @@ but that doesn't work unless an object is modified. sync() will abort
a transaction and process invalidations. a transaction and process invalidations.
>>> cn.sync() >>> cn.sync()
>>> cn._storage._start == p64(ltid + 201) >>> cn._storage._start == p64(u64(st.lastTransaction()) + 1)
True True
Basic functionality Basic functionality
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment