Commit 8e7eab33 authored by Kirill Smelkov's avatar Kirill Smelkov

[ZODB4] Backport the way MVCC is handled from ZODB5

This backports to ZODB4 Connection ZODB5's approach to handle MVCC via
always calling storage.loadBefore() instead of "load for latest version
+ loadBefore if we were notified of database changes" approach.

Why?
----

Short answer: because Wendelin.core 2 needs to know at which particular
database state application-level ZODB connection is viewing the
database, and it is hard to implement such functionality correctly
without this backport. Please see appendix for the explanation.

What
----

This backports to ZODB4 the minimum necessary part of upstream commit 227953b9
(Simplify MVCC by determining transaction start time using lastTransaction) +
follow-up correctness fixes:

https://github.com/zopefoundation/ZODB/issues/50
https://github.com/zopefoundation/ZODB/pull/56
https://github.com/zopefoundation/ZODB/pull/291
https://github.com/zopefoundation/ZODB/pull/307

In short:

- a Connection is always opened with explicitly corresponding to a particular database revision
- Connection uses only loadBefore with that revision to load objects
- every time a Connection is (re)opened, the result of queued invalidations and
  explicit query to storage.lastTransaction is carefully merged to refresh
  Connection's idea about which database state it corresponds to.

The "careful" in last point is important. Historically ZODB5 was first reworked
in commit 227953b9 (https://github.com/zopefoundation/ZODB/pull/56) to always
call lastTransaction to refresh state of Connection view. Since there
was no proper synchronisation with respect to process of handling
invalidations, that lead to data corruption issue due to race in
Connection.open() vs invalidations:

https://github.com/zopefoundation/ZODB/issues/290

That race and data corruption was fixed in commit b5895a5c
(https://github.com/zopefoundation/ZODB/pull/291) by way of avoiding
lastTransaction call and relying only on invalidations channel when
refreshing Connection view.

This fix in turn led to another data corruption issue because in
presence of client-server reconnections, ZODB storage drivers can partly
skip notifying client with detailed invalidation messages:

https://github.com/zopefoundation/ZODB/pull/291#issuecomment-581047061

A fix to that issue (https://github.com/zopefoundation/ZODB/pull/307)
proposed to change back to query storage for lastTransaction on every
Connection refresh, but to implement careful merging of lastTransaction
result and data from invalidation channel. However it was found that the
"careful merging" can work correctly only if we require from storage
drivers a particular ordering of invalidation events wrt lastTransaction
return and result:

https://github.com/zopefoundation/ZODB/pull/307#discussion_r434145034

While ZEO was already complying with that requirements, NEO had to be
fixed to support that:

https://github.com/zopefoundation/ZODB/pull/307#discussion_r434166238
neoppod@a7d101ec
neoppod@96a5c01f

Patch details
-------------

We change Connection._txn_time to be a "before" for the database state
to which Connection view corresponds. This state is hooked to be
initialized and updated in Connection._flush_invalidations - the
function that is called from both explicit Connection (re)open and at
transaction boundaries via Connection.afterCompletion hook.

Objects loading is factored into Connection._load which replaces old
"load + check invalidated + fallback to loadBefore" game in
Connection._setstate.

Connection.open now calls Connection._flush_invalidations
unconditionally - even if it was global cache reset event - because
besides invalidation flushes the latter is now responsible for querying
storage lastTransaction.

TmpStore - a "storage" that provides runtime support for savepoints - is
refactored correspondingly to delegate loading of original objects back
to underlying Connection.

DB.close is modified - similarly to ZODB5 - to release DB's Connections
carefully with preventing connections from DB poll from implicitly
starting new transactions via afterCompletion hook.

ZODB.nxd_patches is introduced to indicate to client software that this
particular patch is present and can be relied upon.

Tests are updated correspondingly. In 227953b9 Jim talks about
converting many tests - because

	"Lots of tests didn't clean up databases and connections properly"

and because new MVCC approach

	"makes database and connection hygiene a bit more important,
	especially for tests, because a connection will continue to interact
	with storages if it isn't properly closed, which can lead to errors if
	the storage is closed."

but finally implementing automatic cleanup at transaction boundaries
because there are too many tests to fix. We backport only automatic
cleanup + necessary explicit test fixes to keep the diff minimal.

All tests pass. This includes tests for ZODB itself, ZEO and NEO test
over hereby modified ZODB(*), my test programs from

https://github.com/zopefoundation/ZODB/issues/290	and
https://github.com/zopefoundation/ZEO/issues/155

and ERP5 tests. Upcoming wendelin.core 2 also work with this change.

(*) ZEO, NEO and ERP5 tests fail sometimes, but there is no regression
here because ZEO, NEO and ERP5 tests are failing regularly, and in the
same way, even with unmodified ZODB.

Appendix. zconn_at
------------------

This appendix provides motivation for the backport:

For wendelin.core v2 we need a way to know at which particular database
state application-level ZODB connection is viewing the database. Knowing
that state, WCFS client library interacts with WCFS filesystem server
and, in simple terms, requests the server to provide data as of that
particular database state. Let us call the function that for a client
ZODB connection returns database state corresponding to its database
view zconn_at.

Now here is the problem: zconn_at is relatively easy to implement for
ZODB5 - see e.g. here:

https://lab.nexedi.com/nexedi/wendelin.core/blob/v0.13-54-ga6a8f5b/lib/zodb.py#L142-181
wendelin.core@3bd82127

however, for ZODB4, since its operational models is not
directly MVCC, it is not that straightforward. Still, even for older
ZODB4, for every client connection, there _is_ such at that corresponds
to that connection view of the database.

We need ZODB4 support, because ZODB4 is currently the version that
Nexedi uses, and my understanding is that it will stay like this for not
a small time. I have the feeling that ZODB5 was reworked in better
direction, but without caring enough about quality which resulted in
concurrency bugs with data corruption effects like

https://github.com/zopefoundation/ZODB/issues/290
https://github.com/zopefoundation/ZEO/issues/155
etc.

Even though the first one is now fixed (but it broke other parts and so
both ZODB had to be fixed again _and_ NEO had to be fixed for that ZODB
fix to work currently), I feel that upgrading into ZODB5 for Nexedi will
require some non-negligible amount of QA work, and thus it is better if
we move step-by-step - even if we eventually upgrade to ZODB5 - it is
better we first migrate wendelin.core 1 -> wendelin.core 2 with keeping
current version of ZODB.

Now please note what would happen if zconn_at gives, even a bit, wrong
answer: wcfs client will ask wcfs server to provide array data as of
different database state compared to current on-client ZODB connection.
This will result in that data accessed via ZBigArray will _not_
correspond to all other data accessed via regular ZODB mechanism.
It is, in other words, a data corruptions.
In some scenarios it can be harmless, but generally it is the worst
that can happen to a database.

It is good to keep in mind ZODB issue290 when imagining corner cases
that zconn_at has to deal with. Even though that issue is ZODB5 only, it
shows what kind of bugs it can be in zconn_at implementation for ZODB4.

Just for the reference: in Wendelin-based systems there is usually constant
stream of database ingestions coming from many places simultaneously. Plus many
activities crunching on the DB at the same time as well. And the more clients a
system handles, the more there will be level-of-concurrency increase. This
means that the problem of correctly handling concurrency issues in zconn_at is
not purely theoretical, but has direct relation to our systems.

--------

With this backport, zconn_at for ZODB4 becomes trivial and robust to implement:

https://lab.nexedi.com/kirr/wendelin.core/blob/484071b3/lib/zodb.py#L183-195

I would like to thank Joshua Wölfel whose internship helped this topic
to shape up:

https://www.erp5.com/project_section/wendelin-ia-project/forum/Joshua-internship-D8b7NNhWfz

/cc @nexedi, @jwolf083Signed-off-by: Kirill Smelkov's avatarKirill Smelkov <kirr@nexedi.com>
parent 40116375
...@@ -19,6 +19,7 @@ import logging ...@@ -19,6 +19,7 @@ import logging
import sys import sys
import tempfile import tempfile
import threading import threading
import traceback
import warnings import warnings
import os import os
import time import time
...@@ -199,10 +200,12 @@ class Connection(ExportImport, object): ...@@ -199,10 +200,12 @@ class Connection(ExportImport, object):
# _conflicts). # _conflicts).
self._conflicts = {} self._conflicts = {}
# _ltid stores last transaction received via invalidate from storage.
self._ltid = None
# _txn_time stores the upper bound on transactions visible to # _txn_time stores the upper bound on transactions visible to
# this connection. That is, all object revisions must be # this connection. That is, all object revisions must be
# written before _txn_time. If it is None, then the current # written before _txn_time.
# revisions are acceptable.
self._txn_time = None self._txn_time = None
# To support importFile(), implemented in the ExportImport base # To support importFile(), implemented in the ExportImport base
...@@ -238,6 +241,23 @@ class Connection(ExportImport, object): ...@@ -238,6 +241,23 @@ class Connection(ExportImport, object):
elif obj._p_jar is not self: elif obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar) raise InvalidObjectReference(obj, obj._p_jar)
# _load loads object data according to connection view of the database.
def _load(self, oid):
return self._loadFrom(oid, self._storage)
def _loadFrom(self, oid, storage): # -> (data, serial) | POSKeyError | ReadConflicError
if self._mvcc_storage:
data, serial = storage.load(oid)
else:
# NOTE: loadBefore raises POSKeyError if oid is recorded as deleted as of <._txn_time
r = storage.loadBefore(oid, self._txn_time)
if r is None:
# oid is not present at all os of <._txn_time
raise ReadConflictError(oid)
data, serial, _ = r
self._load_count += 1
return data, serial
def get(self, oid): def get(self, oid):
"""Return the persistent object with oid 'oid'.""" """Return the persistent object with oid 'oid'."""
if self.opened is None: if self.opened is None:
...@@ -253,7 +273,7 @@ class Connection(ExportImport, object): ...@@ -253,7 +273,7 @@ class Connection(ExportImport, object):
if obj is not None: if obj is not None:
return obj return obj
p, serial = self._storage.load(oid, '') p, serial = self._load(oid)
obj = self._reader.getGhost(p) obj = self._reader.getGhost(p)
# Avoid infiniate loop if obj tries to load its state before # Avoid infiniate loop if obj tries to load its state before
...@@ -358,14 +378,16 @@ class Connection(ExportImport, object): ...@@ -358,14 +378,16 @@ class Connection(ExportImport, object):
if self.before is not None: if self.before is not None:
# This is a historical connection. Invalidations are irrelevant. # This is a historical connection. Invalidations are irrelevant.
return return
if tid is None: # ZEO can still call invalidate(tid=None) from ClientStorage.finish_verification
self._log.warning("invalidate(tid=None) called:\n%s" % ''.join(traceback.format_stack()))
self.invalidateCache()
return
self._inv_lock.acquire() self._inv_lock.acquire()
try: try:
if self._txn_time is None: if tid < self._ltid:
self._txn_time = tid
elif (tid is not None) and (tid < self._txn_time):
raise AssertionError("invalidations out of order, %r < %r" raise AssertionError("invalidations out of order, %r < %r"
% (tid, self._txn_time)) % (tid, self._ltid))
self._ltid = tid
self._invalidated.update(oids) self._invalidated.update(oids)
finally: finally:
self._inv_lock.release() self._inv_lock.release()
...@@ -500,7 +522,7 @@ class Connection(ExportImport, object): ...@@ -500,7 +522,7 @@ class Connection(ExportImport, object):
self._registered_objects = [] self._registered_objects = []
self._creating.clear() self._creating.clear()
# Process pending invalidations. # Process pending invalidations and query storage for lastTransaction.
def _flush_invalidations(self): def _flush_invalidations(self):
if self._mvcc_storage: if self._mvcc_storage:
# Poll the storage for invalidations. # Poll the storage for invalidations.
...@@ -513,6 +535,18 @@ class Connection(ExportImport, object): ...@@ -513,6 +535,18 @@ class Connection(ExportImport, object):
self._cache.invalidate(invalidated) self._cache.invalidate(invalidated)
self._inv_lock.acquire() self._inv_lock.acquire()
else:
# Storage implementations don't always call invalidate() when
# the last TID changes, e.g. after network reconnection,
# so we still have to poll.
ltid = self._storage.lastTransaction()
# But at this precise moment, a transaction may be committed and
# we have already received the new tid, along with invalidations.
self._inv_lock.acquire()
# So we must pick the greatest value.
self._txn_time = p64(u64(max(ltid, self._ltid)) + 1)
try: try:
# Non-ghostifiable objects may need to read when they are # Non-ghostifiable objects may need to read when they are
# invalidated, so we'll quickly just replace the # invalidated, so we'll quickly just replace the
...@@ -544,7 +578,6 @@ class Connection(ExportImport, object): ...@@ -544,7 +578,6 @@ class Connection(ExportImport, object):
invalidated = dict.fromkeys(self._invalidated) invalidated = dict.fromkeys(self._invalidated)
self._invalidated = set() self._invalidated = set()
self._txn_time = None
if self._invalidatedCache: if self._invalidatedCache:
self._invalidatedCache = False self._invalidatedCache = False
invalidated = self._cache.cache_data.copy() invalidated = self._cache.cache_data.copy()
...@@ -920,16 +953,6 @@ class Connection(ExportImport, object): ...@@ -920,16 +953,6 @@ class Connection(ExportImport, object):
# as a performance optimization for the pure-Python persistent implementation # as a performance optimization for the pure-Python persistent implementation
# where accessing an attribute involves __getattribute__ calls # where accessing an attribute involves __getattribute__ calls
# The control flow is complicated here to avoid loading an
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.
# If an object has been invalidated, among the cases to consider:
# - Try MVCC
# - Raise ConflictError.
if self.before is not None: if self.before is not None:
# Load data that was current before the time we have. # Load data that was current before the time we have.
before = self.before before = self.before
...@@ -939,29 +962,7 @@ class Connection(ExportImport, object): ...@@ -939,29 +962,7 @@ class Connection(ExportImport, object):
p, serial, end = t p, serial, end = t
else: else:
# There is a harmless data race with self._invalidated. A p, serial = self._load(oid)
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if self._invalidatedCache:
raise ReadConflictError()
if (oid in self._invalidated):
self._load_before_or_conflict(obj)
return
p, serial = self._storage.load(oid, '')
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
self._load_before_or_conflict(obj)
return
self._reader.setGhostState(obj, p) self._reader.setGhostState(obj, p)
obj._p_serial = serial obj._p_serial = serial
...@@ -973,43 +974,6 @@ class Connection(ExportImport, object): ...@@ -973,43 +974,6 @@ class Connection(ExportImport, object):
obj._p_blob_uncommitted = None obj._p_blob_uncommitted = None
obj._p_blob_committed = self._storage.loadBlob(oid, serial) obj._p_blob_committed = self._storage.loadBlob(oid, serial)
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not self._setstate_noncurrent(obj):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start
# MVCC Blob support
if isinstance(obj, Blob):
obj._p_blob_uncommitted = None
obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, start)
return True
def register(self, obj): def register(self, obj):
"""Register obj with the current transaction manager. """Register obj with the current transaction manager.
...@@ -1094,7 +1058,6 @@ class Connection(ExportImport, object): ...@@ -1094,7 +1058,6 @@ class Connection(ExportImport, object):
if self._reset_counter != global_reset_counter: if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache. # New code is in place. Start a new cache.
self._resetCache() self._resetCache()
else:
self._flush_invalidations() self._flush_invalidations()
transaction_manager.registerSynch(self) transaction_manager.registerSynch(self)
...@@ -1170,7 +1133,7 @@ class Connection(ExportImport, object): ...@@ -1170,7 +1133,7 @@ class Connection(ExportImport, object):
def savepoint(self): def savepoint(self):
if self._savepoint_storage is None: if self._savepoint_storage is None:
tmpstore = TmpStore(self._normal_storage) tmpstore = TmpStore(self)
self._savepoint_storage = tmpstore self._savepoint_storage = tmpstore
self._storage = self._savepoint_storage self._storage = self._savepoint_storage
...@@ -1234,7 +1197,7 @@ class Connection(ExportImport, object): ...@@ -1234,7 +1197,7 @@ class Connection(ExportImport, object):
# that that the next attribute access of its name # that that the next attribute access of its name
# unghostify it, which will cause its blob data # unghostify it, which will cause its blob data
# to be reattached "cleanly" # to be reattached "cleanly"
self.invalidate(None, (oid, )) self._cache.invalidate(oid)
else: else:
s = self._storage.store(oid, serial, data, s = self._storage.store(oid, serial, data,
'', transaction) '', transaction)
...@@ -1292,13 +1255,14 @@ class TmpStore: ...@@ -1292,13 +1255,14 @@ class TmpStore:
"""A storage-like thing to support savepoints.""" """A storage-like thing to support savepoints."""
def __init__(self, storage): def __init__(self, conn):
self._storage = storage self._conn = conn
self._storage = conn._normal_storage
for method in ( for method in (
'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore', 'getName', 'new_oid', 'getSize', 'sortKey',
'isReadOnly' 'isReadOnly'
): ):
setattr(self, method, getattr(storage, method)) setattr(self, method, getattr(self._storage, method))
self._file = tempfile.TemporaryFile(prefix='TmpStore') self._file = tempfile.TemporaryFile(prefix='TmpStore')
# position: current file position # position: current file position
...@@ -1319,10 +1283,13 @@ class TmpStore: ...@@ -1319,10 +1283,13 @@ class TmpStore:
remove_committed_dir(self._blob_dir) remove_committed_dir(self._blob_dir)
self._blob_dir = None self._blob_dir = None
def load(self, oid, version): def _load(self, oid):
return self._conn._loadFrom(oid, self._storage)
def load(self, oid, version=''):
pos = self.index.get(oid) pos = self.index.get(oid)
if pos is None: if pos is None:
return self._storage.load(oid, '') return self._load(oid)
self._file.seek(pos) self._file.seek(pos)
h = self._file.read(8) h = self._file.read(8)
oidlen = u64(h) oidlen = u64(h)
...@@ -1334,6 +1301,13 @@ class TmpStore: ...@@ -1334,6 +1301,13 @@ class TmpStore:
serial = h[:8] serial = h[:8]
return self._file.read(size), serial return self._file.read(size), serial
def loadBefore(self, oid, before):
if before != self._conn._txn_time:
raise ValueError('TmpStore.loadBefore called with before != conn._txn_time')
p, serial = self.load(oid)
return p, serial, None # NOTE next_serial is ignored by caller
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
# we have this funny signature so we can reuse the normal non-commit # we have this funny signature so we can reuse the normal non-commit
# commit logic # commit logic
......
...@@ -626,12 +626,25 @@ class DB(object): ...@@ -626,12 +626,25 @@ class DB(object):
noop = lambda *a: None noop = lambda *a: None
self.close = noop self.close = noop
# go over all connections and prepare them to handle last txn.abort()
txn_managers = set() # of conn.transaction_manager
@self._connectionMap @self._connectionMap
def _(c): def _(conn):
if c.transaction_manager is not None: if conn.transaction_manager is not None:
c.transaction_manager.abort() for c in six.itervalues(conn.connections):
c.afterCompletion = c.newTransaction = c.close = noop # Prevent connections from implicitly starting new
c._release_resources() # transactions.
c.afterCompletion = c.newTransaction = noop
txn_managers.add(conn.transaction_manager)
conn.close = noop
conn._release_resources()
# abort transaction managers for all above connections
# call txn.abort only after all connections are prepared, as else - if
# we call txn.abort() above - some connections could be not yet
# prepared and with still active afterCompletion callback.
for transaction_manager in txn_managers:
transaction_manager.abort()
self.storage.close() self.storage.close()
del self.storage del self.storage
......
...@@ -26,3 +26,15 @@ sys.modules['ZODB.PersistentList'] = sys.modules['persistent.list'] ...@@ -26,3 +26,15 @@ sys.modules['ZODB.PersistentList'] = sys.modules['persistent.list']
del mapping, list, sys del mapping, list, sys
from ZODB.DB import DB, connection from ZODB.DB import DB, connection
# set of changes backported by Nexedi.
nxd_patches = {
# Rework Connection MVCC implementation to always call
# storage.loadBefore(zconn._txn_time) to load objects.
# storage.load() is no longer called at all.
# https://github.com/zopefoundation/ZODB/issues/50
# https://github.com/zopefoundation/ZODB/pull/56
# https://github.com/zopefoundation/ZODB/pull/307
# ...
'conn:MVCC-via-loadBefore-only',
}
...@@ -776,5 +776,5 @@ def IExternalGC_suite(factory): ...@@ -776,5 +776,5 @@ def IExternalGC_suite(factory):
return doctest.DocFileSuite( return doctest.DocFileSuite(
'IExternalGC.test', 'IExternalGC.test',
setUp=setup, tearDown=zope.testing.setupstack.tearDown, setUp=setup, tearDown=ZODB.tests.util.tearDown,
checker=ZODB.tests.util.checker) checker=ZODB.tests.util.checker)
...@@ -61,7 +61,7 @@ While it's boring, it's important to verify that the same relationships ...@@ -61,7 +61,7 @@ While it's boring, it's important to verify that the same relationships
hold if the default pool size is overridden. hold if the default pool size is overridden.
>>> handler.clear() >>> handler.clear()
>>> st.close() >>> db.close()
>>> st = Storage() >>> st = Storage()
>>> PS = 2 # smaller pool size >>> PS = 2 # smaller pool size
>>> db = DB(st, pool_size=PS) >>> db = DB(st, pool_size=PS)
...@@ -117,7 +117,7 @@ We can change the pool size on the fly: ...@@ -117,7 +117,7 @@ We can change the pool size on the fly:
Enough of that. Enough of that.
>>> handler.clear() >>> handler.clear()
>>> st.close() >>> db.close()
More interesting is the stack-like nature of connection reuse. So long as More interesting is the stack-like nature of connection reuse. So long as
we keep opening new connections, and keep them alive, all connections we keep opening new connections, and keep them alive, all connections
...@@ -256,7 +256,7 @@ Nothing in that last block should have logged any msgs: ...@@ -256,7 +256,7 @@ Nothing in that last block should have logged any msgs:
If "too many" connections are open, then closing one may kick an older If "too many" connections are open, then closing one may kick an older
closed one out of the available connection stack. closed one out of the available connection stack.
>>> st.close() >>> db.close()
>>> st = Storage() >>> st = Storage()
>>> db = DB(st, pool_size=3) >>> db = DB(st, pool_size=3)
>>> conns = [db.open() for dummy in range(6)] >>> conns = [db.open() for dummy in range(6)]
...@@ -324,7 +324,7 @@ gc to reclaim the Connection and its cache eventually works, but that can ...@@ -324,7 +324,7 @@ gc to reclaim the Connection and its cache eventually works, but that can
take "a long time" and caches can hold on to many objects, and limited take "a long time" and caches can hold on to many objects, and limited
resources (like RDB connections), for the duration. resources (like RDB connections), for the duration.
>>> st.close() >>> db.close()
>>> st = Storage() >>> st = Storage()
>>> db = DB(st, pool_size=2) >>> db = DB(st, pool_size=2)
>>> conn0 = db.open() >>> conn0 = db.open()
......
...@@ -22,7 +22,8 @@ import unittest ...@@ -22,7 +22,8 @@ import unittest
import transaction import transaction
import ZODB.tests.util import ZODB.tests.util
from ZODB.config import databaseFromString from ZODB.config import databaseFromString
from ZODB.utils import p64 from ZODB.utils import p64, u64, z64, newTid
from ZODB.POSException import ConflictError
from persistent import Persistent from persistent import Persistent
from zope.interface.verify import verifyObject from zope.interface.verify import verifyObject
from zope.testing import loggingsupport, renormalizing from zope.testing import loggingsupport, renormalizing
...@@ -521,17 +522,18 @@ class InvalidationTests(unittest.TestCase): ...@@ -521,17 +522,18 @@ class InvalidationTests(unittest.TestCase):
Transaction ids are 8-byte strings, just like oids; p64() will Transaction ids are 8-byte strings, just like oids; p64() will
create one from an int. create one from an int.
>>> cn.invalidate(p64(1), {p1._p_oid: 1}) >>> t = u64(cn._txn_time)
>>> cn._txn_time >>> cn.invalidate(p64(t+1), {p1._p_oid: 1})
'\x00\x00\x00\x00\x00\x00\x00\x01' >>> cn._txn_time == p64(t)
True
>>> p1._p_oid in cn._invalidated >>> p1._p_oid in cn._invalidated
True True
>>> p2._p_oid in cn._invalidated >>> p2._p_oid in cn._invalidated
False False
>>> cn.invalidate(p64(10), {p2._p_oid: 1, p64(76): 1}) >>> cn.invalidate(p64(t+10), {p2._p_oid: 1, p64(76): 1})
>>> cn._txn_time >>> cn._txn_time == p64(t)
'\x00\x00\x00\x00\x00\x00\x00\x01' True
>>> p1._p_oid in cn._invalidated >>> p1._p_oid in cn._invalidated
True True
>>> p2._p_oid in cn._invalidated >>> p2._p_oid in cn._invalidated
...@@ -586,51 +588,29 @@ def doctest_invalidateCache(): ...@@ -586,51 +588,29 @@ def doctest_invalidateCache():
>>> connection.invalidateCache() >>> connection.invalidateCache()
Now, if we try to load an object, we'll get a read conflict: This won't have any effect until the next transaction:
>>> connection.root()['b'].x >>> connection.root()['a']._p_changed
Traceback (most recent call last): 0
... >>> connection.root()['b']._p_changed
ReadConflictError: database read conflict error >>> connection.root()['c']._p_changed
1
If we try to commit the transaction, we'll get a conflict error: But if we sync():
>>> tm.commit() >>> connection.sync()
Traceback (most recent call last):
...
ConflictError: database conflict error
and the cache will have been cleared: All of our data was invalidated:
>>> print(connection.root()['a']._p_changed) >>> connection.root()['a']._p_changed
None >>> connection.root()['b']._p_changed
>>> print(connection.root()['b']._p_changed) >>> connection.root()['c']._p_changed
None
>>> print(connection.root()['c']._p_changed)
None
But we'll be able to access data again: But we can load data as usual:
>>> connection.root()['b'].x >>> connection.root()['b'].x
1 1
Aborting a transaction after a read conflict also lets us read data and go
on about our business:
>>> connection.invalidateCache()
>>> connection.root()['c'].x
Traceback (most recent call last):
...
ReadConflictError: database read conflict error
>>> tm.abort()
>>> connection.root()['c'].x
1
>>> connection.root()['c'].x = 2
>>> tm.commit()
>>> db.close() >>> db.close()
""" """
...@@ -1244,12 +1224,16 @@ class StubStorage: ...@@ -1244,12 +1224,16 @@ class StubStorage:
def __init__(self): def __init__(self):
# internal # internal
self._head = z64
self._stored = [] self._stored = []
self._finished = [] self._finished = []
self._data = {} self._data = {}
self._transdata = {} self._transdata = {}
self._transstored = [] self._transstored = []
def lastTransaction(self):
return self._head
def new_oid(self): def new_oid(self):
oid = str(self._oid) oid = str(self._oid)
self._oid += 1 self._oid += 1
...@@ -1283,7 +1267,11 @@ class StubStorage: ...@@ -1283,7 +1267,11 @@ class StubStorage:
raise RuntimeError( raise RuntimeError(
'StubStorage uses only one transaction at a time') 'StubStorage uses only one transaction at a time')
self._finished.extend(self._transstored) self._finished.extend(self._transstored)
tid = newTid(None)
self._data.update(self._transdata) self._data.update(self._transdata)
for oid,p in self._transdata.items():
self._data[oid] = (p, tid)
self._head = tid
callback(transaction) callback(transaction)
del self._transaction del self._transaction
self._transdata.clear() self._transdata.clear()
...@@ -1294,6 +1282,12 @@ class StubStorage: ...@@ -1294,6 +1282,12 @@ class StubStorage:
raise TypeError('StubStorage does not support versions.') raise TypeError('StubStorage does not support versions.')
return self._data[oid] return self._data[oid]
def loadBefore(self, oid, before):
if u64(before) != u64(self._head) + 1:
raise ValueError('noncurrent loadBefore not supported')
p, serial = self.load(oid)
return p, serial, None
def store(self, oid, serial, p, version, transaction): def store(self, oid, serial, p, version, transaction):
if version != '': if version != '':
raise TypeError('StubStorage does not support versions.') raise TypeError('StubStorage does not support versions.')
...@@ -1302,9 +1296,12 @@ class StubStorage: ...@@ -1302,9 +1296,12 @@ class StubStorage:
elif self._transaction != transaction: elif self._transaction != transaction:
raise RuntimeError( raise RuntimeError(
'StubStorage uses only one transaction at a time') 'StubStorage uses only one transaction at a time')
serialOK = self._data.get(oid, z64)
if serial != serialOK:
raise ConflictError(oid=oid)
self._stored.append(oid) self._stored.append(oid)
self._transstored.append(oid) self._transstored.append(oid)
self._transdata[oid] = (p, serial) self._transdata[oid] = p
# Explicitly returning None, as we're not pretending to be a ZEO # Explicitly returning None, as we're not pretending to be a ZEO
# storage # storage
return None return None
......
...@@ -89,28 +89,33 @@ def test_invalidateCache(): ...@@ -89,28 +89,33 @@ def test_invalidateCache():
>>> tm1.commit() >>> tm1.commit()
>>> tm2 = transaction.TransactionManager() >>> tm2 = transaction.TransactionManager()
>>> c2 = db.open(transaction_manager=tm2) >>> c2 = db.open(transaction_manager=tm2)
>>> c1.root()['a']._p_deactivate() >>> c2.root()['a'].value
1
>>> tm3 = transaction.TransactionManager() >>> tm3 = transaction.TransactionManager()
>>> c3 = db.open(transaction_manager=tm3) >>> c3 = db.open(transaction_manager=tm3)
>>> c3.root()['a'].value >>> c3.root()['a'].value
1 1
>>> c3.close() >>> c3.close()
>>> db.invalidateCache() >>> db.invalidateCache()
>>> c1.root.a._p_changed
0
>>> c1.sync()
>>> c1.root.a._p_changed
>>> c2.root.a._p_changed
0
>>> c2.sync()
>>> c2.root.a._p_changed
>>> c3 is db.open(transaction_manager=tm3)
True
>>> c3.root.a._p_changed
>>> c1.root()['a'].value >>> c1.root()['a'].value
Traceback (most recent call last): 1
...
ReadConflictError: database read conflict error
>>> c2.root()['a'].value >>> c2.root()['a'].value
Traceback (most recent call last): 1
... >>> c3.root()['a'].value
ReadConflictError: database read conflict error 1
>>> c3 is db.open(transaction_manager=tm3)
True
>>> print(c3.root()['a']._p_changed)
None
>>> db.close() >>> db.close()
""" """
......
...@@ -121,6 +121,9 @@ class MinimalMemoryStorage(BaseStorage, object): ...@@ -121,6 +121,9 @@ class MinimalMemoryStorage(BaseStorage, object):
end_tid = None end_tid = None
else: else:
end_tid = tids[j] end_tid = tids[j]
self.hook(the_oid, self._cur[the_oid], '')
return self._index[(the_oid, tid)], tid, end_tid return self._index[(the_oid, tid)], tid, end_tid
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
......
...@@ -54,6 +54,8 @@ except NameError: ...@@ -54,6 +54,8 @@ except NameError:
import io import io
file_type = io.BufferedReader file_type = io.BufferedReader
from . import util
def new_time(): def new_time():
"""Create a _new_ time stamp. """Create a _new_ time stamp.
...@@ -548,10 +550,8 @@ def loadblob_tmpstore(): ...@@ -548,10 +550,8 @@ def loadblob_tmpstore():
Now we open a database with a TmpStore in front: Now we open a database with a TmpStore in front:
>>> database.close()
>>> from ZODB.Connection import TmpStore >>> from ZODB.Connection import TmpStore
>>> tmpstore = TmpStore(blob_storage) >>> tmpstore = TmpStore(connection)
We can access the blob correctly: We can access the blob correctly:
...@@ -757,7 +757,7 @@ def storage_reusable_suite(prefix, factory, ...@@ -757,7 +757,7 @@ def storage_reusable_suite(prefix, factory,
"blob_connection.txt", "blob_connection.txt",
"blob_importexport.txt", "blob_importexport.txt",
"blob_transaction.txt", "blob_transaction.txt",
setUp=setup, tearDown=zope.testing.setupstack.tearDown, setUp=setup, tearDown=util.tearDown,
checker=zope.testing.renormalizing.RENormalizing([ checker=zope.testing.renormalizing.RENormalizing([
# Py3k renders bytes where Python2 used native strings... # Py3k renders bytes where Python2 used native strings...
(re.compile(r"^b'"), "'"), (re.compile(r"^b'"), "'"),
...@@ -780,10 +780,10 @@ def storage_reusable_suite(prefix, factory, ...@@ -780,10 +780,10 @@ def storage_reusable_suite(prefix, factory,
if test_packing: if test_packing:
suite.addTest(doctest.DocFileSuite( suite.addTest(doctest.DocFileSuite(
"blob_packing.txt", "blob_packing.txt",
setUp=setup, tearDown=zope.testing.setupstack.tearDown, setUp=setup, tearDown=util.tearDown,
)) ))
suite.addTest(doctest.DocTestSuite( suite.addTest(doctest.DocTestSuite(
setUp=setup, tearDown=zope.testing.setupstack.tearDown, setUp=setup, tearDown=util.tearDown,
checker = ZODB.tests.util.checker + \ checker = ZODB.tests.util.checker + \
zope.testing.renormalizing.RENormalizing([ zope.testing.renormalizing.RENormalizing([
(re.compile(r'\%(sep)s\%(sep)s' % dict(sep=os.path.sep)), '/'), (re.compile(r'\%(sep)s\%(sep)s' % dict(sep=os.path.sep)), '/'),
...@@ -823,7 +823,7 @@ def test_suite(): ...@@ -823,7 +823,7 @@ def test_suite():
"blob_tempdir.txt", "blob_tempdir.txt",
"blobstorage_packing.txt", "blobstorage_packing.txt",
setUp=setUp, setUp=setUp,
tearDown=zope.testing.setupstack.tearDown, tearDown=util.tearDown,
optionflags=doctest.ELLIPSIS, optionflags=doctest.ELLIPSIS,
checker=ZODB.tests.util.checker, checker=ZODB.tests.util.checker,
)) ))
...@@ -831,7 +831,7 @@ def test_suite(): ...@@ -831,7 +831,7 @@ def test_suite():
"blob_layout.txt", "blob_layout.txt",
optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE, optionflags=doctest.ELLIPSIS|doctest.NORMALIZE_WHITESPACE,
setUp=setUp, setUp=setUp,
tearDown=zope.testing.setupstack.tearDown, tearDown=util.tearDown,
checker=ZODB.tests.util.checker + checker=ZODB.tests.util.checker +
zope.testing.renormalizing.RENormalizing([ zope.testing.renormalizing.RENormalizing([
(re.compile(r'\%(sep)s\%(sep)s' % dict(sep=os.path.sep)), '/'), (re.compile(r'\%(sep)s\%(sep)s' % dict(sep=os.path.sep)), '/'),
......
...@@ -33,9 +33,17 @@ This note includes doctests that explain how MVCC is implemented (and ...@@ -33,9 +33,17 @@ This note includes doctests that explain how MVCC is implemented (and
test that the implementation is correct). The tests use a test that the implementation is correct). The tests use a
MinimalMemoryStorage that implements MVCC support, but not much else. MinimalMemoryStorage that implements MVCC support, but not much else.
***IMPORTANT***: The MVCC approach has changed since these tests were
originally written. The new approach is much simpler because we no
longer call load to get the current state of an object. We call
loadBefore instead, having gotten a transaction time at the start of a
transaction. As a result, the rhythm of the tests is a little odd,
because we no longer need to probe a complex dance that doesn't exist any more.
>>> from ZODB.tests.test_storage import MinimalMemoryStorage >>> from ZODB.tests.test_storage import MinimalMemoryStorage
>>> from ZODB import DB >>> from ZODB import DB
>>> db = DB(MinimalMemoryStorage()) >>> st = MinimalMemoryStorage()
>>> db = DB(st)
We will use two different connections with different transaction managers We will use two different connections with different transaction managers
to make sure that the connections act independently, even though they'll to make sure that the connections act independently, even though they'll
...@@ -59,6 +67,10 @@ Now open a second connection. ...@@ -59,6 +67,10 @@ Now open a second connection.
>>> tm2 = transaction.TransactionManager() >>> tm2 = transaction.TransactionManager()
>>> cn2 = db.open(transaction_manager=tm2) >>> cn2 = db.open(transaction_manager=tm2)
>>> from ZODB.utils import p64, u64
>>> cn2._txn_time == p64(u64(st.lastTransaction()) + 1)
True
>>> txn_time2 = cn2._txn_time
Connection high-water mark Connection high-water mark
-------------------------- --------------------------
...@@ -67,22 +79,20 @@ The ZODB Connection tracks a transaction high-water mark, which ...@@ -67,22 +79,20 @@ The ZODB Connection tracks a transaction high-water mark, which
bounds the latest transaction id that can be read by the current bounds the latest transaction id that can be read by the current
transaction and still present a consistent view of the database. transaction and still present a consistent view of the database.
Transactions with ids up to but not including the high-water mark Transactions with ids up to but not including the high-water mark
are OK to read. When a transaction commits, the database sends are OK to read. At the beginning of a transaction, a connection
invalidations to all the other connections; the invalidation contains sets the high-water mark to just over the last transaction time the
the transaction id and the oids of modified objects. The Connection storage has seen.
stores the high-water mark in _txn_time, which is set to None until
an invalidation arrives.
>>> cn = db.open() >>> cn = db.open()
>>> print(cn._txn_time) >>> cn._txn_time == p64(u64(st.lastTransaction()) + 1)
None True
>>> cn.invalidate(100, dict.fromkeys([1, 2])) >>> cn.db().invalidate(p64(100), dict.fromkeys([1, 2]))
>>> cn._txn_time >>> cn._txn_time == p64(u64(st.lastTransaction()) + 1)
100 True
>>> cn.invalidate(200, dict.fromkeys([1, 2])) >>> cn.db().invalidate(p64(200), dict.fromkeys([1, 2]))
>>> cn._txn_time >>> cn._txn_time == p64(u64(st.lastTransaction()) + 1)
100 True
A connection's high-water mark is set to the transaction id taken from A connection's high-water mark is set to the transaction id taken from
the first invalidation processed by the connection. Transaction ids are the first invalidation processed by the connection. Transaction ids are
...@@ -95,8 +105,8 @@ but that doesn't work unless an object is modified. sync() will abort ...@@ -95,8 +105,8 @@ but that doesn't work unless an object is modified. sync() will abort
a transaction and process invalidations. a transaction and process invalidations.
>>> cn.sync() >>> cn.sync()
>>> print(cn._txn_time) # the high-water mark got reset to None >>> cn._txn_time == p64(u64(st.lastTransaction()) + 1)
None True
Basic functionality Basic functionality
------------------- -------------------
...@@ -109,9 +119,9 @@ will modify "a." The other transaction will then modify "b" and commit. ...@@ -109,9 +119,9 @@ will modify "a." The other transaction will then modify "b" and commit.
>>> tm1.get().commit() >>> tm1.get().commit()
>>> txn = db.lastTransaction() >>> txn = db.lastTransaction()
The second connection has its high-water mark set now. The second connection already has its high-water mark set.
>>> cn2._txn_time == txn >>> cn2._txn_time == txn_time2
True True
It is safe to read "b," because it was not modified by the concurrent It is safe to read "b," because it was not modified by the concurrent
...@@ -153,22 +163,23 @@ ConflictError: database conflict error (oid 0x01, class ZODB.tests.MinPO.MinPO) ...@@ -153,22 +163,23 @@ ConflictError: database conflict error (oid 0x01, class ZODB.tests.MinPO.MinPO)
This example will demonstrate that we can commit a transaction if we only This example will demonstrate that we can commit a transaction if we only
modify current revisions. modify current revisions.
>>> print(cn2._txn_time) >>> cn2._txn_time == p64(u64(st.lastTransaction()) + 1)
None True
>>> txn_time2 = cn2._txn_time
>>> r1 = cn1.root() >>> r1 = cn1.root()
>>> r1["a"].value = 3 >>> r1["a"].value = 3
>>> tm1.get().commit() >>> tm1.get().commit()
>>> txn = db.lastTransaction() >>> txn = db.lastTransaction()
>>> cn2._txn_time == txn >>> cn2._txn_time == txn_time2
True True
>>> r2["b"].value = r2["a"].value + 1 >>> r2["b"].value = r2["a"].value + 1
>>> r2["b"].value >>> r2["b"].value
3 3
>>> tm2.get().commit() >>> tm2.get().commit()
>>> print(cn2._txn_time) >>> cn2._txn_time == p64(u64(st.lastTransaction()) + 1)
None True
Object cache Object cache
------------ ------------
...@@ -302,22 +313,18 @@ same things now. ...@@ -302,22 +313,18 @@ same things now.
>>> r2["a"].value, r2["b"].value >>> r2["a"].value, r2["b"].value
(42, 43) (42, 43)
>>> db.close()
Late invalidation Late invalidation
----------------- -----------------
The combination of ZEO and MVCC adds more complexity. Since The combination of ZEO and MVCC used to add more complexity. That's
invalidations are delivered asynchronously by ZEO, it is possible for why ZODB no-longer calls load. :)
an invalidation to arrive just after a request to load the invalidated
object is sent. The connection can't use the just-loaded data,
because the invalidation arrived first. The complexity for MVCC is
that it must check for invalidated objects after it has loaded them,
just in case.
Rather than add all the complexity of ZEO to these tests, the Rather than add all the complexity of ZEO to these tests, the
MinimalMemoryStorage has a hook. We'll write a subclass that will MinimalMemoryStorage has a hook. We'll write a subclass that will
deliver an invalidation when it loads an object. The hook allows us deliver an invalidation when it loads (or loadBefore's) an object.
to test the Connection code. The hook allows us to test the Connection code.
>>> class TestStorage(MinimalMemoryStorage): >>> class TestStorage(MinimalMemoryStorage):
... def __init__(self): ... def __init__(self):
...@@ -351,6 +358,12 @@ non-current revision to load. ...@@ -351,6 +358,12 @@ non-current revision to load.
>>> oid = r1["b"]._p_oid >>> oid = r1["b"]._p_oid
>>> ts.hooked[oid] = 1 >>> ts.hooked[oid] = 1
This test is kinda screwy because it depends on an old approach that
has changed. We'll hack the _txn_time to get the original expected
result, even though what's going on now is much simpler.
>>> cn1._txn_time = ts.lastTransaction()
Once the oid is hooked, an invalidation will be delivered the next Once the oid is hooked, an invalidation will be delivered the next
time it is activated. The code below activates the object, then time it is activated. The code below activates the object, then
confirms that the hook worked and that the old state was retrieved. confirms that the hook worked and that the old state was retrieved.
...@@ -395,14 +408,12 @@ section above, this is no older state to retrieve. ...@@ -395,14 +408,12 @@ section above, this is no older state to retrieve.
False False
>>> r1["b"]._p_state >>> r1["b"]._p_state
-1 -1
>>> r1["b"]._p_activate()
>>> cn1._txn_time = st.lastTransaction()
>>> r1["b"]._p_activate() # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
... ...
ReadConflictError: database read conflict error (oid 0x02, class ZODB.tests.MinPO.MinPO) ReadConflictError: ...
>>> oid in cn1._invalidated
True
>>> ts.count
1
""" """
import doctest import doctest
import re import re
......
...@@ -61,6 +61,7 @@ checker = renormalizing.RENormalizing([ ...@@ -61,6 +61,7 @@ checker = renormalizing.RENormalizing([
]) ])
def setUp(test, name='test'): def setUp(test, name='test'):
clear_transaction_syncs()
transaction.abort() transaction.abort()
d = tempfile.mkdtemp(prefix=name) d = tempfile.mkdtemp(prefix=name)
zope.testing.setupstack.register(test, zope.testing.setupstack.rmtree, d) zope.testing.setupstack.register(test, zope.testing.setupstack.rmtree, d)
...@@ -71,7 +72,9 @@ def setUp(test, name='test'): ...@@ -71,7 +72,9 @@ def setUp(test, name='test'):
os.chdir(d) os.chdir(d)
zope.testing.setupstack.register(test, transaction.abort) zope.testing.setupstack.register(test, transaction.abort)
tearDown = zope.testing.setupstack.tearDown def tearDown(test):
clear_transaction_syncs()
zope.testing.setupstack.tearDown(test)
class TestCase(unittest.TestCase): class TestCase(unittest.TestCase):
...@@ -186,3 +189,18 @@ def mess_with_time(test=None, globs=None, now=1278864701.5): ...@@ -186,3 +189,18 @@ def mess_with_time(test=None, globs=None, now=1278864701.5):
time.time = staticmethod(faux_time) # jython time.time = staticmethod(faux_time) # jython
else: else:
time.time = faux_time time.time = faux_time
def clear_transaction_syncs():
"""Clear data managers registered with the global transaction manager
Many tests don't clean up synchronizer's registered with the
global transaction managers, which can wreak havoc with following
tests, now that connections interact with their storages at
transaction boundaries. We need to make sure that we clear any
registered data managers.
For now, we'll use the transaction manager's
underware. Eventually, an transaction managers need to grow an API
for this.
"""
transaction.manager.clearSynchs()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment