Commit 646b2b33 authored by Tim Peters's avatar Tim Peters

Merge Jim's savepoint/rollback work from the 3.4 branch. Yay!

Original checkin msgs follow:

r30131 | jim | 2005-04-23 23:33:29 -0400 (Sat, 23 Apr 2005) | 5 lines
   M /ZODB/branches/3.4/src/ZODB/Connection.py
   M /ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py

Fixed a bug in commits with savepoints and changes since savepoints.
Once we start using savepoints, we need to make sure that all data are
committed through the savepoints. Otherwise, things can get committed
in the wrong order, leading to conflicts.

r30130 | jim | 2005-04-23 23:02:00 -0400 (Sat, 23 Apr 2005) | 6 lines
   M /ZODB/branches/3.4/src/ZODB/Connection.py
   M /ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py

Fixed a bug that caused assertion errors if an object was added in a
savepoint, then modified and then aborted.

Also added missing code to clear registered objects when a savepoint
was rolled back.

r30129 | jim | 2005-04-23 21:29:02 -0400 (Sat, 23 Apr 2005) | 5 lines
   M /ZODB/branches/3.4/src/ZODB/Connection.py
   D /ZODB/branches/3.4/src/ZODB/TmpStore.py
   M /ZODB/branches/3.4/src/ZODB/tests/testConnection.py
   A /ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.py
   A /ZODB/branches/3.4/src/ZODB/tests/testConnectionSavepoint.txt
   M /ZODB/branches/3.4/src/ZODB/tests/testZODB.py
   M /ZODB/branches/3.4/src/ZODB/tests/test_datamanageradapter.py
   M /ZODB/branches/3.4/src/transaction/__init__.py
   M /ZODB/branches/3.4/src/transaction/_manager.py
   M /ZODB/branches/3.4/src/transaction/_transaction.py
   M /ZODB/branches/3.4/src/transaction/interfaces.py
   A /ZODB/branches/3.4/src/transaction/savepoint.txt
   A /ZODB/branches/3.4/src/transaction/tests/savepointsample.py
   M /ZODB/branches/3.4/src/transaction/tests/test_register_compat.py
   A /ZODB/branches/3.4/src/transaction/tests/test_savepoint.py

Added savepoints!

(And also added interfaces and rearranged some code to hopefully make
it easier to read.)

r30128 | jim | 2005-04-23 21:28:59 -0400 (Sat, 23 Apr 2005) | 2 lines
   M /ZODB/branches/3.4/src/transaction/tests/test_transaction.py

Removed some tests that son't make sense after the savepoont refactoring

r30127 | jim | 2005-04-23 21:28:57 -0400 (Sat, 23 Apr 2005) | 2 lines
   M /ZODB/branches/3.4/src/persistent/interfaces.py

Commented out mtime
parent cde26ec4
...@@ -17,6 +17,7 @@ $Id$""" ...@@ -17,6 +17,7 @@ $Id$"""
import logging import logging
import sys import sys
import tempfile
import threading import threading
import warnings import warnings
from time import time from time import time
...@@ -33,13 +34,12 @@ import transaction ...@@ -33,13 +34,12 @@ import transaction
from ZODB.ConflictResolution import ResolvedSerial from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport from ZODB.ExportImport import ExportImport
from ZODB.POSException \ from ZODB import POSException
import ConflictError, ReadConflictError, InvalidObjectReference, \ from ZODB.POSException import InvalidObjectReference, ConnectionStateError
ConnectionStateError from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.TmpStore import TmpStore
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
from ZODB.utils import u64, oid_repr, z64, positive_id, \ from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
DEPRECATED_ARGUMENT, deprecated36 from ZODB.utils import p64, u64, z64, oid_repr, positive_id
global_reset_counter = 0 global_reset_counter = 0
...@@ -61,17 +61,18 @@ class Connection(ExportImport, object): ...@@ -61,17 +61,18 @@ class Connection(ExportImport, object):
implements(IConnection, IDataManager, IPersistentDataManager) implements(IConnection, IDataManager, IPersistentDataManager)
_tmp = None _storage = _normal_storage = _savepoint_storage = None
_code_timestamp = 0 _code_timestamp = 0
# ZODB.IConnection ##########################################################################
# Connection methods, ZODB.IConnection
def __init__(self, version='', cache_size=400, def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None, cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True): synch=True):
"""Create a new Connection.""" """Create a new Connection."""
self._log = logging.getLogger("ZODB.Connection") self._log = logging.getLogger("ZODB.Connection")
self._storage = None
self._debug_info = () self._debug_info = ()
self._opened = None # time.time() when DB.open() opened us self._opened = None # time.time() when DB.open() opened us
...@@ -150,39 +151,6 @@ class Connection(ExportImport, object): ...@@ -150,39 +151,6 @@ class Connection(ExportImport, object):
self.connections = None self.connections = None
def get_connection(self, database_name):
"""Return a Connection for the named database."""
connection = self.connections.get(database_name)
if connection is None:
new_con = self._db.databases[database_name].open()
self.connections.update(new_con.connections)
new_con.connections = self.connections
connection = new_con
return connection
def get(self, oid):
"""Return the persistent object with oid 'oid'."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
obj = self._cache.get(oid, None)
if obj is not None:
return obj
obj = self._added.get(oid, None)
if obj is not None:
return obj
p, serial = self._storage.load(oid, self._version)
obj = self._reader.getGhost(p)
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = None
obj._p_serial = serial
self._cache[oid] = obj
return obj
def add(self, obj): def add(self, obj):
"""Add a new object 'obj' to the database and assign it an oid.""" """Add a new object 'obj' to the database and assign it an oid."""
if self._storage is None: if self._storage is None:
...@@ -207,52 +175,41 @@ class Connection(ExportImport, object): ...@@ -207,52 +175,41 @@ class Connection(ExportImport, object):
elif obj._p_jar is not self: elif obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar) raise InvalidObjectReference(obj, obj._p_jar)
def sortKey(self): def get(self, oid):
"""Return a consistent sort key for this connection.""" """Return the persistent object with oid 'oid'."""
return "%s:%s" % (self._storage.sortKey(), id(self)) if self._storage is None:
raise ConnectionStateError("The database connection is closed")
def abort(self, transaction): obj = self._cache.get(oid, None)
"""Abort a transaction and forget all changes.""" if obj is not None:
for obj in self._registered_objects: return obj
oid = obj._p_oid obj = self._added.get(oid, None)
assert oid is not None if obj is not None:
if oid in self._added: return obj
del self._added[oid]
del obj._p_jar
del obj._p_oid
else:
# Note: If we invalidate a non-ghostifiable object p, serial = self._storage.load(oid, self._version)
# (i.e. a persistent class), the object will obj = self._reader.getGhost(p)
# immediately reread it's state. That means that the
# following call could result in a call to
# self.setstate, which, of course, must suceed.
# In general, it would be better if the read could be
# delayed until the start of the next transaction. If
# we read at the end of a transaction and if the
# object was invalidated during this transaction, then
# we'll read non-current data, which we'll discard
# later in transaction finalization. Unfortnately, we
# can only delay the read if this abort corresponds to
# a top-level-transaction abort. We can't tell if
# this is a top-level-transaction abort, so we have to
# go ahead and invalidate now. Fortunately, it's
# pretty unlikely that the object we are invalidating
# was invalidated by another thread, so the risk of a
# reread is pretty low.
self._cache.invalidate(oid) obj._p_oid = oid
obj._p_jar = self
obj._p_changed = None
obj._p_serial = serial
self._tpc_cleanup() self._cache[oid] = obj
return obj
# TODO: we should test what happens when cacheGC is called mid-transaction. def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache."""
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
# TODO: we should test what happens when cacheGC is called mid-transaction.
def cacheGC(self): def cacheGC(self):
"""Reduce cache size to target size.""" """Reduce cache size to target size."""
self._cache.incrgc() self._cache.incrgc()
__onCloseCallbacks = None __onCloseCallbacks = None
def onCloseCallback(self, f): def onCloseCallback(self, f):
"""Register a callable, f, to be called by close().""" """Register a callable, f, to be called by close()."""
if self.__onCloseCallbacks is None: if self.__onCloseCallbacks is None:
...@@ -266,12 +223,6 @@ class Connection(ExportImport, object): ...@@ -266,12 +223,6 @@ class Connection(ExportImport, object):
raise ConnectionStateError("Cannot close a connection joined to " raise ConnectionStateError("Cannot close a connection joined to "
"a transaction") "a transaction")
if self._tmp is not None:
# There are no direct modifications pending, but a subtransaction
# is pending.
raise ConnectionStateError("Cannot close a connection with a "
"pending subtransaction")
if self._cache is not None: if self._cache is not None:
self._cache.incrgc() # This is a good time to do some GC self._cache.incrgc() # This is a good time to do some GC
...@@ -285,7 +236,8 @@ class Connection(ExportImport, object): ...@@ -285,7 +236,8 @@ class Connection(ExportImport, object):
self._log.error("Close callback failed for %s", f, self._log.error("Close callback failed for %s", f,
exc_info=sys.exc_info()) exc_info=sys.exc_info())
self.__onCloseCallbacks = None self.__onCloseCallbacks = None
self._storage = self._tmp = self.new_oid = None self._storage = self._savepoint_storage = self._normal_storage = None
self.new_oid = None
self._debug_info = () self._debug_info = ()
self._opened = None self._opened = None
# Return the connection to the pool. # Return the connection to the pool.
...@@ -297,156 +249,136 @@ class Connection(ExportImport, object): ...@@ -297,156 +249,136 @@ class Connection(ExportImport, object):
# assert that here, because self may have been reused (by # assert that here, because self may have been reused (by
# another thread) by the time we get back here. # another thread) by the time we get back here.
# transaction.interfaces.IDataManager def db(self):
"""Returns a handle to the database this connection belongs to."""
return self._db
def commit(self, transaction): def isReadOnly(self):
"""Commit changes to an object""" """Returns True if the storage for this connection is read only."""
if self._import: if self._storage is None:
# TODO: This code seems important for Zope, but needs docs raise ConnectionStateError("The database connection is closed")
# to explain why. return self._storage.isReadOnly()
self._importDuringCommit(transaction, *self._import)
self._import = None
# Just in case an object is added as a side-effect of storing def invalidate(self, tid, oids):
# a modified object. If, for example, a __getstate__() method """Notify the Connection that transaction 'tid' invalidated oids."""
# calls add(), the newly added objects will show up in self._inv_lock.acquire()
# _added_during_commit. This sounds insane, but has actually try:
# happened. if self._txn_time is None:
self._txn_time = tid
self._invalidated.update(oids)
finally:
self._inv_lock.release()
self._added_during_commit = [] def root(self):
"""Return the database root object."""
return self.get(z64)
for obj in self._registered_objects: def getVersion(self):
oid = obj._p_oid """Returns the version this connection is attached to."""
assert oid if self._storage is None:
if oid in self._conflicts: raise ConnectionStateError("The database connection is closed")
raise ReadConflictError(object=obj) return self._version
if obj._p_jar is not self: def get_connection(self, database_name):
raise InvalidObjectReference(obj, obj._p_jar) """Return a Connection for the named database."""
elif oid in self._added: connection = self.connections.get(database_name)
assert obj._p_serial == z64 if connection is None:
elif obj._p_changed: new_con = self._db.databases[database_name].open()
if oid in self._invalidated: self.connections.update(new_con.connections)
resolve = getattr(obj, "_p_resolveConflict", None) new_con.connections = self.connections
if resolve is None: connection = new_con
raise ConflictError(object=obj) return connection
self._modified.append(oid)
else:
# Nothing to do. It's been said that it's legal, e.g., for
# an object to set _p_changed to false after it's been
# changed and registered.
continue
self._store_objects(ObjectWriter(obj), transaction) def sync(self):
"""Manually update the view on the database."""
self._txn_mgr.get().abort()
sync = getattr(self._storage, 'sync', 0)
if sync:
sync()
self._flush_invalidations()
for obj in self._added_during_commit: def getDebugInfo(self):
self._store_objects(ObjectWriter(obj), transaction) """Returns a tuple with different items for debugging the
self._added_during_commit = None connection.
"""
return self._debug_info
def _store_objects(self, writer, transaction): def setDebugInfo(self, *args):
for obj in writer: """Add the given items to the debug information of this connection."""
oid = obj._p_oid self._debug_info = self._debug_info + args
serial = getattr(obj, "_p_serial", z64)
if serial == z64: def getTransferCounts(self, clear=False):
# obj is a new object """Returns the number of objects loaded and stored."""
self._creating.append(oid) res = self._load_count, self._store_count
# Because obj was added, it is now in _creating, so it can if clear:
# be removed from _added. self._load_count = 0
self._added.pop(oid, None) self._store_count = 0
else: return res
if (oid in self._invalidated
and not hasattr(obj, '_p_resolveConflict')):
raise ConflictError(object=obj)
self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj
s = self._storage.store(oid, serial, p, self._version, transaction)
self._store_count += 1
# Put the object in the cache before handling the
# response, just in case the response contains the
# serial number for a newly created object
try:
self._cache[oid] = obj
except:
# Dang, I bet it's wrapped:
# TODO: Deprecate, then remove, this.
if hasattr(obj, 'aq_base'):
self._cache[oid] = obj.aq_base
else:
raise
self._handle_serial(s, oid) # Connection methods
##########################################################################
def commit_sub(self, t): ##########################################################################
"""Commit all changes made in subtransactions and begin 2-phase commit # Data manager (IDataManager) methods
"""
if self._tmp is None:
return
src = self._storage
self._storage = self._tmp
self._tmp = None
self._log.debug("Commiting subtransaction of size %s", src.getSize()) def abort(self, transaction):
oids = src._index.keys() """Abort a transaction and forget all changes."""
self._storage.tpc_begin(t)
# Copy invalidating and creating info from temporary storage: # The order is important here. We want to abort registered
self._modified.extend(oids) # objects before we process the cache. Otherwise, we may un-add
self._creating.extend(src._creating) # objects added in savepoints. If they've been modified since
# the savepoint, then they won't have _p_oid or _p_jar after
# they've been unadded. This will make the code in _abort
# confused.
for oid in oids:
data, serial = src.load(oid, src)
s = self._storage.store(oid, serial, data, self._version, t)
self._handle_serial(s, oid, change=False)
def abort_sub(self, t): self._abort()
"""Discard all subtransaction data."""
if self._tmp is None:
return
src = self._storage
self._storage = self._tmp
self._tmp = None
# Note: If we invalidate a non-ghostifiable object (i.e. a if self._savepoint_storage is not None:
# persistent class), the object will immediately reread it's self._abort_savepoint()
# state. That means that the following call could result in a
# call to self.setstate, which, of course, must succeed. In
# general, it would be better if the read could be delayed
# until the start of the next transaction. If we read at the
# end of a transaction and if the object was invalidated
# during this transaction, then we'll read non-current data,
# which we'll discard later in transaction finalization. We
# could, theoretically queue this invalidation by calling
# self.invalidate. Unfortunately, attempts to make that
# change resulted in mysterious test failures. It's pretty
# unlikely that the object we are invalidating was invalidated
# by another thread, so the risk of a reread is pretty low.
# It's really not worth the effort to pursue this.
self._cache.invalidate(src._index.keys()) self._tpc_cleanup()
self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None): def _abort(self):
"""Disown any objects newly saved in an uncommitted transaction.""" """Abort a transaction and forget all changes."""
if creating is None:
creating = self._creating
self._creating = []
for oid in creating: for obj in self._registered_objects:
o = self._cache.get(oid) oid = obj._p_oid
if o is not None: assert oid is not None
del self._cache[oid] if oid in self._added:
del o._p_jar del self._added[oid]
del o._p_oid del obj._p_jar
del obj._p_oid
else:
# The next two methods are callbacks for transaction synchronization. # Note: If we invalidate a non-ghostifiable object
# (i.e. a persistent class), the object will
# immediately reread it's state. That means that the
# following call could result in a call to
# self.setstate, which, of course, must suceed.
# In general, it would be better if the read could be
# delayed until the start of the next transaction. If
# we read at the end of a transaction and if the
# object was invalidated during this transaction, then
# we'll read non-current data, which we'll discard
# later in transaction finalization. Unfortnately, we
# can only delay the read if this abort corresponds to
# a top-level-transaction abort. We can't tell if
# this is a top-level-transaction abort, so we have to
# go ahead and invalidate now. Fortunately, it's
# pretty unlikely that the object we are invalidating
# was invalidated by another thread, so the risk of a
# reread is pretty low.
def beforeCompletion(self, txn): self._cache.invalidate(oid)
# We don't do anything before a commit starts.
pass
def afterCompletion(self, txn): def _tpc_cleanup(self):
"""Performs cleanup operations to support tpc_finish and tpc_abort."""
self._conflicts.clear()
if not self._synch:
self._flush_invalidations() self._flush_invalidations()
self._needs_to_join = True
self._registered_objects = []
def _flush_invalidations(self): def _flush_invalidations(self):
self._inv_lock.acquire() self._inv_lock.acquire()
...@@ -491,54 +423,109 @@ class Connection(ExportImport, object): ...@@ -491,54 +423,109 @@ class Connection(ExportImport, object):
# Now is a good time to collect some garbage. # Now is a good time to collect some garbage.
self._cache.incrgc() self._cache.incrgc()
def root(self): def tpc_begin(self, transaction):
"""Return the database root object.""" """Begin commit of a transaction, starting the two-phase commit."""
return self.get(z64) self._modified = []
def db(self): # _creating is a list of oids of new objects, which is used to
"""Returns a handle to the database this connection belongs to.""" # remove them from the cache if a transaction aborts.
return self._db self._creating = []
self._normal_storage.tpc_begin(transaction)
def isReadOnly(self): def commit(self, transaction):
"""Returns True if the storage for this connection is read only.""" """Commit changes to an object"""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly()
def invalidate(self, tid, oids): if self._savepoint_storage is not None:
"""Notify the Connection that transaction 'tid' invalidated oids."""
self._inv_lock.acquire() # We first checkpoint the current changes to the savepoint
try: self.savepoint()
if self._txn_time is None:
self._txn_time = tid # then commit all of the savepoint changes at once
self._invalidated.update(oids) self._commit_savepoint(transaction)
finally:
self._inv_lock.release() # No need to call _commit since savepoint did.
else:
self._commit(transaction)
def _commit(self, transaction):
"""Commit changes to an object"""
if self._import:
# TODO: This code seems important for Zope, but needs docs
# to explain why.
self._importDuringCommit(transaction, *self._import)
self._import = None
# Just in case an object is added as a side-effect of storing
# a modified object. If, for example, a __getstate__() method
# calls add(), the newly added objects will show up in
# _added_during_commit. This sounds insane, but has actually
# happened.
self._added_during_commit = []
# IDataManager for obj in self._registered_objects:
oid = obj._p_oid
assert oid
if oid in self._conflicts:
raise ReadConflictError(object=obj)
def tpc_begin(self, transaction, sub=False): if obj._p_jar is not self:
"""Begin commit of a transaction, starting the two-phase commit.""" raise InvalidObjectReference(obj, obj._p_jar)
self._modified = [] elif oid in self._added:
assert obj._p_serial == z64
elif obj._p_changed:
if oid in self._invalidated:
resolve = getattr(obj, "_p_resolveConflict", None)
if resolve is None:
raise ConflictError(object=obj)
self._modified.append(oid)
else:
# Nothing to do. It's been said that it's legal, e.g., for
# an object to set _p_changed to false after it's been
# changed and registered.
continue
# _creating is a list of oids of new objects, which is used to self._store_objects(ObjectWriter(obj), transaction)
# remove them from the cache if a transaction aborts.
self._creating = []
if sub and self._tmp is None:
# Sub-transaction!
self._tmp = self._storage
self._storage = TmpStore(self._version, self._storage)
self._storage.tpc_begin(transaction) for obj in self._added_during_commit:
self._store_objects(ObjectWriter(obj), transaction)
self._added_during_commit = None
def tpc_vote(self, transaction): def _store_objects(self, writer, transaction):
"""Verify that a data manager can commit the transaction.""" for obj in writer:
oid = obj._p_oid
serial = getattr(obj, "_p_serial", z64)
if serial == z64:
# obj is a new object
self._creating.append(oid)
# Because obj was added, it is now in _creating, so it can
# be removed from _added.
self._added.pop(oid, None)
else:
if (oid in self._invalidated
and not hasattr(obj, '_p_resolveConflict')):
raise ConflictError(object=obj)
self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj
s = self._storage.store(oid, serial, p, self._version, transaction)
self._store_count += 1
# Put the object in the cache before handling the
# response, just in case the response contains the
# serial number for a newly created object
try: try:
vote = self._storage.tpc_vote self._cache[oid] = obj
except AttributeError: except:
return # Dang, I bet it's wrapped:
s = vote(transaction) # TODO: Deprecate, then remove, this.
self._handle_serial(s) if hasattr(obj, 'aq_base'):
self._cache[oid] = obj.aq_base
else:
raise
self._handle_serial(s, oid)
def _handle_serial(self, store_return, oid=None, change=1): def _handle_serial(self, store_return, oid=None, change=1):
"""Handle the returns from store() and tpc_vote() calls.""" """Handle the returns from store() and tpc_vote() calls."""
...@@ -582,26 +569,13 @@ class Connection(ExportImport, object): ...@@ -582,26 +569,13 @@ class Connection(ExportImport, object):
obj._p_changed = 0 # transition from changed to up-to-date obj._p_changed = 0 # transition from changed to up-to-date
obj._p_serial = serial obj._p_serial = serial
def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done."""
if self._tmp is not None:
# Commiting a subtransaction!
# There is no need to invalidate anything.
self._storage.tpc_finish(transaction)
self._storage._creating[:0]=self._creating
del self._creating[:]
else:
def callback(tid):
d = {}
for oid in self._modified:
d[oid] = 1
self._db.invalidate(tid, d, self)
self._storage.tpc_finish(transaction, callback)
self._tpc_cleanup()
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
if self._import: if self._import:
self._import = None self._import = None
if self._savepoint_storage is not None:
self._abort_savepoint()
self._storage.tpc_abort(transaction) self._storage.tpc_abort(transaction)
# Note: If we invalidate a non-justifiable object (i.e. a # Note: If we invalidate a non-justifiable object (i.e. a
...@@ -628,41 +602,59 @@ class Connection(ExportImport, object): ...@@ -628,41 +602,59 @@ class Connection(ExportImport, object):
del obj._p_jar del obj._p_jar
self._tpc_cleanup() self._tpc_cleanup()
def _tpc_cleanup(self): def _invalidate_creating(self, creating=None):
"""Performs cleanup operations to support tpc_finish and tpc_abort.""" """Disown any objects newly saved in an uncommitted transaction."""
self._conflicts.clear() if creating is None:
if not self._synch: creating = self._creating
self._flush_invalidations() self._creating = []
self._needs_to_join = True
self._registered_objects = []
def sync(self): for oid in creating:
"""Manually update the view on the database.""" o = self._cache.get(oid)
self._txn_mgr.get().abort() if o is not None:
sync = getattr(self._storage, 'sync', 0) del self._cache[oid]
if sync: del o._p_jar
sync() del o._p_oid
self._flush_invalidations()
def getDebugInfo(self): def tpc_vote(self, transaction):
"""Returns a tuple with different items for debugging the """Verify that a data manager can commit the transaction."""
connection. try:
""" vote = self._storage.tpc_vote
return self._debug_info except AttributeError:
return
s = vote(transaction)
self._handle_serial(s)
def setDebugInfo(self, *args): def tpc_finish(self, transaction):
"""Add the given items to the debug information of this connection.""" """Indicate confirmation that the transaction is done."""
self._debug_info = self._debug_info + args def callback(tid):
d = {}
for oid in self._modified:
d[oid] = 1
self._db.invalidate(tid, d, self)
self._storage.tpc_finish(transaction, callback)
self._tpc_cleanup()
def getTransferCounts(self, clear=False): def sortKey(self):
"""Returns the number of objects loaded and stored.""" """Return a consistent sort key for this connection."""
res = self._load_count, self._store_count return "%s:%s" % (self._storage.sortKey(), id(self))
if clear:
self._load_count = 0 # Data manager (IDataManager) methods
self._store_count = 0 ##########################################################################
return res
############################################## ##########################################################################
# Transaction-manager synchronization -- ISynchronizer
def beforeCompletion(self, txn):
# We don't do anything before a commit starts.
pass
def afterCompletion(self, txn):
self._flush_invalidations()
# Transaction-manager synchronization -- ISynchronizer
##########################################################################
##########################################################################
# persistent.interfaces.IPersistentDatamanager # persistent.interfaces.IPersistentDatamanager
def oldstate(self, obj, tid): def oldstate(self, obj, tid):
...@@ -818,12 +810,24 @@ class Connection(ExportImport, object): ...@@ -818,12 +810,24 @@ class Connection(ExportImport, object):
self._register(obj) self._register(obj)
def _register(self, obj=None): def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj) # The order here is important. We need to join before
# registering the object, because joining may take a
# savepoint, and the savepoint should not reflect the change
# to the object.
if self._needs_to_join: if self._needs_to_join:
self._txn_mgr.get().join(self) self._txn_mgr.get().join(self)
self._needs_to_join = False self._needs_to_join = False
if obj is not None:
self._registered_objects.append(obj)
# persistent.interfaces.IPersistentDatamanager
##########################################################################
##########################################################################
# PROTECTED stuff (used by e.g. ZODB.DB.DB) # PROTECTED stuff (used by e.g. ZODB.DB.DB)
def _cache_items(self): def _cache_items(self):
...@@ -862,7 +866,7 @@ class Connection(ExportImport, object): ...@@ -862,7 +866,7 @@ class Connection(ExportImport, object):
# and Storage. # and Storage.
self._db = odb self._db = odb
self._storage = odb._storage self._normal_storage = self._storage = odb._storage
self.new_oid = odb._storage.new_oid self.new_oid = odb._storage.new_oid
self._opened = time() self._opened = time()
if synch is not None: if synch is not None:
...@@ -892,6 +896,7 @@ class Connection(ExportImport, object): ...@@ -892,6 +896,7 @@ class Connection(ExportImport, object):
cache_size = self._cache.cache_size cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size) self._cache = cache = PickleCache(self, cache_size)
##########################################################################
# Python protocol # Python protocol
def __repr__(self): def __repr__(self):
...@@ -901,6 +906,10 @@ class Connection(ExportImport, object): ...@@ -901,6 +906,10 @@ class Connection(ExportImport, object):
ver = '' ver = ''
return '<Connection at %08x%s>' % (positive_id(self), ver) return '<Connection at %08x%s>' % (positive_id(self), ver)
# Python protocol
##########################################################################
##########################################################################
# DEPRECATION candidates # DEPRECATION candidates
__getitem__ = get __getitem__ = get
...@@ -916,33 +925,6 @@ class Connection(ExportImport, object): ...@@ -916,33 +925,6 @@ class Connection(ExportImport, object):
except KeyError: except KeyError:
return self.getVersion() return self.getVersion()
def getVersion(self):
"""Returns the version this connection is attached to."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._version
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called the cache when an object of type type is invalidated.
try:
oid = obj._p_oid
p, serial = self._storage.load(oid, self._version)
# We call getGhost(), but we actually get a non-ghost back.
# The object is a class, which can't actually be ghosted.
copy = self._reader.getGhost(p)
obj.__dict__.clear()
obj.__dict__.update(copy.__dict__)
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = 0
obj._p_serial = serial
except:
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def exchange(self, old, new): def exchange(self, old, new):
# called by a ZClasses method that isn't executed by the test suite # called by a ZClasses method that isn't executed by the test suite
oid = old._p_oid oid = old._p_oid
...@@ -952,8 +934,20 @@ class Connection(ExportImport, object): ...@@ -952,8 +934,20 @@ class Connection(ExportImport, object):
self._register(new) self._register(new)
self._cache[oid] = new self._cache[oid] = new
# DEPRECATION candidates
##########################################################################
##########################################################################
# DEPRECATED methods # DEPRECATED methods
def cacheFullSweep(self, dt=None):
deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def getTransaction(self): def getTransaction(self):
"""Get the current transaction for this connection. """Get the current transaction for this connection.
...@@ -986,16 +980,151 @@ class Connection(ExportImport, object): ...@@ -986,16 +980,151 @@ class Connection(ExportImport, object):
self._txn_mgr.registerSynch(self) self._txn_mgr.registerSynch(self)
return self._txn_mgr return self._txn_mgr
def cacheFullSweep(self, dt=None): # DEPRECATED methods
deprecated36("cacheFullSweep is deprecated. " ##########################################################################
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT): #####################################################################
"""Deactivate all unmodified objects in the cache.""" # Savepoint support
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.") def savepoint(self):
self._cache.minimize() if self._savepoint_storage is None:
self._savepoint_storage = TmpStore(self._version,
self._normal_storage)
self._storage = self._savepoint_storage
self._creating = []
self._commit(None)
self._storage.creating.extend(self._creating)
del self._creating[:]
self._registered_objects = []
state = self._storage.position, self._storage.index.copy()
return Savepoint(self, state)
def _rollback(self, state):
self._abort()
self._registered_objects = []
src = self._storage
self._cache.invalidate(src.index)
src.reset(*state)
def _commit_savepoint(self, transaction):
"""Commit all changes made in subtransactions and begin 2-phase commit
"""
src = self._savepoint_storage
self._storage = self._normal_storage
self._savepoint_storage = None
self._log.debug("Commiting savepoints of size %s", src.getSize())
oids = src.index.keys()
# Copy invalidating and creating info from temporary storage:
self._modified.extend(oids)
self._creating.extend(src.creating)
for oid in oids:
data, serial = src.load(oid, src)
s = self._storage.store(oid, serial, data,
self._version, transaction)
self._handle_serial(s, oid, change=False)
src.close()
def _abort_savepoint(self):
"""Discard all subtransaction data."""
src = self._savepoint_storage
self._storage = self._normal_storage
self._savepoint_storage = None
# Note: If we invalidate a non-ghostifiable object (i.e. a
# persistent class), the object will immediately reread it's
# state. That means that the following call could result in a
# call to self.setstate, which, of course, must succeed. In
# general, it would be better if the read could be delayed
# until the start of the next transaction. If we read at the
# end of a transaction and if the object was invalidated
# during this transaction, then we'll read non-current data,
# which we'll discard later in transaction finalization. We
# could, theoretically queue this invalidation by calling
# self.invalidate. Unfortunately, attempts to make that
# change resulted in mysterious test failures. It's pretty
# unlikely that the object we are invalidating was invalidated
# by another thread, so the risk of a reread is pretty low.
# It's really not worth the effort to pursue this.
self._cache.invalidate(src.index)
self._invalidate_creating(src.creating)
src.close()
# Savepoint support
#####################################################################
class Savepoint:
def __init__(self, datamanager, state):
self.datamanager = datamanager
self.state = state
def rollback(self):
self.datamanager._rollback(self.state)
class TmpStore:
"""A storage-like thing to support savepoints."""
def __init__(self, base_version, storage):
self._storage = storage
for method in (
'getName', 'new_oid', 'modifiedInVersion', 'getSize',
'undoLog', 'versionEmpty', 'sortKey',
):
setattr(self, method, getattr(storage, method))
self._base_version = base_version
self._file = tempfile.TemporaryFile()
# position: current file position
# _tpos: file position at last commit point
self.position = 0L
# index: map oid to pos of last committed version
self.index = {}
self.creating = []
def __len__(self):
return len(self.index)
def close(self):
self._file.close()
def load(self, oid, version):
pos = self.index.get(oid)
if pos is None:
return self._storage.load(oid, self._base_version)
self._file.seek(pos)
h = self._file.read(8)
oidlen = u64(h)
read_oid = self._file.read(oidlen)
if read_oid != oid:
raise POSException.StorageSystemError('Bad temporary storage')
h = self._file.read(16)
size = u64(h[8:])
serial = h[:8]
return self._file.read(size), serial
def store(self, oid, serial, data, version, transaction):
# we have this funny signature so we can reuse the normal non-commit
# commit logic
assert version == self._base_version
self._file.seek(self.position)
l = len(data)
if serial is None:
serial = z64
header = p64(len(oid)) + oid + serial + p64(l)
self._file.write(header)
self._file.write(data)
self.index[oid] = self.position
self.position += l + len(header)
return serial
def reset(self, position, index):
self._file.truncate(position)
self.position = position
self.index = index
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from ZODB import POSException
from ZODB.utils import p64, u64, z64
import tempfile
class TmpStore:
"""A storage to support subtransactions."""
_bver = ''
def __init__(self, base_version, storage):
self._transaction = None
self._storage = storage
if base_version:
self._bver = base_version
self._file = tempfile.TemporaryFile()
# _pos: current file position
# _tpos: file position at last commit point
self._pos = self._tpos = 0L
# _index: map oid to pos of last committed version
self._index = {}
# _tindex: map oid to pos for new updates
self._tindex = {}
self._creating = []
def close(self):
self._file.close()
def getName(self):
return self._storage.getName()
def getSize(self):
return self._pos
def load(self, oid, version):
pos = self._index.get(oid)
if pos is None:
return self._storage.load(oid, self._bver)
self._file.seek(pos)
h = self._file.read(8)
oidlen = u64(h)
read_oid = self._file.read(oidlen)
if read_oid != oid:
raise POSException.StorageSystemError('Bad temporary storage')
h = self._file.read(16)
size = u64(h[8:])
serial = h[:8]
return self._file.read(size), serial
def sortKey(self):
return self._storage.sortKey()
# TODO: clarify difference between self._storage & self._db._storage
def modifiedInVersion(self, oid):
if self._index.has_key(oid):
return self._bver
return self._storage.modifiedInVersion(oid)
def new_oid(self):
return self._storage.new_oid()
def registerDB(self, db, limit):
pass
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._file.seek(self._pos)
l = len(data)
if serial is None:
serial = z64
header = p64(len(oid)) + oid + serial + p64(l)
self._file.write(header)
self._file.write(data)
self._tindex[oid] = self._pos
self._pos += l + len(header)
return serial
def tpc_abort(self, transaction):
if transaction is not self._transaction:
return
self._tindex.clear()
self._transaction = None
self._pos = self._tpos
def tpc_begin(self, transaction):
if self._transaction is transaction:
return
self._transaction = transaction
self._tindex.clear() # Just to be sure!
self._pos = self._tpos
def tpc_vote(self, transaction):
pass
def tpc_finish(self, transaction, f=None):
if transaction is not self._transaction:
return
if f is not None:
f()
self._index.update(self._tindex)
self._tindex.clear()
self._tpos = self._pos
def undoLog(self, first, last, filter=None):
return ()
def versionEmpty(self, version):
# TODO: what is this supposed to do?
if version == self._bver:
return len(self._index)
...@@ -294,7 +294,7 @@ class UserMethodTests(unittest.TestCase): ...@@ -294,7 +294,7 @@ class UserMethodTests(unittest.TestCase):
>>> cn.close() # this was succeeding >>> cn.close() # this was succeeding
Traceback (most recent call last): Traceback (most recent call last):
... ...
ConnectionStateError: Cannot close a connection with a pending subtransaction ConnectionStateError: Cannot close a connection joined to a transaction
Again this leaves the connection as it was. Again this leaves the connection as it was.
>>> transaction.commit() >>> transaction.commit()
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of savepoint feature
$Id$
"""
import unittest
from zope.testing import doctest
import persistent.dict, transaction
def testAddingThenModifyThenAbort():
"""
>>> import ZODB.tests.util
>>> db = ZODB.tests.util.DB()
>>> connection = db.open()
>>> root = connection.root()
>>> ob = persistent.dict.PersistentDict()
>>> root['ob'] = ob
>>> sp = transaction.savepoint()
>>> ob.x = 1
>>> transaction.abort()
"""
def testModifyThenSavePointThenModifySomeMoreThenCommit():
"""
>>> import ZODB.tests.util
>>> db = ZODB.tests.util.DB()
>>> connection = db.open()
>>> root = connection.root()
>>> sp = transaction.savepoint()
>>> root['a'] = 1
>>> sp = transaction.savepoint()
>>> root['a'] = 2
>>> transaction.commit()
"""
def test_suite():
return unittest.TestSuite((
doctest.DocFileSuite('testConnectionSavepoint.txt'),
doctest.DocTestSuite(),
))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
Savepoints
==========
Savepoints provide a way to save to disk intermediate work done during
a transaction allowing:
- partial transaction (subtransaction) rollback (abort)
- state of saved objects to be freed, freeing on-line memory for other
uses
Savepoints make it possible to write atomic subroutines that don't
make top-level transaction commitments.
Applications
------------
To demonstrate how savepoints work with transactions, we'll show an example.
>>> import ZODB.tests.util
>>> db = ZODB.tests.util.DB()
>>> connection = db.open()
>>> root = connection.root()
>>> root['name'] = 'bob'
As with other data managers, we can commit changes:
>>> import transaction
>>> transaction.commit()
>>> root['name']
'bob'
and abort changes:
>>> root['name'] = 'sally'
>>> root['name']
'sally'
>>> transaction.abort()
>>> root['name']
'bob'
Now, lets look at an application that manages funds for people.
It allows deposits and debits to be entered for multiple people.
It accepts a sequence of entries and generates a sequence of status
messages. For each entry, it applies the change and then validates
the user's account. If the user's account is invalid, we role back
the change for that entry. The success or failure of an entry is
indicated in the output status. First we'll initialize some accounts:
>>> root['bob-balance'] = 0.0
>>> root['bob-credit'] = 0.0
>>> root['sally-balance'] = 0.0
>>> root['sally-credit'] = 100.0
>>> transaction.commit()
Now, we'll define a validation function to validate an account:
>>> def validate_account(name):
... if root[name+'-balance'] + root[name+'-credit'] < 0:
... raise ValueError('Overdrawn', name)
And a function to apply entries. If the function fails in some
unexpected way, it rolls back all of it's changes and
prints the error:
>>> def apply_entries(entries):
... savepoint = transaction.savepoint()
... try:
... for name, amount in entries:
... entry_savepoint = transaction.savepoint()
... try:
... root[name+'-balance'] += amount
... validate_account(name)
... except ValueError, error:
... entry_savepoint.rollback()
... print 'Error', str(error)
... else:
... print 'Updated', name
... except Exception, error:
... savepoint.rollback()
... print 'Unexpected exception', error
Now let's try applying some entries:
>>> apply_entries([
... ('bob', 10.0),
... ('sally', 10.0),
... ('bob', 20.0),
... ('sally', 10.0),
... ('bob', -100.0),
... ('sally', -100.0),
... ])
Updated bob
Updated sally
Updated bob
Updated sally
Error ('Overdrawn', 'bob')
Updated sally
>>> root['bob-balance']
30.0
>>> root['sally-balance']
-80.0
If we give provide entries that cause an unexpected error:
>>> apply_entries([
... ('bob', 10.0),
... ('sally', 10.0),
... ('bob', '20.0'),
... ('sally', 10.0),
... ])
Updated bob
Updated sally
Unexpected exception unsupported operand type(s) for +=: 'float' and 'str'
Because the apply_entries used a savepoint for the entire function,
it was able to rollback the partial changes without rolling back
changes made in the previous call to apply_entries:
>>> root['bob-balance']
30.0
>>> root['sally-balance']
-80.0
If we now abort the outer transactions, the earlier changes will go
away:
>>> transaction.abort()
>>> root['bob-balance']
0.0
>>> root['sally-balance']
0.0
...@@ -550,13 +550,17 @@ class ZODBTests(unittest.TestCase): ...@@ -550,13 +550,17 @@ class ZODBTests(unittest.TestCase):
self.assertEqual(rt['a'], 1) self.assertEqual(rt['a'], 1)
rt['b'] = 2 rt['b'] = 2
# Subtransactions don't do tpc_vote, so we poison tpc_begin. # Subtransactions don't do tpc_vote, so we poison tpc_begin.
poisoned = PoisonedObject(PoisonedJar(break_tpc_begin=True)) poisoned = PoisonedJar()
transaction.get().register(poisoned) transaction.get().join(poisoned)
poisoned.break_savepoint = True
self.assertRaises(PoisonedError, transaction.get().commit, True) self.assertRaises(PoisonedError, transaction.get().commit, True)
# Trying to subtxn-commit again fails too. # Trying to subtxn-commit again fails too.
self.assertRaises(TransactionFailedError, transaction.get().commit, True) self.assertRaises(TransactionFailedError,
self.assertRaises(TransactionFailedError, transaction.get().commit, True) transaction.get().commit, True)
self.assertRaises(TransactionFailedError,
transaction.get().commit, True)
# Top-level commit also fails. # Top-level commit also fails.
self.assertRaises(TransactionFailedError, transaction.get().commit) self.assertRaises(TransactionFailedError, transaction.get().commit)
...@@ -568,6 +572,7 @@ class ZODBTests(unittest.TestCase): ...@@ -568,6 +572,7 @@ class ZODBTests(unittest.TestCase):
# also raises TransactionFailedError. # also raises TransactionFailedError.
self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2) self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
# Clean up via abort(), and try again. # Clean up via abort(), and try again.
transaction.get().abort() transaction.get().abort()
rt['a'] = 1 rt['a'] = 1
...@@ -576,13 +581,18 @@ class ZODBTests(unittest.TestCase): ...@@ -576,13 +581,18 @@ class ZODBTests(unittest.TestCase):
# Cleaning up via begin() should also work. # Cleaning up via begin() should also work.
rt['a'] = 2 rt['a'] = 2
transaction.get().register(poisoned) poisoned = PoisonedJar()
transaction.get().join(poisoned)
poisoned.break_savepoint = True
self.assertRaises(PoisonedError, transaction.get().commit, True) self.assertRaises(PoisonedError, transaction.get().commit, True)
self.assertRaises(TransactionFailedError, transaction.get().commit, True) self.assertRaises(TransactionFailedError,
transaction.get().commit, True)
# The change to rt['a'] is lost. # The change to rt['a'] is lost.
self.assertEqual(rt['a'], 1) self.assertEqual(rt['a'], 1)
# Trying to modify an object also fails. # Trying to modify an object also fails.
self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2) self.assertRaises(TransactionFailedError, rt.__setitem__, 'b', 2)
# Clean up via begin(), and try again. # Clean up via begin(), and try again.
transaction.begin() transaction.begin()
rt['a'] = 2 rt['a'] = 2
...@@ -603,9 +613,11 @@ class PoisonedError(Exception): ...@@ -603,9 +613,11 @@ class PoisonedError(Exception):
# PoisonedJar arranges to raise exceptions from interesting places. # PoisonedJar arranges to raise exceptions from interesting places.
# For whatever reason, subtransaction commits don't call tpc_vote. # For whatever reason, subtransaction commits don't call tpc_vote.
class PoisonedJar: class PoisonedJar:
def __init__(self, break_tpc_begin=False, break_tpc_vote=False): def __init__(self, break_tpc_begin=False, break_tpc_vote=False,
break_savepoint=False):
self.break_tpc_begin = break_tpc_begin self.break_tpc_begin = break_tpc_begin
self.break_tpc_vote = break_tpc_vote self.break_tpc_vote = break_tpc_vote
self.break_savepoint = break_savepoint
def sortKey(self): def sortKey(self):
return str(id(self)) return str(id(self))
...@@ -620,13 +632,9 @@ class PoisonedJar: ...@@ -620,13 +632,9 @@ class PoisonedJar:
if self.break_tpc_vote: if self.break_tpc_vote:
raise PoisonedError("tpc_vote fails") raise PoisonedError("tpc_vote fails")
# commit_sub is needed else this jar is ignored during subtransaction def savepoint(self):
# commit. if self.break_savepoint:
def commit_sub(*args): raise PoisonedError("savepoint fails")
pass
def abort_sub(*args):
pass
def commit(*args): def commit(*args):
pass pass
......
...@@ -238,569 +238,6 @@ def test_tpc_abort_phase2(): ...@@ -238,569 +238,6 @@ def test_tpc_abort_phase2():
False False
""" """
def test_commit_w_subtransactions():
"""
So, we have a data manager:
>>> dm = DataManager()
and we do some work that modifies uncommited state:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 1)
Now we'll commit the changes in a subtransaction. When the data
manager joins a transaction, the transaction will create an
adapter.
>>> dma = DataManagerAdapter(dm)
and register it as a modified object. At commit time, the
transaction will get the "jar" like this:
>>> jar = getattr(dma, '_p_jar', dma)
and, of course, the jar and the adapter will be the same:
>>> jar is dma
True
The transaction will call tpc_begin:
>>> t1 = '1'
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
Then the transaction will call commit on the jar:
>>> jar.commit(t1)
This doesn't actually do anything. :)
>>> dm.state, dm.delta
(0, 1)
The transaction will then call tpc_vote:
>>> jar.tpc_vote(t1)
This doesn't do anything either, because zodb4 data managers don't
actually do two-phase commit for subtransactions.
>>> dm.state, dm.delta
(0, 1)
Finally, we call tpc_finish. This does actally create a savepoint,
but we can't really tell that from outside.
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 1)
We'll do more of the above:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 2)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 2)
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 3)
Note that the bove works *because* the same transaction is used
for each subtransaction.
Finally, we'll do a little more work:
>>> dm.inc()
>>> dm.inc()
>>> dm.state, dm.delta
(0, 5)
and then commit the top-level transaction.
The transaction will actually go through the steps for a subtransaction:
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
And then call commit_sub:
>>> jar.commit_sub(t1)
As usual, this doesn't actually do anything. ;)
>>> dm.state, dm.delta
(0, 5)
The transaction manager doesn's call tpc_begin, because commit_sub
implies the start of two-phase commit. Next, it does call commit:
>>> jar.commit(t1)
which doesn't do anything.
Finally, the transaction calls tpc_vote:
>>> jar.tpc_vote(t1)
which actually does something (because this is the top-level txn):
>>> dm.state, dm.delta
(5, 5)
>>> dm.prepared
True
Finally, tpc_finish is called:
>>> jar.tpc_finish(t1)
and the data manager finishes the two-phase commit:
>>> dm.state, dm.delta
(5, 0)
>>> dm.prepared
False
"""
def test_commit_w_subtransactions_featuring_subtransaction_abort():
"""
So, we have a data manager:
>>> dm = DataManager()
and we do some work that modifies uncommited state:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 1)
Now we'll commit the changes in a subtransaction. When the data
manager joins a transaction, the transaction will create an
adapter.
>>> dma = DataManagerAdapter(dm)
and register it as a modified object. At commit time, the
transaction will get the "jar" like this:
>>> jar = getattr(dma, '_p_jar', dma)
and, of course, the jar and the adapter will be the same:
>>> jar is dma
True
The transaction will call tpc_begin:
>>> t1 = '1'
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
Then the transaction will call commit on the jar:
>>> jar.commit(t1)
This doesn't actually do anything. :)
>>> dm.state, dm.delta
(0, 1)
The transaction will then call tpc_vote:
>>> jar.tpc_vote(t1)
This doesn't do anything either, because zodb4 data managers don't
actually do two-phase commit for subtransactions.
>>> dm.state, dm.delta
(0, 1)
Finally, we call tpc_finish. This does actally create a savepoint,
but we can't really tell that from outside.
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 1)
We'll do more of the above:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 2)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 2)
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
But then we'll decide to abort a subtransaction.
The transaction will just call abort as usual:
>>> jar.abort(t1)
This will cause a rollback to the last savepoint:
>>> dm.state, dm.delta
(0, 2)
Then we do more work:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 3)
Note that the bove works *because* the same transaction is used
for each subtransaction.
Finally, we'll do a little more work:
>>> dm.inc()
>>> dm.inc()
>>> dm.state, dm.delta
(0, 5)
and then commit the top-level transaction.
The transaction will actually go through the steps for a subtransaction:
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
And then call commit_sub:
>>> jar.commit_sub(t1)
As usual, this doesn't actually do anything. ;)
>>> dm.state, dm.delta
(0, 5)
The transaction manager doesn's call tpc_begin, because commit_sub
implies the start of two-phase commit. Next, it does call commit:
>>> jar.commit(t1)
which doesn't do anything.
Finally, the transaction calls tpc_vote:
>>> jar.tpc_vote(t1)
which actually does something (because this is the top-level txn):
>>> dm.state, dm.delta
(5, 5)
>>> dm.prepared
True
Finally, tpc_finish is called:
>>> jar.tpc_finish(t1)
and the data manager finishes the two-phase commit:
>>> dm.state, dm.delta
(5, 0)
>>> dm.prepared
False
"""
def test_abort_w_subtransactions():
"""
So, we have a data manager:
>>> dm = DataManager()
and we do some work that modifies uncommited state:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 1)
Now we'll commit the changes in a subtransaction. When the data
manager joins a transaction, the transaction will create an
adapter.
>>> dma = DataManagerAdapter(dm)
and register it as a modified object. At commit time, the
transaction will get the "jar" like this:
>>> jar = getattr(dma, '_p_jar', dma)
and, of course, the jar and the adapter will be the same:
>>> jar is dma
True
The transaction will call tpc_begin:
>>> t1 = '1'
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
Then the transaction will call commit on the jar:
>>> jar.commit(t1)
This doesn't actually do anything. :)
>>> dm.state, dm.delta
(0, 1)
The transaction will then call tpc_vote:
>>> jar.tpc_vote(t1)
This doesn't do anything either, because zodb4 data managers don't
actually do two-phase commit for subtransactions.
>>> dm.state, dm.delta
(0, 1)
Finally, we call tpc_finish. This does actally create a savepoint,
but we can't really tell that from outside.
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 1)
We'll do more of the above:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 2)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 2)
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 3)
Note that the bove works *because* the same transaction is used
for each subtransaction.
Finally, we'll do a little more work:
>>> dm.inc()
>>> dm.inc()
>>> dm.state, dm.delta
(0, 5)
and then abort the top-level transaction.
The transaction first call abort on the jar:
>>> jar.abort(t1)
This will have the effect of aborting the subtrancation:
>>> dm.state, dm.delta
(0, 3)
Then the transaction will call abort_sub:
>>> jar.abort_sub(t1)
This will abort all of the subtransactions:
>>> dm.state, dm.delta
(0, 0)
"""
def test_tpc_abort_w_subtransactions_featuring_subtransaction_abort():
"""
So, we have a data manager:
>>> dm = DataManager()
and we do some work that modifies uncommited state:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 1)
Now we'll commit the changes in a subtransaction. When the data
manager joins a transaction, the transaction will create an
adapter.
>>> dma = DataManagerAdapter(dm)
and register it as a modified object. At commit time, the
transaction will get the "jar" like this:
>>> jar = getattr(dma, '_p_jar', dma)
and, of course, the jar and the adapter will be the same:
>>> jar is dma
True
The transaction will call tpc_begin:
>>> t1 = '1'
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
Then the transaction will call commit on the jar:
>>> jar.commit(t1)
This doesn't actually do anything. :)
>>> dm.state, dm.delta
(0, 1)
The transaction will then call tpc_vote:
>>> jar.tpc_vote(t1)
This doesn't do anything either, because zodb4 data managers don't
actually do two-phase commit for subtransactions.
>>> dm.state, dm.delta
(0, 1)
Finally, we call tpc_finish. This does actally create a savepoint,
but we can't really tell that from outside.
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 1)
We'll do more of the above:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 2)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 2)
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
But then we'll decide to abort a subtransaction.
The transaction will just call abort as usual:
>>> jar.abort(t1)
This will cause a rollback to the last savepoint:
>>> dm.state, dm.delta
(0, 2)
Then we do more work:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 3)
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
>>> dm.state, dm.delta
(0, 3)
Note that the bove works *because* the same transaction is used
for each subtransaction.
Finally, we'll do a little more work:
>>> dm.inc()
>>> dm.inc()
>>> dm.state, dm.delta
(0, 5)
and then commit the top-level transaction.
The transaction will actually go through the steps for a subtransaction:
>>> jar.tpc_begin(t1, 1) # 1 -> subtxn
>>> jar.commit(t1)
>>> jar.tpc_vote(t1)
>>> jar.tpc_finish(t1)
And then call commit_sub:
>>> jar.commit_sub(t1)
As usual, this doesn't actually do anything. ;)
>>> dm.state, dm.delta
(0, 5)
The transaction manager doesn's call tpc_begin, because commit_sub
implies the start of two-phase commit. Next, it does call commit:
>>> jar.commit(t1)
which doesn't do anything.
Finally, the transaction calls tpc_vote:
>>> jar.tpc_vote(t1)
which actually does something (because this is the top-level txn):
>>> dm.state, dm.delta
(5, 5)
>>> dm.prepared
True
Now, at the last minute, the transaction is aborted (possibly due
to a "no vote" from another data manager):
>>> jar.tpc_abort(t1)
An the changes are undone:
>>> dm.state, dm.delta
(0, 0)
>>> dm.prepared
False
"""
def test_suite(): def test_suite():
return DocTestSuite() return DocTestSuite()
......
...@@ -288,10 +288,11 @@ class IPersistentDataManager(Interface): ...@@ -288,10 +288,11 @@ class IPersistentDataManager(Interface):
policy of one transaction manager for each thread. policy of one transaction manager for each thread.
""" """
def mtime(object): # Maybe later:
"""Return the modification time of the object. ## def mtime(object):
## """Return the modification time of the object.
The modification time may not be known, in which case None
is returned. If non-None, the return value is the kind of ## The modification time may not be known, in which case None
timestamp supplied by Python's time.time(). ## is returned. If non-None, the return value is the kind of
""" ## timestamp supplied by Python's time.time().
## """
...@@ -24,6 +24,7 @@ get = manager.get ...@@ -24,6 +24,7 @@ get = manager.get
begin = manager.begin begin = manager.begin
commit = manager.commit commit = manager.commit
abort = manager.abort abort = manager.abort
savepoint = manager.savepoint
def get_transaction(): def get_transaction():
from ZODB.utils import deprecated36 from ZODB.utils import deprecated36
......
...@@ -67,6 +67,9 @@ class TransactionManager(object): ...@@ -67,6 +67,9 @@ class TransactionManager(object):
def abort(self, sub=False): def abort(self, sub=False):
self.get().abort(sub) self.get().abort(sub)
def savepoint(self, optimistic=False):
return self.get().savepoint(optimistic)
class ThreadTransactionManager(TransactionManager): class ThreadTransactionManager(TransactionManager):
"""Thread-aware transaction manager. """Thread-aware transaction manager.
......
...@@ -30,6 +30,8 @@ registers its _p_jar attribute. TODO: explain adapter ...@@ -30,6 +30,8 @@ registers its _p_jar attribute. TODO: explain adapter
Subtransactions Subtransactions
--------------- ---------------
Note: Suntransactions are deprecated!
A subtransaction applies the transaction notion recursively. It A subtransaction applies the transaction notion recursively. It
allows a set of modifications within a transaction to be committed or allows a set of modifications within a transaction to be committed or
aborted as a group. A subtransaction is a strictly local activity; aborted as a group. A subtransaction is a strictly local activity;
...@@ -82,7 +84,7 @@ calls the following four methods on each resource manager; it calls ...@@ -82,7 +84,7 @@ calls the following four methods on each resource manager; it calls
tpc_begin() on each resource manager before calling commit() on any of tpc_begin() on each resource manager before calling commit() on any of
them. them.
1. tpc_begin(txn, subtransaction=False) 1. tpc_begin(txn)
2. commit(txn) 2. commit(txn)
3. tpc_vote(txn) 3. tpc_vote(txn)
4. tpc_finish(txn) 4. tpc_finish(txn)
...@@ -90,6 +92,8 @@ them. ...@@ -90,6 +92,8 @@ them.
Subtransaction commit Subtransaction commit
--------------------- ---------------------
Note: Subtransactions are deprecated!
When a subtransaction commits, the protocol is different. When a subtransaction commits, the protocol is different.
1. tpc_begin() is passed a second argument, which indicates that a 1. tpc_begin() is passed a second argument, which indicates that a
...@@ -128,8 +132,7 @@ uncommitted object, including the object that failed in its commit(), ...@@ -128,8 +132,7 @@ uncommitted object, including the object that failed in its commit(),
call abort(). call abort().
Once uncommitted objects are aborted, tpc_abort() or abort_sub() is Once uncommitted objects are aborted, tpc_abort() or abort_sub() is
called on each resource manager. abort_sub() is called if the called on each resource manager.
resource manager was involved in a subtransaction.
Synchronization Synchronization
--------------- ---------------
...@@ -213,14 +216,6 @@ class Transaction(object): ...@@ -213,14 +216,6 @@ class Transaction(object):
self.log = logging.getLogger("txn.%d" % thread.get_ident()) self.log = logging.getLogger("txn.%d" % thread.get_ident())
self.log.debug("new transaction") self.log.debug("new transaction")
# _sub contains all of the resource managers involved in
# subtransactions. It maps id(a resource manager) to the resource
# manager.
self._sub = {}
# _nonsub contains all the resource managers that do not support
# subtransactions that were involved in subtransaction commits.
self._nonsub = {}
# If a commit fails, the traceback is saved in _failure_traceback. # If a commit fails, the traceback is saved in _failure_traceback.
# If another attempt is made to commit, TransactionFailedError is # If another attempt is made to commit, TransactionFailedError is
# raised, incorporating this traceback. # raised, incorporating this traceback.
...@@ -231,6 +226,9 @@ class Transaction(object): ...@@ -231,6 +226,9 @@ class Transaction(object):
# inefficient for FIFO access of this kind. # inefficient for FIFO access of this kind.
self._before_commit = [] self._before_commit = []
# Keep track of the last savepoint
self._last_savepoint = None
# Raise TransactionFailedError, due to commit()/join()/register() # Raise TransactionFailedError, due to commit()/join()/register()
# getting called when the current transaction has already suffered # getting called when the current transaction has already suffered
# a commit failure. # a commit failure.
...@@ -258,6 +256,34 @@ class Transaction(object): ...@@ -258,6 +256,34 @@ class Transaction(object):
resource = DataManagerAdapter(resource) resource = DataManagerAdapter(resource)
self._resources.append(resource) self._resources.append(resource)
if self._last_savepoint is not None:
self._last_savepoint.join(resource)
def savepoint(self, optimistic=False):
if self.status is Status.COMMITFAILED:
self._prior_commit_failed() # doesn't return, it raises
try:
savepoint = Savepoint(optimistic)
for resource in self._resources:
savepoint.join(resource)
except:
self._cleanup(self._resources)
self._saveCommitishError() # doesn't return, it raises!
if self._last_savepoint is not None:
savepoint.previous = self._last_savepoint
self._last_savepoint.next = savepoint
self._last_savepoint = savepoint
return savepoint
def _invalidate_last_savepoint(self):
# Invalidate the last savepoint and any previous
# savepoints. This is done on a commit or abort.
if self._last_savepoint is not None:
self._last_savepoint._invalidate_previous()
self._last_savepoint = None
def register(self, obj): def register(self, obj):
# The old way of registering transaction participants. # The old way of registering transaction participants.
# #
...@@ -273,9 +299,6 @@ class Transaction(object): ...@@ -273,9 +299,6 @@ class Transaction(object):
raise ValueError("Register with no manager") raise ValueError("Register with no manager")
adapter = self._adapters.get(manager) adapter = self._adapters.get(manager)
if adapter is None: if adapter is None:
if myhasattr(manager, "commit_sub"):
adapter = MultiObjectResourceAdapterSub(manager)
else:
adapter = MultiObjectResourceAdapter(manager) adapter = MultiObjectResourceAdapter(manager)
adapter.objects.append(obj) adapter.objects.append(obj)
self._adapters[manager] = adapter self._adapters[manager] = adapter
...@@ -286,46 +309,46 @@ class Transaction(object): ...@@ -286,46 +309,46 @@ class Transaction(object):
assert id(obj) not in map(id, adapter.objects) assert id(obj) not in map(id, adapter.objects)
adapter.objects.append(obj) adapter.objects.append(obj)
# In the presence of subtransactions, an existing adapter
# might be in _adapters but not in _resources.
if adapter not in self._resources:
self._resources.append(adapter)
def begin(self): def begin(self):
from ZODB.utils import deprecated36 from ZODB.utils import deprecated36
deprecated36("Transaction.begin() should no longer be used; use " deprecated36("Transaction.begin() should no longer be used; use "
"the begin() method of a transaction manager.") "the begin() method of a transaction manager.")
if (self._resources or if (self._resources or self._synchronizers):
self._sub or
self._nonsub or
self._synchronizers):
self.abort() self.abort()
# Else aborting wouldn't do anything, except if _manager is non-None, # Else aborting wouldn't do anything, except if _manager is non-None,
# in which case it would do nothing besides uselessly free() this # in which case it would do nothing besides uselessly free() this
# transaction. # transaction.
def commit(self, subtransaction=False): def commit(self, subtransaction=False):
self._invalidate_last_savepoint()
if subtransaction:
# TODO depricate subtransactions
self.savepoint(1)
return
if self.status is Status.COMMITFAILED: if self.status is Status.COMMITFAILED:
self._prior_commit_failed() # doesn't return self._prior_commit_failed() # doesn't return
if not subtransaction:
self._callBeforeCommitHooks() self._callBeforeCommitHooks()
if not subtransaction and self._sub and self._resources:
# This commit is for a top-level transaction that has
# previously committed subtransactions. Do one last
# subtransaction commit to clear out the current objects,
# then commit all the subjars.
self.commit(True)
if not subtransaction:
self._synchronizers.map(lambda s: s.beforeCompletion(self)) self._synchronizers.map(lambda s: s.beforeCompletion(self))
self.status = Status.COMMITTING self.status = Status.COMMITTING
try: try:
self._commitResources(subtransaction) self._commitResources()
except: except:
self._saveCommitishError() # This raises!
self.status = Status.COMMITTED
if self._manager:
self._manager.free(self)
self._synchronizers.map(lambda s: s.afterCompletion(self))
self.log.debug("commit")
def _saveCommitishError(self):
self.status = Status.COMMITFAILED self.status = Status.COMMITFAILED
# Save the traceback for TransactionFailedError. # Save the traceback for TransactionFailedError.
ft = self._failure_traceback = StringIO() ft = self._failure_traceback = StringIO()
...@@ -338,14 +361,6 @@ class Transaction(object): ...@@ -338,14 +361,6 @@ class Transaction(object):
ft.writelines(traceback.format_exception_only(t, v)) ft.writelines(traceback.format_exception_only(t, v))
raise t, v, tb raise t, v, tb
if subtransaction:
self._resources = []
else:
self.status = Status.COMMITTED
if self._manager:
self._manager.free(self)
self._synchronizers.map(lambda s: s.afterCompletion(self))
self.log.debug("commit")
def beforeCommitHook(self, hook, *args, **kws): def beforeCommitHook(self, hook, *args, **kws):
self._before_commit.append((hook, args, kws)) self._before_commit.append((hook, args, kws))
...@@ -357,28 +372,17 @@ class Transaction(object): ...@@ -357,28 +372,17 @@ class Transaction(object):
hook, args, kws = self._before_commit.pop(0) hook, args, kws = self._before_commit.pop(0)
hook(*args, **kws) hook(*args, **kws)
def _commitResources(self, subtransaction): def _commitResources(self):
# Execute the two-phase commit protocol. # Execute the two-phase commit protocol.
L = self._getResourceManagers(subtransaction) L = list(self._resources)
L.sort(rm_cmp)
try: try:
for rm in L: for rm in L:
# If you pass subtransaction=True to tpc_begin(), it rm.tpc_begin(self)
# will create a temporary storage for the duration of
# the transaction. To signal that the top-level
# transaction is committing, you must then call
# commit_sub().
if not subtransaction and id(rm) in self._sub:
del self._sub[id(rm)]
rm.commit_sub(self)
else:
rm.tpc_begin(self, subtransaction)
for rm in L: for rm in L:
rm.commit(self) rm.commit(self)
self.log.debug("commit %r" % rm) self.log.debug("commit %r" % rm)
if not subtransaction:
# Not sure why, but it is intentional that you do not
# call tpc_vote() for subtransaction commits.
for rm in L: for rm in L:
rm.tpc_vote(self) rm.tpc_vote(self)
self._voted[id(rm)] = True self._voted[id(rm)] = True
...@@ -401,7 +405,6 @@ class Transaction(object): ...@@ -401,7 +405,6 @@ class Transaction(object):
try: try:
self._cleanup(L) self._cleanup(L)
finally: finally:
if not subtransaction:
self._synchronizers.map(lambda s: s.afterCompletion(self)) self._synchronizers.map(lambda s: s.afterCompletion(self))
raise t, v, tb raise t, v, tb
...@@ -415,68 +418,30 @@ class Transaction(object): ...@@ -415,68 +418,30 @@ class Transaction(object):
self.log.error("Error in abort() on manager %s", self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info()) rm, exc_info=sys.exc_info())
for rm in L: for rm in L:
if id(rm) in self._sub:
try:
rm.abort_sub(self)
except Exception:
self.log.error("Error in abort_sub() on manager %s",
rm, exc_info=sys.exc_info())
else:
try: try:
rm.tpc_abort(self) rm.tpc_abort(self)
except Exception: except Exception:
self.log.error("Error in tpc_abort() on manager %s", self.log.error("Error in tpc_abort() on manager %s",
rm, exc_info=sys.exc_info()) rm, exc_info=sys.exc_info())
def _getResourceManagers(self, subtransaction): def abort(self, subtransaction=False):
L = []
if subtransaction:
# If we are in a subtransaction, make sure all resource
# managers are placed in either _sub or _nonsub. When
# the top-level transaction commits, we need to merge
# these back into the resource set.
# If a data manager doesn't support sub-transactions, we
# don't do anything with it now. (That's somewhat okay,
# because subtransactions are mostly just an
# optimization.) Save it until the top-level transaction
# commits.
for rm in self._resources: if subtransaction:
if myhasattr(rm, "commit_sub"): # TODO deprecate subtransactions
self._sub[id(rm)] = rm if not self._last_savepoint:
L.append(rm) raise interfaces.InvalidSavepointRollbackError
else: if self._last_savepoint.valid:
self._nonsub[id(rm)] = rm # We're supposed to be able to call abort(1) multiple
else: # times. Sigh.
if self._sub or self._nonsub: self._last_savepoint.rollback()
# Merge all of _sub, _nonsub, and _resources. return
d = dict(self._sub)
d.update(self._nonsub)
# TODO: I think _sub and _nonsub are disjoint, and that
# _resources is empty. If so, we can simplify this code.
assert len(d) == len(self._sub) + len(self._nonsub)
assert not self._resources
for rm in self._resources:
d[id(rm)] = rm
L = d.values()
else:
L = list(self._resources)
L.sort(rm_cmp) self._invalidate_last_savepoint()
return L
def abort(self, subtransaction=False):
if not subtransaction:
self._synchronizers.map(lambda s: s.beforeCompletion(self)) self._synchronizers.map(lambda s: s.beforeCompletion(self))
if subtransaction and self._nonsub:
from ZODB.POSException import TransactionError
raise TransactionError("Resource manager does not support "
"subtransaction abort")
tb = None tb = None
for rm in self._resources + self._nonsub.values(): for rm in self._resources:
try: try:
rm.abort(self) rm.abort(self)
except: except:
...@@ -485,20 +450,11 @@ class Transaction(object): ...@@ -485,20 +450,11 @@ class Transaction(object):
self.log.error("Failed to abort resource manager: %s", self.log.error("Failed to abort resource manager: %s",
rm, exc_info=sys.exc_info()) rm, exc_info=sys.exc_info())
if not subtransaction:
for rm in self._sub.values():
try:
rm.abort_sub(self)
except:
if tb is None:
t, v, tb = sys.exc_info()
self.log.error("Failed to abort_sub resource manager: %s",
rm, exc_info=sys.exc_info())
if not subtransaction:
if self._manager: if self._manager:
self._manager.free(self) self._manager.free(self)
self._synchronizers.map(lambda s: s.afterCompletion(self)) self._synchronizers.map(lambda s: s.afterCompletion(self))
self.log.debug("abort") self.log.debug("abort")
if tb is not None: if tb is not None:
...@@ -539,8 +495,8 @@ class MultiObjectResourceAdapter(object): ...@@ -539,8 +495,8 @@ class MultiObjectResourceAdapter(object):
def sortKey(self): def sortKey(self):
return self.manager.sortKey() return self.manager.sortKey()
def tpc_begin(self, txn, sub=False): def tpc_begin(self, txn):
self.manager.tpc_begin(txn, sub) self.manager.tpc_begin(txn)
def tpc_finish(self, txn): def tpc_finish(self, txn):
self.manager.tpc_finish(txn) self.manager.tpc_finish(txn)
...@@ -571,25 +527,6 @@ class MultiObjectResourceAdapter(object): ...@@ -571,25 +527,6 @@ class MultiObjectResourceAdapter(object):
if tb is not None: if tb is not None:
raise t, v, tb raise t, v, tb
class MultiObjectResourceAdapterSub(MultiObjectResourceAdapter):
"""Adapt resource managers that participate in subtransactions."""
def commit_sub(self, txn):
self.manager.commit_sub(txn)
def abort_sub(self, txn):
self.manager.abort_sub(txn)
def tpc_begin(self, txn, sub=False):
self.manager.tpc_begin(txn, sub)
self.sub = sub
def tpc_finish(self, txn):
self.manager.tpc_finish(txn)
if self.sub:
self.objects = []
def rm_cmp(rm1, rm2): def rm_cmp(rm1, rm2):
return cmp(rm1.sortKey(), rm2.sortKey()) return cmp(rm1.sortKey(), rm2.sortKey())
...@@ -624,50 +561,82 @@ class DataManagerAdapter(object): ...@@ -624,50 +561,82 @@ class DataManagerAdapter(object):
def __init__(self, datamanager): def __init__(self, datamanager):
self._datamanager = datamanager self._datamanager = datamanager
self._rollback = None
# TODO: I'm not sure why commit() doesn't do anything # TODO: I'm not sure why commit() doesn't do anything
def commit(self, transaction): def commit(self, transaction):
# We don't do anything here because ZODB4-style data managers
# didn't have a separate commit step
pass pass
def abort(self, transaction): def abort(self, transaction):
# We need to discard any changes since the last save point, or all
# changes
if self._rollback is None:
# No previous savepoint, so just abort
self._datamanager.abort(transaction)
else:
self._rollback()
def abort_sub(self, transaction):
self._datamanager.abort(transaction) self._datamanager.abort(transaction)
def commit_sub(self, transaction): def tpc_begin(self, transaction):
# Nothing to do wrt data, be we begin 2pc for the top-level # We don't do anything here because ZODB4-style data managers
# trans # didn't have a separate tpc_begin step
self._sub = False pass
def tpc_begin(self, transaction, subtransaction=False):
self._sub = subtransaction
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
if self._sub:
self.abort(self, transaction)
else:
self._datamanager.abort(transaction) self._datamanager.abort(transaction)
def tpc_finish(self, transaction): def tpc_finish(self, transaction):
if self._sub:
self._rollback = self._datamanager.savepoint(transaction).rollback
else:
self._datamanager.commit(transaction) self._datamanager.commit(transaction)
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
if not self._sub:
self._datamanager.prepare(transaction) self._datamanager.prepare(transaction)
def sortKey(self): def sortKey(self):
return self._datamanager.sortKey() return self._datamanager.sortKey()
class Savepoint:
"""Transaction savepoint
Transaction savepoints coordinate savepoints for data managers
participating in a transaction.
"""
interface.implements(interfaces.ISavepoint)
def __init__(self, optimistic):
self._savepoints = []
self.valid = True
self.next = self.previous = None
self.optimistic = optimistic
def join(self, datamanager):
try:
savepoint = datamanager.savepoint
except AttributeError:
if not self.optimistic:
raise TypeError("Savepoints unsupported", datamanager)
savepoint = NoRollbackSavepoint(datamanager)
else:
savepoint = savepoint()
self._savepoints.append(savepoint)
def rollback(self):
if not self.valid:
raise interfaces.InvalidSavepointRollbackError
self._invalidate_next()
for savepoint in self._savepoints:
savepoint.rollback()
def _invalidate_next(self):
self.valid = False
if self.next is not None:
self.next._invalidate_next()
def _invalidate_previous(self):
self.valid = False
if self.previous is not None:
self.previous._invalidate_previous()
class NoRollbackSavepoint:
def __init__(self, datamanager):
self.datamanager = datamanager
def rollback(self):
raise TypeError("Savepoints unsupported", self.datamanager)
...@@ -18,157 +18,46 @@ $Id$ ...@@ -18,157 +18,46 @@ $Id$
import zope.interface import zope.interface
class ISynchronizer(zope.interface.Interface): class ITransactionManager(zope.interface.Interface):
"""Objects that participate in the transaction-boundary notification API. """An object that manages a sequence of transactions
"""
def beforeCompletion(transaction):
"""Hook that is called by the transaction at the start of a commit."""
def afterCompletion(transaction):
"""Hook that is called by the transaction after completing a commit."""
class IDataManager(zope.interface.Interface): Applications use transaction managers to establish transaction boundaries.
"""Objects that manage transactional storage.
These objects may manage data for other objects, or they may manage
non-object storages, such as relational databases.
IDataManagerOriginal is the interface currently provided by ZODB
database connections, but the intent is to move to the newer
IDataManager.
""" """
def abort_sub(transaction):
"""Discard all subtransaction data.
See subtransaction.txt
This is called when top-level transactions are aborted.
No further subtransactions can be started once abort_sub()
has been called; this is only used when the transaction is
being aborted.
abort_sub also implies the abort of a 2-phase commit. def begin():
"""Begin a new transaction.
This should never fail. If an existing transaction is in progress, it will be aborted.
""" """
def commit_sub(transaction): def get():
"""Commit all changes made in subtransactions and begin 2-phase commit """Get the current transaction.
Data are saved *as if* they are part of the current transaction.
That is, they will not be persistent unless the current transaction
is committed.
This is called when the current top-level transaction is committed.
No further subtransactions can be started once commit_sub()
has been called; this is only used when the transaction is
being committed.
This call also implies the beginning of 2-phase commit.
""" """
# Two-phase commit protocol. These methods are called by the def commit():
# ITransaction object associated with the transaction being """Commit the current transaction
# committed.
def tpc_begin(transaction, subtransaction=False):
"""Begin commit of a transaction, starting the two-phase commit.
transaction is the ITransaction instance associated with the
transaction being committed.
subtransaction is a Boolean flag indicating whether the
two-phase commit is being invoked for a subtransaction.
Important note: Subtransactions are modelled in the sense that
when you commit a subtransaction, subsequent commits should be
for subtransactions as well. That is, there must be a
commit_sub() call between a tpc_begin() call with the
subtransaction flag set to true and a tpc_begin() with the
flag set to false.
""" """
def tpc_abort(transaction): def abort(self):
"""Abort a transaction. """Abort the current transaction
This is called by a transaction manager to end a two-phase commit on
the data manager.
This is always called after a tpc_begin call.
transaction is the ITransaction instance associated with the
transaction being committed.
This should never fail.
""" """
def tpc_finish(transaction): def registerSynch(synch):
"""Indicate confirmation that the transaction is done. """Register an ISynchronizer.
transaction is the ITransaction instance associated with the Synchronizers are notified at the beginning and end of
transaction being committed. transaction completion.
This should never fail. If this raises an exception, the
database is not expected to maintain consistency; it's a
serious error.
It's important that the storage calls the passed function
while it still has its lock. We don't want another thread
to be able to read any updated data until we've had a chance
to send an invalidation message to all of the other
connections!
""" """
def tpc_vote(transaction): def unregisterSynch(synch):
"""Verify that a data manager can commit the transaction """Unregister an ISynchronizer.
This is the last chance for a data manager to vote 'no'. A
data manager votes 'no' by raising an exception.
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def commit(transaction):
"""Commit modifications to registered objects.
Save the object as part of the data to be made persistent if
the transaction commits.
This includes conflict detection and handling. If no conflicts or
errors occur it saves the objects in the storage.
"""
def abort(transaction):
"""Abort a transaction and forget all changes.
Abort must be called outside of a two-phase commit.
Abort is called by the transaction manager to abort transactions
that are not yet in a two-phase commit.
"""
def sortKey(): Synchronizers are notified at the beginning and end of
"""Return a key to use for ordering registered DataManagers transaction completion.
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
""" """
# Alternate version:
#"""Return a consistent sort key for this connection.
#
#This allows ordering multiple connections that use the same storage in
#a consistent manner. This is unique for the lifetime of a connection,
#which is good enough to avoid ZEO deadlocks.
#"""
class ITransaction(zope.interface.Interface): class ITransaction(zope.interface.Interface):
"""Object representing a running transaction. """Object representing a running transaction.
...@@ -219,9 +108,17 @@ class ITransaction(zope.interface.Interface): ...@@ -219,9 +108,17 @@ class ITransaction(zope.interface.Interface):
def join(datamanager): def join(datamanager):
"""Add a datamanager to the transaction. """Add a datamanager to the transaction.
The if the data manager supports savepoints, it must call this
*before* making any changes. If the transaction has had any
savepoints, then it will take a savepoint of the data manager
when join is called and this savepoint must reflct the state
of the data manager before any changes that caused the data
manager to join the transaction.
The datamanager must implement the The datamanager must implement the
transactions.interfaces.IDataManager interface, and be transactions.interfaces.IDataManager interface, and be
adaptable to ZODB.interfaces.IDataManager. adaptable to ZODB.interfaces.IDataManager.
""" """
def note(text): def note(text):
...@@ -293,3 +190,161 @@ class ITransactionDeprecated(zope.interface.Interface): ...@@ -293,3 +190,161 @@ class ITransactionDeprecated(zope.interface.Interface):
# TODO: deprecate this for 3.6. # TODO: deprecate this for 3.6.
def register(object): def register(object):
"""Register the given object for transaction control.""" """Register the given object for transaction control."""
class IDataManager(zope.interface.Interface):
"""Objects that manage transactional storage.
These objects may manage data for other objects, or they may manage
non-object storages, such as relational databases.
IDataManagerOriginal is the interface currently provided by ZODB
database connections, but the intent is to move to the newer
IDataManager.
Note that when data are modified, data managers should join a
transaction so that data can be committed when the user commits
the transaction.
"""
# Two-phase commit protocol. These methods are called by the
# ITransaction object associated with the transaction being
# committed.
def abort(transaction):
"""Abort a transaction and forget all changes.
Abort must be called outside of a two-phase commit.
Abort is called by the transaction manager to abort transactions
that are not yet in a two-phase commit.
"""
def tpc_begin(transaction):
"""Begin commit of a transaction, starting the two-phase commit.
transaction is the ITransaction instance associated with the
transaction being committed.
subtransaction is a Boolean flag indicating whether the
two-phase commit is being invoked for a subtransaction.
Important note: Subtransactions are modelled in the sense that
when you commit a subtransaction, subsequent commits should be
for subtransactions as well. That is, there must be a
commit_sub() call between a tpc_begin() call with the
subtransaction flag set to true and a tpc_begin() with the
flag set to false.
"""
def commit(transaction):
"""Commit modifications to registered objects.
Save the object as part of the data to be made persistent if
the transaction commits.
This includes conflict detection and handling. If no conflicts or
errors occur it saves the objects in the storage.
"""
def tpc_abort(transaction):
"""Abort a transaction.
This is called by a transaction manager to end a two-phase commit on
the data manager.
This is always called after a tpc_begin call.
transaction is the ITransaction instance associated with the
transaction being committed.
This should never fail.
"""
def tpc_vote(transaction):
"""Verify that a data manager can commit the transaction
This is the last chance for a data manager to vote 'no'. A
data manager votes 'no' by raising an exception.
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def tpc_finish(transaction):
"""Indicate confirmation that the transaction is done.
transaction is the ITransaction instance associated with the
transaction being committed.
This should never fail. If this raises an exception, the
database is not expected to maintain consistency; it's a
serious error.
It's important that the storage calls the passed function
while it still has its lock. We don't want another thread
to be able to read any updated data until we've had a chance
to send an invalidation message to all of the other
connections!
"""
def sortKey():
"""Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
# Alternate version:
#"""Return a consistent sort key for this connection.
#
#This allows ordering multiple connections that use the same storage in
#a consistent manner. This is unique for the lifetime of a connection,
#which is good enough to avoid ZEO deadlocks.
#"""
class ISavepointDataManager(IDataManager):
def savepoint():
"""Return a savepoint (ISavepoint)
"""
class ISavepoint(zope.interface.Interface):
"""A transaction savepoint
"""
def rollback():
"""Rollback any work done since the savepoint
An InvalidSavepointRollbackError is raised if the savepoint
isn't valid.
"""
valid = zope.interface.Attribute(
"Boolean indicating whether the savepoint is valid")
class InvalidSavepointRollbackError(Exception):
"""Attempt to rollback an invalid savepoint
A savepoint may be invalid because:
- The surrounding transaction has committed or aborted
- An earlier savepoint in the same transaction has been rolled back
"""
class ISynchronizer(zope.interface.Interface):
"""Objects that participate in the transaction-boundary notification API.
"""
def beforeCompletion(transaction):
"""Hook that is called by the transaction at the start of a commit.
"""
def afterCompletion(transaction):
"""Hook that is called by the transaction after completing a commit.
"""
Savepoints
==========
Savepoints provide a way to save to disk intermediate work done during
a transaction allowing:
- partial transaction (subtransaction) rollback (abort)
- state of saved objects to be freed, freeing on-line memory for other
uses
Savepoints make it possible to write atomic subroutines that don't
make top-level transaction commitments.
Applications
------------
To demonstrate how savepoints work with transactions, we've provided a
sample data manager implementation that provides savepoint support.
The primary purpose of this data manager is to provide code that can
be read to understand how savepoints work. The secondary purpose is to
provide support for demonstrating the correct operation of savepoint
support within the transaction system. This data manager is very
simple. It provides flat storage of named immutable values, like strings
and numbers.
>>> import transaction.tests.savepointsample
>>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> dm['name'] = 'bob'
As with other data managers, we can commit changes:
>>> transaction.commit()
>>> dm['name']
'bob'
and abort changes:
>>> dm['name'] = 'sally'
>>> dm['name']
'sally'
>>> transaction.abort()
>>> dm['name']
'bob'
Now, lets look at an application that manages funds for people.
It allows deposits and debits to be entered for multiple people.
It accepts a sequence of entries and generates a sequence of status
messages. For each entry, it applies the change and then validates
the user's account. If the user's account is invalid, we role back
the change for that entry. The success or failure of an entry is
indicated in the output status. First we'll initialize some accounts:
>>> dm['bob-balance'] = 0.0
>>> dm['bob-credit'] = 0.0
>>> dm['sally-balance'] = 0.0
>>> dm['sally-credit'] = 100.0
>>> transaction.commit()
Now, we'll define a validation function to validate an account:
>>> def validate_account(name):
... if dm[name+'-balance'] + dm[name+'-credit'] < 0:
... raise ValueError('Overdrawn', name)
And a function to apply entries. If the function fails in some
unexpected way, it rolls back all of it's changes and
prints the error:
>>> def apply_entries(entries):
... savepoint = transaction.savepoint()
... try:
... for name, amount in entries:
... entry_savepoint = transaction.savepoint()
... try:
... dm[name+'-balance'] += amount
... validate_account(name)
... except ValueError, error:
... entry_savepoint.rollback()
... print 'Error', str(error)
... else:
... print 'Updated', name
... except Exception, error:
... savepoint.rollback()
... print 'Unexpected exception', error
Now let's try applying some entries:
>>> apply_entries([
... ('bob', 10.0),
... ('sally', 10.0),
... ('bob', 20.0),
... ('sally', 10.0),
... ('bob', -100.0),
... ('sally', -100.0),
... ])
Updated bob
Updated sally
Updated bob
Updated sally
Error ('Overdrawn', 'bob')
Updated sally
>>> dm['bob-balance']
30.0
>>> dm['sally-balance']
-80.0
If we give provide entries that cause an unexpected error:
>>> apply_entries([
... ('bob', 10.0),
... ('sally', 10.0),
... ('bob', '20.0'),
... ('sally', 10.0),
... ])
Updated bob
Updated sally
Unexpected exception unsupported operand type(s) for +=: 'float' and 'str'
Because the apply_entries used a savepoint for the entire function,
it was able to rollback the partial changes without rolling back
changes made in the previous call to apply_entries:
>>> dm['bob-balance']
30.0
>>> dm['sally-balance']
-80.0
If we now abort the outer transactions, the earlier changes will go
away:
>>> transaction.abort()
>>> dm['bob-balance']
0.0
>>> dm['sally-balance']
0.0
Savepoint invalidation
----------------------
Once a savepoint has been used, it can't be used again:
>>> savepoint = transaction.savepoint()
>>> dm['bob-balance'] = 100.0
>>> dm['bob-balance']
100.0
>>> savepoint.rollback()
>>> dm['bob-balance']
0.0
>>> savepoint.rollback()
Traceback (most recent call last):
...
InvalidSavepointRollbackError
Using a savepoint also invalidates any savepoints that com eafter it:
>>> savepoint1 = transaction.savepoint()
>>> dm['bob-balance'] = 100.0
>>> dm['bob-balance']
100.0
>>> savepoint2 = transaction.savepoint()
>>> dm['bob-balance'] = 200.0
>>> dm['bob-balance']
200.0
>>> savepoint1.rollback()
>>> dm['bob-balance']
0.0
>>> savepoint2.rollback()
Traceback (most recent call last):
...
InvalidSavepointRollbackError
>>> transaction.abort()
Databases without savepoint support
-----------------------------------
Normally it's an error to use savepoints with databases that don't
support savepoints:
>>> dm_no_sp = transaction.tests.savepointsample.SampleDataManager()
>>> dm_no_sp['name'] = 'bob'
>>> transaction.commit()
>>> dm_no_sp['name'] = 'sally'
>>> savepoint = transaction.savepoint()
Traceback (most recent call last):
...
TypeError: ('Savepoints unsupported', {'name': 'bob'})
>>> transaction.abort()
However, a flag can be passed to the transaction savepoint method to
indicate that databases without savepoint support should be tolderated
until a savepoint is roled back. This allows transactions to proceed
is there are no reasons to roll back:
>>> dm_no_sp['name'] = 'sally'
>>> savepoint = transaction.savepoint(1)
>>> dm_no_sp['name'] = 'sue'
>>> transaction.commit()
>>> dm_no_sp['name']
'sue'
>>> savepoint = transaction.savepoint(1)
>>> dm_no_sp['name'] = 'sam'
>>> savepoint.rollback()
Traceback (most recent call last):
...
TypeError: ('Savepoints unsupported', {'name': 'sam'})
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Savepoint data manager implementation example
Sample data manager implementation that illustrates how to implement savepoints
See savepoint.txt in the transaction package.
$Id$
"""
import UserDict
from zope import interface
import transaction.interfaces
class SampleDataManager(UserDict.DictMixin):
"""Sample implementation of data manager that doesn't support savepoints
This data manager stores named simple values, like strings and numbers.
"""
interface.implements(transaction.interfaces.ISavepointDataManager)
def __init__(self, transaction_manager = None):
if transaction_manager is None:
# Use the thread-local transaction manager if none is provided:
transaction_manager = transaction.manager
self.transaction_manager = transaction_manager
# Our committed and uncommitted data:
self.committed = {}
self.uncommitted = self.committed.copy()
# Our transaction state:
# If our uncommitted data is modified, we'll join a transaction
# and keep track of the transaction we joined. Any commit
# related messages we get should be for this same transaction
self.transaction = None
# What phase, if any, of two-phase commit we are in:
self.tpc_phase = None
#######################################################################
# Provide a mapping interface to uncommitted data. We provide
# a basic subset of the interface. DictMixin does the rest.
def __getitem__(self, name):
return self.uncommitted[name]
def __setitem__(self, name, value):
self._join() # join the current transaction, if we haven't already
self.uncommitted[name] = value
def __delitem__(self, name):
self._join() # join the current transaction, if we haven't already
del self.uncommitted[name]
def keys(self):
return self.uncommitted.keys()
#
#######################################################################
#######################################################################
# Transaction methods
def _join(self):
# If this is the first change in the transaction, join the transaction
if self.transaction is None:
self.transaction = self.transaction_manager.get()
self.transaction.join(self)
def _resetTransaction(self):
self.transaction = None
self.tpc_phase = None
def abort(self, transaction):
"""Throw away changes made before the commit process has started
"""
assert ((transaction is self.transaction) or (self.transaction is None)
), "Must not change transactions"
assert self.tpc_phase is None, "Must be called outside of tpc"
self.uncommitted = self.committed.copy()
self._resetTransaction()
def tpc_begin(self, transaction):
"""Enter two-phase commit
"""
assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase is None, "Must be called outside of tpc"
self.tpc_phase = 1
def commit(self, transaction):
"""Record data modified during the transaction
"""
assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase == 1, "Must be called in first phase of tpc"
# In our simple example, we don't need to do anything.
# A more complex data manager would typically write to some sort
# of log.
def tpc_vote(self, transaction):
assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase == 1, "Must be called in first phase of tpc"
# This particular data manager is always ready to vote.
# Real data managers will usually need to take some steps to
# make sure that the finish will succeed
self.tpc_phase = 2
def tpc_finish(self, transaction):
assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase == 2, "Must be called in second phase of tpc"
self.committed = self.uncommitted.copy()
self._resetTransaction()
def tpc_abort(self, transaction):
assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase is not None, "Must be called inside of tpc"
self.uncommitted = self.committed.copy()
self._resetTransaction()
#
#######################################################################
#######################################################################
# Other data manager methods
def sortKey(self):
# Commit operations on multiple data managers are performed in
# sort key order. This important to avoid deadlock when data
# managers are shared among multiple threads or processes and
# use locks to manage that sharing. We aren't going to bother
# with that here.
return str(id(self))
#
#######################################################################
class SampleSavepointDataManager(SampleDataManager):
"""Sample implementation of a savepoint-supporting data manager
This extends the basic data manager with savepoint support.
"""
def savepoint(self):
# When we create the savepoint, we save the existing database state
return SampleSavepoint(self, self.uncommitted.copy())
def _rollback_savepoint(self, savepoint):
# when we rollback the savepoint, we restore the saved data
self.uncommitted = savepoint.data
class SampleSavepoint:
def __init__(self, data_manager, data):
self.data_manager = data_manager
self.data = data
def rollback(self):
self.data_manager._rollback_savepoint(self)
...@@ -128,7 +128,7 @@ class TestConnection: ...@@ -128,7 +128,7 @@ class TestConnection:
def sortKey(self): def sortKey(self):
return str(id(self)) return str(id(self))
def tpc_begin(self, txn, sub): def tpc_begin(self, txn):
self.calls.append("begin") self.calls.append("begin")
def tpc_vote(self, txn): def tpc_vote(self, txn):
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of savepoint feature
$Id$
"""
import unittest
from zope.testing import doctest
def test_suite():
return unittest.TestSuite((
doctest.DocFileSuite('../savepoint.txt'),
))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
...@@ -85,100 +85,6 @@ class TransactionTests(unittest.TestCase): ...@@ -85,100 +85,6 @@ class TransactionTests(unittest.TestCase):
t.abort() t.abort()
def testSubTransactionCommitCommit(self):
self.sub1.modify()
self.sub2.modify()
self.txn_mgr.commit(1)
assert self.sub1._p_jar.ctpc_vote == 0
assert self.sub1._p_jar.ctpc_finish == 1
self.txn_mgr.commit()
assert self.sub1._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_vote == 1
def testSubTransactionCommitAbort(self):
self.sub1.modify()
self.sub2.modify()
self.txn_mgr.commit(1)
self.txn_mgr.abort()
assert self.sub1._p_jar.ctpc_vote == 0
assert self.sub1._p_jar.cabort == 0
assert self.sub1._p_jar.cabort_sub == 1
def testMultipleSubTransactionCommitCommit(self):
self.sub1.modify()
self.txn_mgr.commit(1)
self.sub2.modify()
# reset a flag on the original to test it again
self.sub1.ctpc_finish = 0
self.txn_mgr.commit(1)
# this is interesting.. we go through
# every subtrans commit with all subtrans capable
# objects... i don't like this but its an impl artifact
assert self.sub1._p_jar.ctpc_vote == 0
assert self.sub1._p_jar.ctpc_finish > 0
# add another before we do the entire txn commit
self.sub3.modify()
self.txn_mgr.commit()
# we did an implicit sub commit, is this impl artifact?
assert self.sub3._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_finish > 1
def testMultipleSubTransactionCommitAbortSub(self):
"""
sub1 calling method commit
sub1 calling method tpc_finish
sub2 calling method tpc_begin
sub2 calling method commit
sub2 calling method tpc_finish
sub3 calling method abort
sub1 calling method commit_sub
sub2 calling method commit_sub
sub2 calling method tpc_vote
sub1 calling method tpc_vote
sub1 calling method tpc_finish
sub2 calling method tpc_finish
"""
# add it
self.sub1.modify()
self.txn_mgr.commit(1)
# add another
self.sub2.modify()
self.txn_mgr.commit(1)
assert self.sub1._p_jar.ctpc_vote == 0
assert self.sub1._p_jar.ctpc_finish > 0
# add another before we do the entire txn commit
self.sub3.modify()
# abort the sub transaction
self.txn_mgr.abort(1)
# commit the container transaction
self.txn_mgr.commit()
assert self.sub3._p_jar.cabort == 1
assert self.sub1._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_finish > 1
# repeat adding in a nonsub trans jars # repeat adding in a nonsub trans jars
...@@ -230,68 +136,6 @@ class TransactionTests(unittest.TestCase): ...@@ -230,68 +136,6 @@ class TransactionTests(unittest.TestCase):
assert self.nosub1._p_jar.cabort == 1 assert self.nosub1._p_jar.cabort == 1
assert self.sub1._p_jar.cabort_sub == 1 assert self.sub1._p_jar.cabort_sub == 1
def testNSJSubTransactionCommitCommit(self):
self.sub1.modify()
self.nosub1.modify()
self.txn_mgr.commit(1)
assert self.nosub1._p_jar.ctpc_vote == 0
self.txn_mgr.commit()
#assert self.nosub1._p_jar.ccommit_sub == 0
assert self.nosub1._p_jar.ctpc_vote == 1
assert self.sub1._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_vote == 1
def testNSJMultipleSubTransactionCommitCommit(self):
"""
sub1 calling method tpc_begin
sub1 calling method commit
sub1 calling method tpc_finish
nosub calling method tpc_begin
nosub calling method tpc_finish
sub2 calling method tpc_begin
sub2 calling method commit
sub2 calling method tpc_finish
nosub calling method tpc_begin
nosub calling method commit
sub1 calling method commit_sub
sub2 calling method commit_sub
sub1 calling method tpc_vote
nosub calling method tpc_vote
sub2 calling method tpc_vote
sub2 calling method tpc_finish
nosub calling method tpc_finish
sub1 calling method tpc_finish
"""
# add it
self.sub1.modify()
self.txn_mgr.commit(1)
# add another
self.nosub1.modify()
self.txn_mgr.commit(1)
assert self.sub1._p_jar.ctpc_vote == 0
assert self.nosub1._p_jar.ctpc_vote == 0
assert self.sub1._p_jar.ctpc_finish > 0
# add another before we do the entire txn commit
self.sub2.modify()
# commit the container transaction
self.txn_mgr.commit()
# we did an implicit sub commit
assert self.sub2._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_finish > 1
### Failure Mode Tests ### Failure Mode Tests
# #
...@@ -387,80 +231,6 @@ class TransactionTests(unittest.TestCase): ...@@ -387,80 +231,6 @@ class TransactionTests(unittest.TestCase):
assert self.nosub1._p_jar.ctpc_abort == 1 assert self.nosub1._p_jar.ctpc_abort == 1
### More Failure modes...
# now we mix in some sub transactions
###
def testExceptionInSubCommitSub(self):
# It's harder than normal to verify test results, because
# the subtransaction jars are stored in a dictionary. The
# order in which jars are processed depends on the order
# they come out of the dictionary.
self.sub1.modify()
self.txn_mgr.commit(1)
self.nosub1.modify()
self.sub2._p_jar = SubTransactionJar(errors='commit_sub')
self.sub2.modify(nojar=1)
self.txn_mgr.commit(1)
self.sub3.modify()
try:
self.txn_mgr.commit()
except TestTxnException:
pass
if self.sub1._p_jar.ccommit_sub:
self.assertEqual(self.sub1._p_jar.ctpc_abort, 1)
else:
self.assertEqual(self.sub1._p_jar.cabort_sub, 1)
self.assertEqual(self.sub2._p_jar.ctpc_abort, 1)
self.assertEqual(self.nosub1._p_jar.ctpc_abort, 1)
if self.sub3._p_jar.ccommit_sub:
self.assertEqual(self.sub3._p_jar.ctpc_abort, 1)
else:
self.assertEqual(self.sub3._p_jar.cabort_sub, 1)
def testExceptionInSubAbortSub(self):
# This test has two errors. When commit_sub() is called on
# sub1, it will fail. If sub1 is handled first, it will raise
# an except and abort_sub() will be called on sub2. If sub2
# is handled first, then commit_sub() will fail after sub2 has
# already begun its top-level transaction and tpc_abort() will
# be called.
self.sub1._p_jar = SubTransactionJar(errors='commit_sub')
self.sub1.modify(nojar=1)
self.txn_mgr.commit(1)
self.nosub1.modify()
self.sub2._p_jar = SubTransactionJar(errors='abort_sub')
self.sub2.modify(nojar=1)
self.txn_mgr.commit(1)
self.sub3.modify()
try:
self.txn_mgr.commit()
except TestTxnException, err:
pass
else:
self.fail("expected transaction to fail")
# The last commit failed. If the commit_sub() method was
# called, then tpc_abort() should be called to abort the
# actual transaction. If not, then calling abort_sub() is
# sufficient.
if self.sub3._p_jar.ccommit_sub:
self.assertEqual(self.sub3._p_jar.ctpc_abort, 1)
else:
self.assertEqual(self.sub3._p_jar.cabort_sub, 1)
# last test, check the hosing mechanism # last test, check the hosing mechanism
...@@ -507,7 +277,7 @@ class DataObject: ...@@ -507,7 +277,7 @@ class DataObject:
self._p_jar = NoSubTransactionJar(tracing=tracing) self._p_jar = NoSubTransactionJar(tracing=tracing)
else: else:
self._p_jar = SubTransactionJar(tracing=tracing) self._p_jar = SubTransactionJar(tracing=tracing)
self.txn_mgr.get().register(self) self.txn_mgr.get().join(self._p_jar)
class TestTxnException(Exception): class TestTxnException(Exception):
pass pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment