Commit 568aa533 authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #69 from zopefoundation/tpc_finish

Allow serial to be returned as late as tpc_finish
parents f46359eb caea03ca
...@@ -28,7 +28,7 @@ from pickle import PicklingError ...@@ -28,7 +28,7 @@ from pickle import PicklingError
logger = logging.getLogger('ZODB.ConflictResolution') logger = logging.getLogger('ZODB.ConflictResolution')
ResolvedSerial = b'rs' ResolvedSerial = b'rs' # deprecated: store/tpc_finish should just use True
class BadClassName(Exception): class BadClassName(Exception):
pass pass
......
...@@ -705,7 +705,7 @@ class Connection(ExportImport, object): ...@@ -705,7 +705,7 @@ class Connection(ExportImport, object):
self._handle_serial(oid, s) self._handle_serial(oid, s)
def _handle_serial(self, oid, serial, change=True): def _handle_serial(self, oid, serial=True, change=True):
# if we write an object, we don't want to check if it was read # if we write an object, we don't want to check if it was read
# while current. This is a convenient choke point to do this. # while current. This is a convenient choke point to do this.
...@@ -713,7 +713,9 @@ class Connection(ExportImport, object): ...@@ -713,7 +713,9 @@ class Connection(ExportImport, object):
if not serial: if not serial:
return return
if not isinstance(serial, bytes): if serial is True:
serial = ResolvedSerial
elif not isinstance(serial, bytes):
raise serial raise serial
obj = self._cache.get(oid, None) obj = self._cache.get(oid, None)
if obj is None: if obj is None:
...@@ -721,6 +723,7 @@ class Connection(ExportImport, object): ...@@ -721,6 +723,7 @@ class Connection(ExportImport, object):
if serial == ResolvedSerial: if serial == ResolvedSerial:
del obj._p_changed # transition from changed to ghost del obj._p_changed # transition from changed to ghost
else: else:
self._warn_about_returned_serial()
if change: if change:
obj._p_changed = 0 # transition from changed to up-to-date obj._p_changed = 0 # transition from changed to up-to-date
obj._p_serial = serial obj._p_serial = serial
...@@ -790,6 +793,11 @@ class Connection(ExportImport, object): ...@@ -790,6 +793,11 @@ class Connection(ExportImport, object):
raise raise
if s: if s:
if type(s[0]) is bytes:
for oid in s:
self._handle_serial(oid)
return
self._warn_about_returned_serial()
for oid, serial in s: for oid, serial in s:
self._handle_serial(oid, serial) self._handle_serial(oid, serial)
...@@ -808,9 +816,33 @@ class Connection(ExportImport, object): ...@@ -808,9 +816,33 @@ class Connection(ExportImport, object):
# to be able to read any updated data until we've had a chance # to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other # to send an invalidation message to all of the other
# connections! # connections!
self._storage.tpc_finish(transaction, callback) serial = self._storage.tpc_finish(transaction, callback)
if serial is not None:
assert type(serial) is bytes, repr(serial)
for oid_iterator in self._modified, self._creating:
for oid in oid_iterator:
obj = self._cache.get(oid)
# Ignore missing objects and don't update ghosts.
if obj is not None and obj._p_changed is not None:
obj._p_changed = 0
obj._p_serial = serial
else:
self._warn_about_returned_serial()
self._tpc_cleanup() self._tpc_cleanup()
def _warn_about_returned_serial(self):
# Do not warn about own implementations of ZODB.
# We're aware and the user can't do anything about it.
if self._normal_storage.__module__.startswith("_ZODB."):
self._warn_about_returned_serial = lambda: None
else:
warnings.warn(
"In ZODB 5+, the new API for the returned value of"
" store/tpc_vote/tpc_finish will be mandatory."
" See IStorage for more information.",
DeprecationWarning, 2)
Connection._warn_about_returned_serial = lambda self: None
def sortKey(self): def sortKey(self):
"""Return a consistent sort key for this connection.""" """Return a consistent sort key for this connection."""
return "%s:%s" % (self._storage.sortKey(), id(self)) return "%s:%s" % (self._storage.sortKey(), id(self))
......
...@@ -776,6 +776,12 @@ class IStorage(Interface): ...@@ -776,6 +776,12 @@ class IStorage(Interface):
called while the storage transaction lock is held. It takes called while the storage transaction lock is held. It takes
the new transaction id generated by the transaction. the new transaction id generated by the transaction.
The return value must be the committed tid. It is used to set the
serial for objects whose ids were passed to previous store calls
in the same transaction.
For compatibility, the return value can also be None, in which case
store/tpc_vote must return the serial of stored objects.
""" """
def tpc_vote(transaction): def tpc_vote(transaction):
...@@ -791,17 +797,18 @@ class IStorage(Interface): ...@@ -791,17 +797,18 @@ class IStorage(Interface):
without an error, then there must not be an error if without an error, then there must not be an error if
tpc_finish or tpc_abort is called subsequently. tpc_finish or tpc_abort is called subsequently.
The return value can be either None or a sequence of object-id The return value can be either None or a sequence of oids for which
and serial pairs giving new serials for objects who's ids were a conflict was resolved.
passed to previous store calls in the same transaction.
After the tpc_vote call, new serials must have been returned,
either from tpc_vote or store for objects passed to store.
A serial returned in a sequence of oid/serial pairs, may be For compatibility, the return value can also be a sequence of object-id
the special value ZODB.ConflictResolution.ResolvedSerial to and serial pairs giving new serials for objects whose ids were
indicate that a conflict occured and that the object should be passed to previous store calls in the same transaction. The serial
can be the special value ZODB.ConflictResolution.ResolvedSerial to
indicate that a conflict occurred and that the object should be
invalidated. invalidated.
After the tpc_vote call, all solved conflicts must have been notified,
either from tpc_vote or store for objects passed to store.
""" """
......
...@@ -69,8 +69,10 @@ class BasicStorage: ...@@ -69,8 +69,10 @@ class BasicStorage:
r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)), r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
'', txn) '', txn)
r2 = self._storage.tpc_vote(txn) r2 = self._storage.tpc_vote(txn)
self._storage.tpc_finish(txn) serial = self._storage.tpc_finish(txn)
newrevid = handle_serials(oid, r1, r2) newrevid = handle_serials(oid, r1, r2)
if newrevid is None and serial is not None:
newrevid = serial
data, revid = self._storage.load(oid, '') data, revid = self._storage.load(oid, '')
value = zodb_unpickle(data) value = zodb_unpickle(data)
eq(value, MinPO(11)) eq(value, MinPO(11))
......
...@@ -13,9 +13,10 @@ ...@@ -13,9 +13,10 @@
############################################################################## ##############################################################################
"""Tests for application-level conflict resolution.""" """Tests for application-level conflict resolution."""
from ZODB import DB
from ZODB.POSException import ConflictError, UndoError from ZODB.POSException import ConflictError, UndoError
from persistent import Persistent from persistent import Persistent
from transaction import Transaction from transaction import Transaction, TransactionManager
from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
...@@ -26,8 +27,8 @@ class PCounter(Persistent): ...@@ -26,8 +27,8 @@ class PCounter(Persistent):
def __repr__(self): def __repr__(self):
return "<PCounter %d>" % self._value return "<PCounter %d>" % self._value
def inc(self): def inc(self, n=1):
self._value = self._value + 1 self._value = self._value + n
def _p_resolveConflict(self, oldState, savedState, newState): def _p_resolveConflict(self, oldState, savedState, newState):
savedDiff = savedState['_value'] - oldState['_value'] savedDiff = savedState['_value'] - oldState['_value']
...@@ -55,46 +56,38 @@ class PCounter4(PCounter): ...@@ -55,46 +56,38 @@ class PCounter4(PCounter):
class ConflictResolvingStorage: class ConflictResolvingStorage:
def checkResolve(self): def checkResolve(self, resolvable=True):
obj = PCounter() db = DB(self._storage)
obj.inc()
oid = self._storage.new_oid()
revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))
obj.inc()
obj.inc()
# The effect of committing two transactions with the same
# pickle is to commit two different transactions relative to
# revid1 that add two to _value.
revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
revid3 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
data, serialno = self._storage.load(oid, '')
inst = zodb_unpickle(data)
self.assertEqual(inst._value, 5)
def checkUnresolvable(self):
obj = PCounter2()
obj.inc()
oid = self._storage.new_oid() t1 = TransactionManager()
c1 = db.open(t1)
o1 = c1.root()['p'] = (PCounter if resolvable else PCounter2)()
o1.inc()
t1.commit()
revid1 = self._dostoreNP(oid, data=zodb_pickle(obj)) t2 = TransactionManager()
c2 = db.open(t2)
o2 = c2.root()['p']
o2.inc(2)
t2.commit()
obj.inc() o1.inc(3)
obj.inc()
# The effect of committing two transactions with the same
# pickle is to commit two different transactions relative to
# revid1 that add two to _value.
revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
try: try:
self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj)) t1.commit()
except ConflictError as err: except ConflictError as err:
self.assertTrue("PCounter2" in str(err)) self.assertIn(".PCounter2,", str(err))
self.assertEqual(o1._value, 3)
else: else:
self.fail("Expected ConflictError") self.assertTrue(resolvable, "Expected ConflictError")
self.assertEqual(o1._value, 6)
t2.begin()
self.assertEqual(o2._value, o1._value)
db.close()
def checkUnresolvable(self):
self.checkResolve(False)
def checkZClassesArentResolved(self): def checkZClassesArentResolved(self):
from ZODB.ConflictResolution import find_global, BadClassName from ZODB.ConflictResolution import find_global, BadClassName
......
...@@ -152,10 +152,12 @@ class StorageClientThread(TestThread): ...@@ -152,10 +152,12 @@ class StorageClientThread(TestThread):
r2 = self.storage.tpc_vote(t) r2 = self.storage.tpc_vote(t)
self.pause() self.pause()
self.storage.tpc_finish(t) serial = self.storage.tpc_finish(t)
self.pause() self.pause()
revid = handle_serials(oid, r1, r2) revid = handle_serials(oid, r1, r2)
if serial is not None and revid is None:
revid = serial
self.oids[oid] = revid self.oids[oid] = revid
class ExtStorageClientThread(StorageClientThread): class ExtStorageClientThread(StorageClientThread):
......
...@@ -150,10 +150,12 @@ class RevisionStorage: ...@@ -150,10 +150,12 @@ class RevisionStorage:
# Finish the transaction # Finish the transaction
r2 = self._storage.tpc_vote(t) r2 = self._storage.tpc_vote(t)
newrevid = handle_serials(oid, r1, r2) newrevid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t) serial = self._storage.tpc_finish(t)
except: except:
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
raise raise
if serial is not None and newrevid is None:
newrevid = serial
return newrevid return newrevid
revid1 = helper(1, None, 1) revid1 = helper(1, None, 1)
revid2 = helper(2, revid1, 2) revid2 = helper(2, revid1, 2)
......
...@@ -132,7 +132,7 @@ def handle_serials(oid, *args): ...@@ -132,7 +132,7 @@ def handle_serials(oid, *args):
A helper for function _handle_all_serials(). A helper for function _handle_all_serials().
""" """
return handle_all_serials(oid, *args)[oid] return handle_all_serials(oid, *args).get(oid)
def import_helper(name): def import_helper(name):
__import__(name) __import__(name)
...@@ -189,7 +189,9 @@ class StorageTestBase(ZODB.tests.util.TestCase): ...@@ -189,7 +189,9 @@ class StorageTestBase(ZODB.tests.util.TestCase):
# Finish the transaction # Finish the transaction
r2 = self._storage.tpc_vote(t) r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2) revid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t) serial = self._storage.tpc_finish(t)
if serial is not None and revid is None:
revid = serial
except: except:
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
raise raise
...@@ -209,8 +211,8 @@ class StorageTestBase(ZODB.tests.util.TestCase): ...@@ -209,8 +211,8 @@ class StorageTestBase(ZODB.tests.util.TestCase):
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
undo_result = self._storage.undo(tid, t) undo_result = self._storage.undo(tid, t)
vote_result = self._storage.tpc_vote(t) vote_result = self._storage.tpc_vote(t)
self._storage.tpc_finish(t) serial = self._storage.tpc_finish(t)
if expected_oids is not None: if expected_oids is not None and serial is None:
oids = list(undo_result[1]) if undo_result else [] oids = list(undo_result[1]) if undo_result else []
oids.extend(oid for (oid, _) in vote_result or ()) oids.extend(oid for (oid, _) in vote_result or ())
self.assertEqual(len(oids), len(expected_oids), repr(oids)) self.assertEqual(len(oids), len(expected_oids), repr(oids))
......
...@@ -73,6 +73,12 @@ class TransactionalUndoStorage: ...@@ -73,6 +73,12 @@ class TransactionalUndoStorage:
def _transaction_newserial(self, oid): def _transaction_newserial(self, oid):
return self.__serials[oid] return self.__serials[oid]
def _transaction_finish(self, t, oid_list):
tid = self._storage.tpc_finish(t)
if tid is not None:
for oid in oid_list:
self.__serials[oid] = tid
def _multi_obj_transaction(self, objs): def _multi_obj_transaction(self, objs):
newrevs = {} newrevs = {}
t = Transaction() t = Transaction()
...@@ -82,7 +88,7 @@ class TransactionalUndoStorage: ...@@ -82,7 +88,7 @@ class TransactionalUndoStorage:
self._transaction_store(oid, rev, data, '', t) self._transaction_store(oid, rev, data, '', t)
newrevs[oid] = None newrevs[oid] = None
self._transaction_vote(t) self._transaction_vote(t)
self._storage.tpc_finish(t) self._transaction_finish(t, [x[0] for x in objs])
for oid in newrevs.keys(): for oid in newrevs.keys():
newrevs[oid] = self._transaction_newserial(oid) newrevs[oid] = self._transaction_newserial(oid)
return newrevs return newrevs
...@@ -219,9 +225,9 @@ class TransactionalUndoStorage: ...@@ -219,9 +225,9 @@ class TransactionalUndoStorage:
self._transaction_store(oid2, revid2, p51, '', t) self._transaction_store(oid2, revid2, p51, '', t)
# Finish the transaction # Finish the transaction
self._transaction_vote(t) self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1) revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2) revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(t)
eq(revid1, revid2) eq(revid1, revid2)
# Update those same two objects # Update those same two objects
t = Transaction() t = Transaction()
...@@ -231,9 +237,9 @@ class TransactionalUndoStorage: ...@@ -231,9 +237,9 @@ class TransactionalUndoStorage:
self._transaction_store(oid2, revid2, p52, '', t) self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction # Finish the transaction
self._transaction_vote(t) self._transaction_vote(t)
self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1) revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2) revid2 = self._transaction_newserial(oid2)
self._storage.tpc_finish(t)
eq(revid1, revid2) eq(revid1, revid2)
# Make sure the objects have the current value # Make sure the objects have the current value
data, revid1 = self._storage.load(oid1, '') data, revid1 = self._storage.load(oid1, '')
...@@ -289,10 +295,11 @@ class TransactionalUndoStorage: ...@@ -289,10 +295,11 @@ class TransactionalUndoStorage:
tid1 = info[1]['id'] tid1 = info[1]['id']
t = Transaction() t = Transaction()
oids = self._begin_undos_vote(t, tid, tid1) oids = self._begin_undos_vote(t, tid, tid1)
self._storage.tpc_finish(t) serial = self._storage.tpc_finish(t)
# We may get the finalization stuff called an extra time, # We may get the finalization stuff called an extra time,
# depending on the implementation. # depending on the implementation.
self.assertEqual(set(oids), set((oid1, oid2))) if serial is None:
self.assertEqual(set(oids), {oid1, oid2})
data, revid1 = self._storage.load(oid1, '') data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(30)) eq(zodb_unpickle(data), MinPO(30))
data, revid2 = self._storage.load(oid2, '') data, revid2 = self._storage.load(oid2, '')
...@@ -326,7 +333,7 @@ class TransactionalUndoStorage: ...@@ -326,7 +333,7 @@ class TransactionalUndoStorage:
self._transaction_store(oid2, revid2, p52, '', t) self._transaction_store(oid2, revid2, p52, '', t)
# Finish the transaction # Finish the transaction
self._transaction_vote(t) self._transaction_vote(t)
self._storage.tpc_finish(t) self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1) revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2) revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2) eq(revid1, revid2)
...@@ -346,7 +353,7 @@ class TransactionalUndoStorage: ...@@ -346,7 +353,7 @@ class TransactionalUndoStorage:
self._transaction_store(oid2, revid2, p53, '', t) self._transaction_store(oid2, revid2, p53, '', t)
# Finish the transaction # Finish the transaction
self._transaction_vote(t) self._transaction_vote(t)
self._storage.tpc_finish(t) self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1) revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2) revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2) eq(revid1, revid2)
...@@ -358,10 +365,11 @@ class TransactionalUndoStorage: ...@@ -358,10 +365,11 @@ class TransactionalUndoStorage:
tid = info[1]['id'] tid = info[1]['id']
t = Transaction() t = Transaction()
oids = self._begin_undos_vote(t, tid) oids = self._begin_undos_vote(t, tid)
self._storage.tpc_finish(t) serial = self._storage.tpc_finish(t)
eq(len(oids), 1) if serial is None:
self.assertTrue(oid1 in oids) eq(len(oids), 1)
self.assertTrue(not oid2 in oids) self.assertTrue(oid1 in oids)
self.assertTrue(not oid2 in oids)
data, revid1 = self._storage.load(oid1, '') data, revid1 = self._storage.load(oid1, '')
eq(zodb_unpickle(data), MinPO(33)) eq(zodb_unpickle(data), MinPO(33))
data, revid2 = self._storage.load(oid2, '') data, revid2 = self._storage.load(oid2, '')
...@@ -398,7 +406,7 @@ class TransactionalUndoStorage: ...@@ -398,7 +406,7 @@ class TransactionalUndoStorage:
self._transaction_store(oid1, revid1, p81, '', t) self._transaction_store(oid1, revid1, p81, '', t)
self._transaction_store(oid2, revid2, p91, '', t) self._transaction_store(oid2, revid2, p91, '', t)
self._transaction_vote(t) self._transaction_vote(t)
self._storage.tpc_finish(t) self._transaction_finish(t, [oid1, oid2])
revid1 = self._transaction_newserial(oid1) revid1 = self._transaction_newserial(oid1)
revid2 = self._transaction_newserial(oid2) revid2 = self._transaction_newserial(oid2)
eq(revid1, revid2) eq(revid1, revid2)
......
...@@ -39,6 +39,45 @@ import ZODB.tests.util ...@@ -39,6 +39,45 @@ import ZODB.tests.util
import ZODB.utils import ZODB.utils
from zope.testing import renormalizing from zope.testing import renormalizing
# With the following monkey-patch, we can test the different ways
# to update _p_changed/_p_serial status of committed oids.
from ZODB.ConflictResolution import ResolvedSerial
class DemoStorage(ZODB.DemoStorage.DemoStorage):
delayed_store = False
def tpc_begin(self, *args):
super(DemoStorage, self).tpc_begin(*args)
self.__stored = []
def store(self, oid, *args):
s = super(DemoStorage, self).store(oid, *args)
if s != ResolvedSerial:
assert type(s) is bytes, s
return
if not self.delayed_store:
return True
self.__stored.append(oid)
tpc_vote = property(lambda self: self._tpc_vote, lambda *_: None)
def _tpc_vote(self, transaction):
s = self.changes.tpc_vote(transaction)
assert s is None, s
return self.__stored if self.delayed_store else s
def tpc_finish(self, transaction, func = lambda tid: None):
r = []
def callback(tid):
func(tid)
r.append(tid)
tid = super(DemoStorage, self).tpc_finish(transaction, callback)
assert tid is None, tid
return r[0]
ZODB.DemoStorage.DemoStorage = DemoStorage
class DemoStorageTests( class DemoStorageTests(
StorageTestBase.StorageTestBase, StorageTestBase.StorageTestBase,
...@@ -104,6 +143,10 @@ class DemoStorageTests( ...@@ -104,6 +143,10 @@ class DemoStorageTests(
self._checkHistory(base_and_changes()) self._checkHistory(base_and_changes())
self._storage = self._storage.pop() self._storage = self._storage.pop()
def checkResolveLate(self):
self._storage.delayed_store = True
self.checkResolve()
class DemoStorageHexTests(DemoStorageTests): class DemoStorageHexTests(DemoStorageTests):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment