Commit 420984b7 authored by Julien Muchembled's avatar Julien Muchembled

Drop old commit protocol

parent 683f7faf
......@@ -28,8 +28,6 @@ from pickle import PicklingError
logger = logging.getLogger('ZODB.ConflictResolution')
ResolvedSerial = b'rs' # deprecated: see IMultiCommitStorage.tpc_vote
class BadClassName(Exception):
pass
......
......@@ -37,7 +37,6 @@ import transaction
import ZODB
from ZODB.blob import SAVEPOINT_SUFFIX
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
......@@ -589,27 +588,13 @@ class Connection(ExportImport, object):
self._cache.update_object_size_estimation(oid, len(p))
obj._p_estimated_size = len(p)
self._handle_serial(oid, s)
def _handle_serial(self, oid, serial=ResolvedSerial, change=True):
# if we write an object, we don't want to check if it was read
# while current. This is a convenient choke point to do this.
self._readCurrent.pop(oid, None)
if not serial:
return
assert isinstance(serial, bytes), serial
obj = self._cache.get(oid, None)
if obj is None:
return
if serial == ResolvedSerial:
del obj._p_changed # transition from changed to ghost
else:
self._warn_about_returned_serial()
if change:
# if we write an object, we don't want to check if it was read
# while current. This is a convenient choke point to do this.
self._readCurrent.pop(oid, None)
if s:
# savepoint
obj._p_changed = 0 # transition from changed to up-to-date
obj._p_serial = serial
obj._p_serial = s
def tpc_abort(self, transaction):
if self._import:
......@@ -674,46 +659,27 @@ class Connection(ExportImport, object):
if v.oid:
self._cache.invalidate(v.oid)
raise
if s:
if type(next(iter(s))) is bytes:
for oid in s:
self._handle_serial(oid)
return
self._warn_about_returned_serial()
for oid, serial in s:
self._handle_serial(oid, serial)
# Resolved conflicts.
for oid in s:
obj = self._cache.get(oid)
if obj is not None:
del obj._p_changed # transition from changed to ghost
def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done.
"""
serial = self._storage.tpc_finish(transaction)
if serial is not None:
assert type(serial) is bytes, repr(serial)
for oid_iterator in self._modified, self._creating:
for oid in oid_iterator:
obj = self._cache.get(oid)
# Ignore missing objects and don't update ghosts.
if obj is not None and obj._p_changed is not None:
obj._p_changed = 0
obj._p_serial = serial
else:
self._warn_about_returned_serial()
assert type(serial) is bytes, repr(serial)
for oid_iterator in self._modified, self._creating:
for oid in oid_iterator:
obj = self._cache.get(oid)
# Ignore missing objects and don't update ghosts.
if obj is not None and obj._p_changed is not None:
obj._p_changed = 0
obj._p_serial = serial
self._tpc_cleanup()
def _warn_about_returned_serial(self):
# Do not warn about own implementations of ZODB.
# We're aware and the user can't do anything about it.
if self._normal_storage.__module__.startswith("_ZODB."):
self._warn_about_returned_serial = lambda: None
else:
warnings.warn(
"In ZODB 5+, the new API for the returned value of"
" store/tpc_vote/tpc_finish will be mandatory."
" See IStorage for more information.",
DeprecationWarning, 2)
Connection._warn_about_returned_serial = lambda self: None
def sortKey(self):
"""Return a consistent sort key for this connection."""
return "%s:%s" % (self._storage.sortKey(), id(self))
......@@ -1033,7 +999,7 @@ class Connection(ExportImport, object):
obj._p_estimated_size = len(data)
if isinstance(self._reader.getGhost(data), Blob):
blobfilename = src.loadBlob(oid, serial)
s = self._storage.storeBlob(
self._storage.storeBlob(
oid, serial, data, blobfilename,
'', transaction)
# we invalidate the object here in order to ensure
......@@ -1042,10 +1008,9 @@ class Connection(ExportImport, object):
# to be reattached "cleanly"
self._cache.invalidate(oid)
else:
s = self._storage.store(oid, serial, data,
'', transaction)
self._storage.store(oid, serial, data, '', transaction)
self._handle_serial(oid, s, change=False)
self._readCurrent.pop(oid, None) # same as in _store_objects()
finally:
src.close()
......
......@@ -709,9 +709,8 @@ class BlobStorageMixin(object):
transaction):
"""Stores data that has a BLOB attached."""
assert not version, "Versions aren't supported."
serial = self.store(oid, oldserial, data, '', transaction)
self.store(oid, oldserial, data, '', transaction)
self._blob_storeblob(oid, self._tid, blobfilename)
return serial
def temporaryDirectory(self):
return self.fshelper.temp_dir
......
......@@ -653,23 +653,6 @@ class IStorage(Interface):
A transaction object. This should match the current
transaction for the storage, set by tpc_begin.
The new serial for the object is returned, but not necessarily
immediately. It may be returned directly, or on a subsequent
store or tpc_vote call.
The return value may be:
- None, or
- A new serial (string) for the object
If None is returned, then a new serial (or other special
values) must ve returned in tpc_vote results.
A serial, returned as a string, may be the special value
ZODB.ConflictResolution.ResolvedSerial to indicate that a
conflict occured and that the object should be invalidated.
Several different exceptions may be raised when an error occurs.
ConflictError
......@@ -739,18 +722,8 @@ class IStorage(Interface):
without an error, then there must not be an error if
tpc_finish or tpc_abort is called subsequently.
The return value can be None or a sequence of object-id
and serial pairs giving new serials for objects whose ids were
passed to previous store calls in the same transaction. The serial
can be the special value ZODB.ConflictResolution.ResolvedSerial to
indicate that a conflict occurred and that the object should be
invalidated.
The return value can also be a sequence of object ids, as
described in IMultiCommitStorage.tpc_vote.
After the tpc_vote call, all solved conflicts must have been notified,
either from tpc_vote or store for objects passed to store.
The return value can be None or a sequence of a sequence of object ids,
as described in IMultiCommitStorage.tpc_vote.
"""
......@@ -790,7 +763,7 @@ class IMultiCommitStorage(IStorage):
def tpc_vote(transaction):
"""Provide a storage with an opportunity to veto a transaction
See IStorage.store. For objects implementing this interface,
See IStorage.tpc_vote. For objects implementing this interface,
the return value can be either None or a sequence of oids for which
a conflict was resolved.
"""
......@@ -1236,24 +1209,6 @@ class IBlobStorage(Interface):
(or copy and remove it) immediately, or at transaction-commit
time. The file must not be open.
The new serial for the object is returned, but not necessarily
immediately. It may be returned directly, or on a subsequent
store or tpc_vote call.
The return value may be:
- None
- A new serial (string) for the object, or
- An iterable of object-id and serial pairs giving new serials
for objects.
A serial, returned as a string or in a sequence of oid/serial
pairs, may be the special value
ZODB.ConflictResolution.ResolvedSerial to indicate that a
conflict occured and that the object should be invalidated.
Several different exceptions may be raised when an error occurs.
ConflictError
......
"""Adapt non-IMultiCommitStorage storages to IMultiCommitStorage
"""
import zope.interface
from .ConflictResolution import ResolvedSerial
class MultiCommitAdapter:
def __init__(self, storage):
self._storage = storage
ifaces = zope.interface.providedBy(storage)
zope.interface.alsoProvides(self, ifaces)
self._resolved = set() # {OID}, here to make linters happy
def __getattr__(self, name):
v = getattr(self._storage, name)
self.__dict__[name] = v
return v
def tpc_begin(self, *args):
self._storage.tpc_begin(*args)
self._resolved = set()
def store(self, oid, *args):
if self._storage.store(oid, *args) == ResolvedSerial:
self._resolved.add(oid)
def storeBlob(self, oid, *args):
s = self._storage.storeBlob(oid, *args)
if s:
if isinstance(s, bytes):
s = ((oid, s), )
for oid, serial in s:
if s == ResolvedSerial:
self._resolved.add(oid)
def undo(self, transaction_id, transaction):
r = self._storage.undo(transaction_id, transaction)
if r:
self._resolved.update(r[1])
def tpc_vote(self, *args):
s = self._storage.tpc_vote(*args)
for (oid, serial) in (s or ()):
if serial == ResolvedSerial:
self._resolved.add(oid)
return self._resolved
def tpc_finish(self, transaction, f=lambda tid: None):
t = []
def func(tid):
t.append(tid)
f(tid)
self._storage.tpc_finish(transaction, func)
return t[0]
def __len__(self):
return len(self._storage)
......@@ -163,15 +163,13 @@ class MVCCAdapterInstance(Base):
self._modified = set()
def store(self, oid, serial, data, version, transaction):
s = self._storage.store(oid, serial, data, version, transaction)
self._storage.store(oid, serial, data, version, transaction)
self._modified.add(oid)
return s
def storeBlob(self, oid, serial, data, blobfilename, version, transaction):
s = self._storage.storeBlob(
self._storage.storeBlob(
oid, serial, data, blobfilename, '', transaction)
self._modified.add(oid)
return s
def tpc_finish(self, transaction, func = lambda tid: None):
modified = self._modified
......@@ -253,11 +251,7 @@ class UndoAdapterInstance(Base):
def tpc_vote(self, transaction):
result = self._storage.tpc_vote(transaction)
if result:
if isinstance(next(iter(result)), bytes):
self._undone.update(result)
else:
for oid, _ in result:
self._undone.add(oid)
self._undone.update(result)
def tpc_finish(self, transaction, func = lambda tid: None):
......
......@@ -175,11 +175,9 @@ class StorageTestBase(ZODB.tests.util.TestCase):
self._storage.tpc_begin(t)
undo_result = self._storage.undo(tid, t)
vote_result = self._storage.tpc_vote(t)
serial = self._storage.tpc_finish(t)
if expected_oids is not None and serial is None:
oids = list(undo_result[1]) if undo_result else []
oids.extend(oid for (oid, _) in vote_result or ())
self.assertEqual(len(oids), len(expected_oids), repr(oids))
for oid in expected_oids:
self.assertTrue(oid in oids)
return self._storage.lastTransaction()
if expected_oids is not None:
oids = set(undo_result[1]) if undo_result else set()
if vote_result:
oids.update(vote_result)
self.assertEqual(oids, set(expected_oids))
return self._storage.tpc_finish(t)
......@@ -52,46 +52,13 @@ def listeq(L1, L2):
class TransactionalUndoStorage:
def _transaction_begin(self):
self.__serials = {}
def _transaction_store(self, oid, rev, data, vers, trans):
r = self._storage.store(oid, rev, data, vers, trans)
if r:
if isinstance(r, bytes):
self.__serials[oid] = r
else:
for oid, serial in r:
self.__serials[oid] = serial
def _transaction_vote(self, trans):
r = self._storage.tpc_vote(trans)
if r:
for oid, serial in r:
self.__serials[oid] = serial
def _transaction_newserial(self, oid):
return self.__serials[oid]
def _transaction_finish(self, t, oid_list):
tid = self._storage.tpc_finish(t)
if tid is not None:
for oid in oid_list:
self.__serials[oid] = tid
def _multi_obj_transaction(self, objs):
newrevs = {}
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
for oid, rev, data in objs:
self._transaction_store(oid, rev, data, '', t)
newrevs[oid] = None
self._transaction_vote(t)
self._transaction_finish(t, [x[0] for x in objs])
for oid in newrevs.keys():
newrevs[oid] = self._transaction_newserial(oid)
return newrevs
self._storage.store(oid, rev, data, '', t)
self._storage.tpc_vote(t)
return self._storage.tpc_finish(t)
def _iterate(self):
"""Iterate over the storage in its final state."""
......@@ -106,22 +73,18 @@ class TransactionalUndoStorage:
def _begin_undos_vote(self, t, *tids):
self._storage.tpc_begin(t)
oids = []
oids = set()
for tid in tids:
undo_result = self._storage.undo(tid, t)
if undo_result:
oids.extend(undo_result[1])
v = self._storage.tpc_vote(t)
if v:
if isinstance(next(iter(v)), bytes):
oids.extend(v)
else:
oids.extend(oid for (oid, _) in v)
oids.update(undo_result[1])
oids.update(self._storage.tpc_vote(t) or ())
return oids
def undo(self, tid, note):
def undo(self, tid, note=None):
t = Transaction()
t.note(note)
if note is not None:
t.note(note)
oids = self._begin_undos_vote(t, tid)
self._storage.tpc_finish(t)
return oids
......@@ -165,10 +128,7 @@ class TransactionalUndoStorage:
# undo its creation
info = self._storage.undoInfo()
tid = info[0]['id']
t = Transaction()
t.note('undo1')
self._begin_undos_vote(t, tid)
self._storage.tpc_finish(t)
self.undo(tid, 'undo1')
# Check that calling getTid on an uncreated object raises a KeyError
# The current version of FileStorage fails this test
self.assertRaises(KeyError, self._storage.getTid, oid)
......@@ -224,27 +184,19 @@ class TransactionalUndoStorage:
# Store two objects in the same transaction
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p31, '', t)
self._transaction_store(oid2, revid2, p51, '', t)
self._storage.store(oid1, revid1, p31, '', t)
self._storage.store(oid2, revid2, p51, '', t)
# Finish the transaction
self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
self._storage.tpc_vote(t)
tid = self._storage.tpc_finish(t)
# Update those same two objects
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p32, '', t)
self._transaction_store(oid2, revid2, p52, '', t)
self._storage.store(oid1, tid, p32, '', t)
self._storage.store(oid2, tid, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# Make sure the objects have the current value
data, revid1 = load_current(self._storage, oid1)
eq(zodb_unpickle(data), MinPO(32))
......@@ -269,25 +221,18 @@ class TransactionalUndoStorage:
(30, 31, 32, 50, 51, 52)))
oid1 = self._storage.new_oid()
oid2 = self._storage.new_oid()
revid1 = revid2 = ZERO
# Store two objects in the same transaction
d = self._multi_obj_transaction([(oid1, revid1, p30),
(oid2, revid2, p50),
])
eq(d[oid1], d[oid2])
tid = self._multi_obj_transaction([(oid1, ZERO, p30),
(oid2, ZERO, p50),
])
# Update those same two objects
d = self._multi_obj_transaction([(oid1, d[oid1], p31),
(oid2, d[oid2], p51),
])
eq(d[oid1], d[oid2])
tid = self._multi_obj_transaction([(oid1, tid, p31),
(oid2, tid, p51),
])
# Update those same two objects
d = self._multi_obj_transaction([(oid1, d[oid1], p32),
(oid2, d[oid2], p52),
])
eq(d[oid1], d[oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
tid = self._multi_obj_transaction([(oid1, tid, p32),
(oid2, tid, p52),
])
# Make sure the objects have the current value
data, revid1 = load_current(self._storage, oid1)
eq(zodb_unpickle(data), MinPO(32))
......@@ -303,7 +248,7 @@ class TransactionalUndoStorage:
# We may get the finalization stuff called an extra time,
# depending on the implementation.
if serial is None:
self.assertEqual(set(oids), {oid1, oid2})
self.assertEqual(oids, {oid1, oid2})
data, revid1 = load_current(self._storage, oid1)
eq(zodb_unpickle(data), MinPO(30))
data, revid2 = load_current(self._storage, oid2)
......@@ -332,15 +277,11 @@ class TransactionalUndoStorage:
# Update those same two objects
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p32, '', t)
self._transaction_store(oid2, revid2, p52, '', t)
self._storage.store(oid1, revid1, p32, '', t)
self._storage.store(oid2, revid2, p52, '', t)
# Finish the transaction
self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
self._undo(info[0]["id"], [oid1, oid2])
......@@ -352,28 +293,17 @@ class TransactionalUndoStorage:
# one object.
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p33, '', t)
self._transaction_store(oid2, revid2, p53, '', t)
self._storage.store(oid1, revid1, p33, '', t)
self._storage.store(oid2, revid2, p53, '', t)
# Finish the transaction
self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
self._storage.tpc_vote(t)
tid = self._storage.tpc_finish(t)
# Update in different transactions
revid1 = self._dostore(oid1, revid=revid1, data=MinPO(34))
revid2 = self._dostore(oid2, revid=revid2, data=MinPO(54))
revid1 = self._dostore(oid1, revid=tid, data=MinPO(34))
revid2 = self._dostore(oid2, revid=tid, data=MinPO(54))
# Now attempt to undo the transaction containing two objects
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
oids = self._begin_undos_vote(t, tid)
serial = self._storage.tpc_finish(t)
if serial is None:
eq(len(oids), 1)
self.assertTrue(oid1 in oids)
self.assertTrue(not oid2 in oids)
self.undo(info[1]['id'])
data, revid1 = load_current(self._storage, oid1)
eq(zodb_unpickle(data), MinPO(33))
data, revid2 = load_current(self._storage, oid2)
......@@ -406,25 +336,20 @@ class TransactionalUndoStorage:
t = Transaction()
self._storage.tpc_begin(t)
self._transaction_begin()
self._transaction_store(oid1, revid1, p81, '', t)
self._transaction_store(oid2, revid2, p91, '', t)
self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2)
self._storage.store(oid1, revid1, p81, '', t)
self._storage.store(oid2, revid2, p91, '', t)
self._storage.tpc_vote(t)
tid = self._storage.tpc_finish(t)
# Make sure the objects have the expected values
data, revid_11 = load_current(self._storage, oid1)
eq(zodb_unpickle(data), MinPO(81))
data, revid_22 = load_current(self._storage, oid2)
eq(zodb_unpickle(data), MinPO(91))
eq(revid_11, revid1)
eq(revid_22, revid2)
eq(revid_11, tid)
eq(revid_22, tid)
# Now modify oid2
revid2 = self._dostore(oid2, revid=revid2, data=MinPO(92))
self.assertNotEqual(revid1, revid2)
self.assertNotEqual(revid2, revid_22)
revid2 = self._dostore(oid2, tid, MinPO(92))
self.assertNotEqual(tid, revid2)
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
......@@ -468,11 +393,8 @@ class TransactionalUndoStorage:
info2 = self._storage.undoInfo()
self.assertEqual(len(info2), 2)
# And now attempt to undo the last transaction
t = Transaction()
oids = self._begin_undos_vote(t, tid)
self._storage.tpc_finish(t)
self.assertEqual(len(oids), 1)
self.assertEqual(oids[0], oid)
undone, = self.undo(tid)
self.assertEqual(undone, oid)
data, revid = load_current(self._storage, oid)
# The object must now be at the second state
self.assertEqual(zodb_unpickle(data), MinPO(52))
......@@ -805,8 +727,9 @@ class TransactionalUndoStorage:
from .ConflictResolution import PCounter
db = DB(self._storage)
with db.transaction() as conn:
conn.root.x = PCounter()
cn = db.open()
cn.root.x = PCounter()
transaction.commit()
for i in range(4):
with db.transaction() as conn:
......@@ -815,10 +738,13 @@ class TransactionalUndoStorage:
ids = [l['id'] for l in db.undoLog(1, 3)]
if reverse:
ids = list(reversed(ids))
ids.reverse()
db.undoMultiple(ids)
transaction.commit()
self.assertEqual(cn.root.x._value, 2)
cn.close()
def checkUndoMultipleConflictResolutionReversed(self):
self.checkUndoMultipleConflictResolution(True)
......@@ -1275,6 +1275,7 @@ class StubStorage:
del self._transaction
self._transdata.clear()
self._transstored = []
return z64
def load(self, oid, version=''):
if version != '':
......@@ -1295,9 +1296,6 @@ class StubStorage:
self._stored.append(oid)
self._transstored.append(oid)
self._transdata[oid] = (p, serial)
# Explicitly returning None, as we're not pretending to be a ZEO
# storage
return None
def lastTransaction(self):
return z64
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment