Commit cf938cee authored by Jim Fulton's avatar Jim Fulton

Bug fixed:

- The undo implementation was incorrect in ways that could cause
  subtle missbehaviors.

API changes:

- The API for undoing multiple transactions has changed.  To undo
  multiple transactions in a single transaction, pass pass a list of
  transaction identifiers to a database's undo method. Calling a
  database's undo method multiple times in the same transaction now
  raises an exception.

- The storage API (IStorage) has been tightened. Now, storages should
  raise a StorageTransactionError when invalid transactions are passed
  to tpc_begin, tpc_vote, or tpc_finish.
parent 4e826256
...@@ -8,6 +8,16 @@ ...@@ -8,6 +8,16 @@
New Features New Features
------------ ------------
- The API for undoing multiple transactions has changed. To undo
multiple transactions in a single transaction, pass pass a list of
transaction identifiers to a database's undo method. Calling a
database's undo method multiple times in the same transaction now
raises an exception.
- The storage API (IStorage) has been tightened. Now, storages should
raise a StorageTransactionError when invalid transactions are passed
to tpc_begin, tpc_vote, or tpc_finish.
- Broken objects now provide the IBroken interface. - Broken objects now provide the IBroken interface.
Bugs Fixed Bugs Fixed
...@@ -43,6 +53,9 @@ Bugs Fixed ...@@ -43,6 +53,9 @@ Bugs Fixed
- C Header files weren't installed correctly. - C Header files weren't installed correctly.
- The undo implementation was incorrect in ways that could cause
subtle missbehaviors.
3.9.3 (2009-10-23) 3.9.3 (2009-10-23)
================== ==================
......
...@@ -1075,7 +1075,8 @@ class ClientStorage(object): ...@@ -1075,7 +1075,8 @@ class ClientStorage(object):
def tpc_vote(self, txn): def tpc_vote(self, txn):
"""Storage API: vote on a transaction.""" """Storage API: vote on a transaction."""
if txn is not self._transaction: if txn is not self._transaction:
return raise POSException.StorageTransactionError(
"tpc_vote called with wrong transaction")
self._server.vote(id(txn)) self._server.vote(id(txn))
return self._check_serials() return self._check_serials()
...@@ -1094,7 +1095,9 @@ class ClientStorage(object): ...@@ -1094,7 +1095,9 @@ class ClientStorage(object):
# must be ignored. # must be ignored.
if self._transaction == txn: if self._transaction == txn:
self._tpc_cond.release() self._tpc_cond.release()
return raise POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
self._tpc_cond.wait(30) self._tpc_cond.wait(30)
self._transaction = txn self._transaction = txn
self._tpc_cond.release() self._tpc_cond.release()
...@@ -1148,7 +1151,8 @@ class ClientStorage(object): ...@@ -1148,7 +1151,8 @@ class ClientStorage(object):
def tpc_finish(self, txn, f=None): def tpc_finish(self, txn, f=None):
"""Storage API: finish a transaction.""" """Storage API: finish a transaction."""
if txn is not self._transaction: if txn is not self._transaction:
return raise POSException.StorageTransactionError(
"tpc_finish called with wrong transaction")
self._load_lock.acquire() self._load_lock.acquire()
try: try:
if self._midtxn_disconnect: if self._midtxn_disconnect:
......
...@@ -1093,7 +1093,8 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -1093,7 +1093,8 @@ class TimeoutTests(CommonSetupTearDown):
self.assertRaises(ConflictError, storage.tpc_vote, t) self.assertRaises(ConflictError, storage.tpc_vote, t)
# Even aborting won't help. # Even aborting won't help.
storage.tpc_abort(t) storage.tpc_abort(t)
storage.tpc_finish(t) self.assertRaises(ZODB.POSException.StorageTransactionError,
storage.tpc_finish, t)
# Try again. # Try again.
obj.value = 10 obj.value = 10
t = Transaction() t = Transaction()
...@@ -1103,7 +1104,7 @@ class TimeoutTests(CommonSetupTearDown): ...@@ -1103,7 +1104,7 @@ class TimeoutTests(CommonSetupTearDown):
self.assertRaises(ConflictError, storage.tpc_vote, t) self.assertRaises(ConflictError, storage.tpc_vote, t)
# Abort this one and try a transaction that should succeed. # Abort this one and try a transaction that should succeed.
storage.tpc_abort(t) storage.tpc_abort(t)
storage.tpc_finish(t)
# Now do a store. # Now do a store.
obj.value = 11 obj.value = 11
t = Transaction() t = Transaction()
......
...@@ -221,7 +221,8 @@ class BaseStorage(UndoLogCompatible): ...@@ -221,7 +221,8 @@ class BaseStorage(UndoLogCompatible):
self._lock_acquire() self._lock_acquire()
try: try:
if self._transaction is transaction: if self._transaction is transaction:
return raise POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
self._lock_release() self._lock_release()
self._commit_lock_acquire() self._commit_lock_acquire()
self._lock_acquire() self._lock_acquire()
...@@ -264,7 +265,8 @@ class BaseStorage(UndoLogCompatible): ...@@ -264,7 +265,8 @@ class BaseStorage(UndoLogCompatible):
self._lock_acquire() self._lock_acquire()
try: try:
if transaction is not self._transaction: if transaction is not self._transaction:
return raise POSException.StorageTransactionError(
"tpc_vote called with wrong transaction")
self._vote() self._vote()
finally: finally:
self._lock_release() self._lock_release()
...@@ -284,7 +286,8 @@ class BaseStorage(UndoLogCompatible): ...@@ -284,7 +286,8 @@ class BaseStorage(UndoLogCompatible):
self._lock_acquire() self._lock_acquire()
try: try:
if transaction is not self._transaction: if transaction is not self._transaction:
return raise POSException.StorageTransactionError(
"tpc_finish called with wrong transaction")
try: try:
if f is not None: if f is not None:
f(self._tid) f(self._tid)
......
...@@ -896,7 +896,7 @@ class DB(object): ...@@ -896,7 +896,7 @@ class DB(object):
finally: finally:
self._r() self._r()
def undo(self, id, txn=None): def undo(self, ids, txn=None):
"""Undo a transaction identified by id. """Undo a transaction identified by id.
A transaction can be undone if all of the objects involved in A transaction can be undone if all of the objects involved in
...@@ -909,13 +909,16 @@ class DB(object): ...@@ -909,13 +909,16 @@ class DB(object):
transaction id used by other methods; it is unique to undo(). transaction id used by other methods; it is unique to undo().
:Parameters: :Parameters:
- `id`: a storage-specific transaction identifier - `ids`: a sequence of storage-specific transaction identifiers
or a single transaction identifier
- `txn`: transaction context to use for undo(). - `txn`: transaction context to use for undo().
By default, uses the current transaction. By default, uses the current transaction.
""" """
if txn is None: if txn is None:
txn = transaction.get() txn = transaction.get()
txn.register(TransactionalUndo(self, id)) if isinstance(ids, basestring):
ids = [ids]
txn.join(TransactionalUndo(self, ids))
def transaction(self): def transaction(self):
return ContextManager(self) return ContextManager(self)
...@@ -943,60 +946,41 @@ class ContextManager: ...@@ -943,60 +946,41 @@ class ContextManager:
resource_counter_lock = threading.Lock() resource_counter_lock = threading.Lock()
resource_counter = 0 resource_counter = 0
class ResourceManager(object): class TransactionalUndo(object):
"""Transaction participation for an undo resource."""
# XXX This implementation is broken. Subclasses invalidate oids def __init__(self, db, tids):
# in their commit calls. Invalidations should not be sent until
# tpc_finish is called. In fact, invalidations should be sent to
# the db *while* tpc_finish is being called on the storage.
def __init__(self, db):
self._db = db self._db = db
# Delegate the actual 2PC methods to the storage self._storage = db.storage
self.tpc_vote = self._db.storage.tpc_vote self._tids = tids
self.tpc_finish = self._db.storage.tpc_finish self._oids = set()
self.tpc_abort = self._db.storage.tpc_abort
# Get a number from a simple thread-safe counter, then
# increment it, for the purpose of sorting ResourceManagers by
# creation order. This ensures that multiple ResourceManagers
# within a transaction commit in a predictable sequence.
resource_counter_lock.acquire()
try:
global resource_counter
self._count = resource_counter
resource_counter += 1
finally:
resource_counter_lock.release()
def sortKey(self): def abort(self, transaction):
return "%s:%016x" % (self._db.storage.sortKey(), self._count) pass
def tpc_begin(self, txn, sub=False):
if sub:
raise ValueError("doesn't support sub-transactions")
self._db.storage.tpc_begin(txn)
# The object registers itself with the txn manager, so the ob def tpc_begin(self, transaction):
# argument to the methods below is self. self._storage.tpc_begin(transaction)
def abort(self, obj, txn): def commit(self, transaction):
raise NotImplementedError for tid in self._tids:
result = self._storage.undo(tid, transaction)
if result:
self._oids.update(result[1])
def commit(self, obj, txn): def tpc_vote(self, transaction):
raise NotImplementedError for oid, _ in self._storage.tpc_vote(transaction) or ():
self._oids.add(oid)
class TransactionalUndo(ResourceManager): def tpc_finish(self, transaction):
self._storage.tpc_finish(
transaction,
lambda tid: self._db.invalidate(tid, self._oids)
)
def __init__(self, db, tid): def tpc_abort(self, transaction):
super(TransactionalUndo, self).__init__(db) self._storage.tpc_abort(transaction)
self._tid = tid
def commit(self, ob, t): def sortKey(self):
# XXX see XXX in ResourceManager return "%s:%s" % (self._storage.sortKey(), id(self))
tid, oids = self._db.storage.undo(self._tid, t)
self._db.invalidate(tid, dict.fromkeys(oids, 1))
def connection(*args, **kw): def connection(*args, **kw):
db = DB(*args, **kw) db = DB(*args, **kw)
......
...@@ -309,7 +309,8 @@ class DemoStorage(object): ...@@ -309,7 +309,8 @@ class DemoStorage(object):
def tpc_begin(self, transaction, *a, **k): def tpc_begin(self, transaction, *a, **k):
# The tid argument exists to support testing. # The tid argument exists to support testing.
if transaction is self._transaction: if transaction is self._transaction:
return raise ZODB.POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
self._lock_release() self._lock_release()
self._commit_lock.acquire() self._commit_lock.acquire()
self._lock_acquire() self._lock_acquire()
...@@ -320,7 +321,8 @@ class DemoStorage(object): ...@@ -320,7 +321,8 @@ class DemoStorage(object):
@ZODB.utils.locked @ZODB.utils.locked
def tpc_finish(self, transaction, func = lambda tid: None): def tpc_finish(self, transaction, func = lambda tid: None):
if (transaction is not self._transaction): if (transaction is not self._transaction):
return raise ZODB.POSException.StorageTransactionError(
"tpc_finish called with wrong transaction")
self._issued_oids.difference_update(self._stored_oids) self._issued_oids.difference_update(self._stored_oids)
self._stored_oids = set() self._stored_oids = set()
self._transaction = None self._transaction = None
......
...@@ -705,7 +705,8 @@ class FileStorage( ...@@ -705,7 +705,8 @@ class FileStorage(
self._lock_acquire() self._lock_acquire()
try: try:
if transaction is not self._transaction: if transaction is not self._transaction:
return raise POSException.StorageTransactionError(
"tpc_vote called with wrong transaction")
dlen = self._tfile.tell() dlen = self._tfile.tell()
if not dlen: if not dlen:
return # No data in this trans return # No data in this trans
......
...@@ -274,7 +274,8 @@ class MappingStorage(object): ...@@ -274,7 +274,8 @@ class MappingStorage(object):
def tpc_begin(self, transaction, tid=None): def tpc_begin(self, transaction, tid=None):
# The tid argument exists to support testing. # The tid argument exists to support testing.
if transaction is self._transaction: if transaction is self._transaction:
return raise ZODB.POSException.StorageTransactionError(
"Duplicate tpc_begin calls for same transaction")
self._lock_release() self._lock_release()
self._commit_lock.acquire() self._commit_lock.acquire()
self._lock_acquire() self._lock_acquire()
...@@ -292,7 +293,8 @@ class MappingStorage(object): ...@@ -292,7 +293,8 @@ class MappingStorage(object):
@ZODB.utils.locked(opened) @ZODB.utils.locked(opened)
def tpc_finish(self, transaction, func = lambda tid: None): def tpc_finish(self, transaction, func = lambda tid: None):
if (transaction is not self._transaction): if (transaction is not self._transaction):
return raise ZODB.POSException.StorageTransactionError(
"tpc_finish called with wrong transaction")
tid = self._tid tid = self._tid
func(tid) func(tid)
...@@ -318,7 +320,9 @@ class MappingStorage(object): ...@@ -318,7 +320,9 @@ class MappingStorage(object):
# ZODB.interfaces.IStorage # ZODB.interfaces.IStorage
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
pass if transaction is not self._transaction:
raise POSException.StorageTransactionError(
"tpc_vote called with wrong transaction")
class TransactionRecord: class TransactionRecord:
......
...@@ -690,7 +690,7 @@ class IStorage(Interface): ...@@ -690,7 +690,7 @@ class IStorage(Interface):
"""Begin the two-phase commit process. """Begin the two-phase commit process.
If storage is already participating in a two-phase commit If storage is already participating in a two-phase commit
using the same transaction, the call is ignored. using the same transaction, a StorageTransactionError is raised.
If the storage is already participating in a two-phase commit If the storage is already participating in a two-phase commit
using a different transaction, the call blocks until the using a different transaction, the call blocks until the
...@@ -702,9 +702,10 @@ class IStorage(Interface): ...@@ -702,9 +702,10 @@ class IStorage(Interface):
Changes must be made permanent at this point. Changes must be made permanent at this point.
This call is ignored if the storage isn't participating in This call raises a StorageTransactionError if the storage
two-phase commit or if it is committing a different isn't participating in two-phase commit or if it is committing
transaction. Failure of this method is extremely serious. a different transaction. Failure of this method is extremely
serious.
The second argument is a call-back function that must be The second argument is a call-back function that must be
called while the storage transaction lock is held. It takes called while the storage transaction lock is held. It takes
...@@ -715,9 +716,9 @@ class IStorage(Interface): ...@@ -715,9 +716,9 @@ class IStorage(Interface):
def tpc_vote(transaction): def tpc_vote(transaction):
"""Provide a storage with an opportunity to veto a transaction """Provide a storage with an opportunity to veto a transaction
This call is ignored if the storage isn't participating in This call raises a StorageTransactionError if the storage
two-phase commit or if it is commiting a different isn't participating in two-phase commit or if it is commiting
transaction. Failure of this method is extremely serious. a different transaction.
If a transaction can be committed by a storage, then the If a transaction can be committed by a storage, then the
method should return. If a transaction cannot be committed, method should return. If a transaction cannot be committed,
......
...@@ -34,8 +34,8 @@ class BasicStorage: ...@@ -34,8 +34,8 @@ class BasicStorage:
def checkBasics(self): def checkBasics(self):
t = transaction.Transaction() t = transaction.Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
# This should simply return self.assertRaises(POSException.StorageTransactionError,
self._storage.tpc_begin(t) self._storage.tpc_begin, t)
# Aborting is easy # Aborting is easy
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
# Test a few expected exceptions when we're doing operations giving a # Test a few expected exceptions when we're doing operations giving a
......
...@@ -99,19 +99,20 @@ class SynchronizedStorage: ...@@ -99,19 +99,20 @@ class SynchronizedStorage:
def checkFinishNotCommitting(self): def checkFinishNotCommitting(self):
t = Transaction() t = Transaction()
self._storage.tpc_finish(t) self.assertRaises(StorageTransactionError,
self._storage.tpc_finish, t)
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
def checkFinishWrongTrans(self): def checkFinishWrongTrans(self):
t = Transaction() t = Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
self._storage.tpc_finish(Transaction()) self.assertRaises(StorageTransactionError,
self._storage.tpc_finish, Transaction())
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
def checkBeginCommitting(self): def checkBeginCommitting(self):
t = Transaction() t = Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
self._storage.tpc_begin(t)
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
# TODO: how to check undo? # TODO: how to check undo?
...@@ -420,8 +420,8 @@ class ZODBTests(ZODB.tests.util.TestCase): ...@@ -420,8 +420,8 @@ class ZODBTests(ZODB.tests.util.TestCase):
# performed yet. # performed yet.
transaction.begin() transaction.begin()
log = self._db.undoLog() log = self._db.undoLog()
for i in range(5): self._db.undo([log[i]['id'] for i in range(5)])
self._db.undo(log[i]['id'])
transaction.get().note('undo states 1 through 5') transaction.get().note('undo states 1 through 5')
# Now attempt all those undo operations. # Now attempt all those undo operations.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment