Commit 4f9af419 authored by Chris McDonough's avatar Chris McDonough

Remove the transaction package; we now have a setuptools install_requires...

Remove the transaction package; we now have a setuptools install_requires dependency that must be satisfied to get it.

Setup.py now no longer works without setuptools installed.

To see the state of the world directly before this change was made, see http://svn.zope.org/ZODB/tags/before_transaction_remove .

parent ba2478a3
...@@ -15,6 +15,12 @@ General ...@@ -15,6 +15,12 @@ General
- (3.9.0a1) Make it possible to examine oid and (in some situations) database - (3.9.0a1) Make it possible to examine oid and (in some situations) database
name of persistent object references during conflict resolution. name of persistent object references during conflict resolution.
- (unreleased, after 3.9.0a1) Moved 'transaction' module out of ZODB.
ZODB depends upon this module, but it must be installed separately.
- (unreleased, after 3.9.0a1) ZODB installation now requires
setuptools.
ZEO ZEO
--- ---
...@@ -25,7 +31,9 @@ ZEO ...@@ -25,7 +31,9 @@ ZEO
Transactions Transactions
------------ ------------
- - (unlreleased, after 3.9.0a1) 'transaction' module is not included in
ZODB anymore. It is now just a ZODB dependency (via setuptools
declarations).
Blobs Blobs
----- -----
......
...@@ -38,26 +38,9 @@ Operating System :: Microsoft :: Windows ...@@ -38,26 +38,9 @@ Operating System :: Microsoft :: Windows
Operating System :: Unix Operating System :: Unix
""" """
try: from setuptools import setup
from setuptools import setup
except ImportError: entry_points = """
from distutils.core import setup
extra = dict(
scripts = ["src/ZODB/scripts/fsdump.py",
"src/ZODB/scripts/fsoids.py",
"src/ZODB/scripts/fsrefs.py",
"src/ZODB/scripts/fstail.py",
"src/ZODB/scripts/fstest.py",
"src/ZODB/scripts/repozo.py",
"src/ZEO/scripts/zeopack.py",
"src/ZEO/scripts/runzeo.py",
"src/ZEO/scripts/zeopasswd.py",
"src/ZEO/scripts/mkzeoinst.py",
"src/ZEO/scripts/zeoctl.py",
],
)
else:
entry_points = """
[console_scripts] [console_scripts]
fsdump = ZODB.FileStorage.fsdump:main fsdump = ZODB.FileStorage.fsdump:main
fsoids = ZODB.scripts.fsoids:main fsoids = ZODB.scripts.fsoids:main
...@@ -70,19 +53,8 @@ else: ...@@ -70,19 +53,8 @@ else:
mkzeoinst = ZEO.mkzeoinst:main mkzeoinst = ZEO.mkzeoinst:main
zeoctl = ZEO.zeoctl:main zeoctl = ZEO.zeoctl:main
""" """
extra = dict(
install_requires = [ scripts = []
'zope.interface',
'zope.proxy',
'zope.testing',
'ZConfig',
'zdaemon',
],
zip_safe = False,
entry_points = entry_points,
include_package_data = True,
)
scripts = []
import glob import glob
import os import os
...@@ -177,7 +149,6 @@ packages = ["BTrees", "BTrees.tests", ...@@ -177,7 +149,6 @@ packages = ["BTrees", "BTrees.tests",
"ZODB", "ZODB.FileStorage", "ZODB.tests", "ZODB", "ZODB.FileStorage", "ZODB.tests",
"ZODB.scripts", "ZODB.scripts",
"persistent", "persistent.tests", "persistent", "persistent.tests",
"transaction", "transaction.tests",
"ThreadedAsync", "ThreadedAsync",
"ZopeUndo", "ZopeUndo.tests", "ZopeUndo", "ZopeUndo.tests",
] ]
...@@ -188,8 +159,6 @@ def copy_other_files(cmd, outputbase): ...@@ -188,8 +159,6 @@ def copy_other_files(cmd, outputbase):
extensions = ["*.conf", "*.xml", "*.txt", "*.sh"] extensions = ["*.conf", "*.xml", "*.txt", "*.sh"]
directories = [ directories = [
"BTrees", "BTrees",
"transaction",
"transaction/tests",
"persistent/tests", "persistent/tests",
"ZEO", "ZEO",
"ZEO/scripts", "ZEO/scripts",
...@@ -260,4 +229,27 @@ setup(name="ZODB3", ...@@ -260,4 +229,27 @@ setup(name="ZODB3",
classifiers = filter(None, classifiers.split("\n")), classifiers = filter(None, classifiers.split("\n")),
long_description = "\n".join(doclines[2:]), long_description = "\n".join(doclines[2:]),
distclass = MyDistribution, distclass = MyDistribution,
**extra) install_requires = [
'zope.interface',
'zope.proxy',
'zope.testing',
'ZConfig',
'zdaemon',
'transaction',
],
zip_safe = False,
entry_points = """
[console_scripts]
fsdump = ZODB.FileStorage.fsdump:main
fsoids = ZODB.scripts.fsoids:main
fsrefs = ZODB.scripts.fsrefs:main
fstail = ZODB.scripts.fstail:Main
repozo = ZODB.scripts.repozo:main
zeopack = ZEO.scripts.zeopack:main
runzeo = ZEO.runzeo:main
zeopasswd = ZEO.zeopasswd:main
mkzeoinst = ZEO.mkzeoinst:main
zeoctl = ZEO.zeoctl:main
""",
include_package_data = True,
)
============
Transactions
============
This package contains a generic transaction implementation for Python. It is
mainly used by the ZODB, though.
Note that the data manager API, ``transaction.interfaces.IDataManager``,
is syntactically simple, but semantically complex. The semantics
were not easy to express in the interface. This could probably use
more work. The semantics are presented in detail through examples of
a sample data manager in ``transaction.tests.test_SampleDataManager``.
############################################################################
#
# Copyright (c) 2001, 2002, 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
"""Exported transaction functions.
$Id$
"""
from transaction._transaction import Transaction
from transaction._manager import TransactionManager, ThreadTransactionManager
manager = ThreadTransactionManager()
get = manager.get
begin = manager.begin
commit = manager.commit
abort = manager.abort
doom = manager.doom
isDoomed = manager.isDoomed
savepoint = manager.savepoint
############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
"""A TransactionManager controls transaction boundaries.
It coordinates application code and resource managers, so that they
are associated with the right transaction.
"""
import thread
from ZODB.utils import WeakSet, deprecated37
from transaction._transaction import Transaction
# Used for deprecated arguments. ZODB.utils.DEPRECATED_ARGUMENT was
# too hard to use here, due to the convoluted import dance across
# __init__.py files.
_marker = object()
# We have to remember sets of synch objects, especially Connections.
# But we don't want mere registration with a transaction manager to
# keep a synch object alive forever; in particular, it's common
# practice not to explicitly close Connection objects, and keeping
# a Connection alive keeps a potentially huge number of other objects
# alive (e.g., the cache, and everything reachable from it too).
# Therefore we use "weak sets" internally.
#
# Call the ISynchronizer newTransaction() method on every element of
# WeakSet synchs.
# A transaction manager needs to do this whenever begin() is called.
# Since it would be good if tm.get() returned the new transaction while
# newTransaction() is running, calling this has to be delayed until after
# the transaction manager has done whatever it needs to do to make its
# get() return the new txn.
def _new_transaction(txn, synchs):
if synchs:
synchs.map(lambda s: s.newTransaction(txn))
# Important: we must always pass a WeakSet (even if empty) to the Transaction
# constructor: synchronizers are registered with the TM, but the
# ISynchronizer xyzCompletion() methods are called by Transactions without
# consulting the TM, so we need to pass a mutable collection of synchronizers
# so that Transactions "see" synchronizers that get registered after the
# Transaction object is constructed.
class TransactionManager(object):
def __init__(self):
self._txn = None
self._synchs = WeakSet()
def begin(self):
if self._txn is not None:
self._txn.abort()
txn = self._txn = Transaction(self._synchs, self)
_new_transaction(txn, self._synchs)
return txn
def get(self):
if self._txn is None:
self._txn = Transaction(self._synchs, self)
return self._txn
def free(self, txn):
assert txn is self._txn
self._txn = None
def registerSynch(self, synch):
self._synchs.add(synch)
def unregisterSynch(self, synch):
self._synchs.remove(synch)
def isDoomed(self):
return self.get().isDoomed()
def doom(self):
return self.get().doom()
def commit(self):
return self.get().commit()
def abort(self):
return self.get().abort()
def savepoint(self, optimistic=False):
return self.get().savepoint(optimistic)
class ThreadTransactionManager(TransactionManager):
"""Thread-aware transaction manager.
Each thread is associated with a unique transaction.
"""
def __init__(self):
# _threads maps thread ids to transactions
self._txns = {}
# _synchs maps a thread id to a WeakSet of registered synchronizers.
# The WeakSet is passed to the Transaction constructor, because the
# latter needs to call the synchronizers when it commits.
self._synchs = {}
def begin(self):
tid = thread.get_ident()
txn = self._txns.get(tid)
if txn is not None:
txn.abort()
synchs = self._synchs.get(tid)
if synchs is None:
synchs = self._synchs[tid] = WeakSet()
txn = self._txns[tid] = Transaction(synchs, self)
_new_transaction(txn, synchs)
return txn
def get(self):
tid = thread.get_ident()
txn = self._txns.get(tid)
if txn is None:
synchs = self._synchs.get(tid)
if synchs is None:
synchs = self._synchs[tid] = WeakSet()
txn = self._txns[tid] = Transaction(synchs, self)
return txn
def free(self, txn):
tid = thread.get_ident()
assert txn is self._txns.get(tid)
del self._txns[tid]
def registerSynch(self, synch):
tid = thread.get_ident()
ws = self._synchs.get(tid)
if ws is None:
ws = self._synchs[tid] = WeakSet()
ws.add(synch)
def unregisterSynch(self, synch):
tid = thread.get_ident()
ws = self._synchs[tid]
ws.remove(synch)
############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
"""Transaction objects manage resources for an individual activity.
Compatibility issues
--------------------
The implementation of Transaction objects involves two layers of
backwards compatibility, because this version of transaction supports
both ZODB 3 and ZODB 4. Zope is evolving towards the ZODB4
interfaces.
Transaction has two methods for a resource manager to call to
participate in a transaction -- register() and join(). join() takes a
resource manager and adds it to the list of resources. register() is
for backwards compatibility. It takes a persistent object and
registers its _p_jar attribute. TODO: explain adapter
Two-phase commit
----------------
A transaction commit involves an interaction between the transaction
object and one or more resource managers. The transaction manager
calls the following four methods on each resource manager; it calls
tpc_begin() on each resource manager before calling commit() on any of
them.
1. tpc_begin(txn)
2. commit(txn)
3. tpc_vote(txn)
4. tpc_finish(txn)
Before-commit hook
------------------
Sometimes, applications want to execute some code when a transaction is
committed. For example, one might want to delay object indexing until a
transaction commits, rather than indexing every time an object is changed.
Or someone might want to check invariants only after a set of operations. A
pre-commit hook is available for such use cases: use addBeforeCommitHook(),
passing it a callable and arguments. The callable will be called with its
arguments at the start of the commit (but not for substransaction commits).
After-commit hook
------------------
Sometimes, applications want to execute code after a transaction is
committed or aborted. For example, one might want to launch non
transactional code after a successful commit. Or still someone might
want to launch asynchronous code after. A post-commit hook is
available for such use cases: use addAfterCommitHook(), passing it a
callable and arguments. The callable will be called with a Boolean
value representing the status of the commit operation as first
argument (true if successfull or false iff aborted) preceding its
arguments at the start of the commit (but not for substransaction
commits).
Error handling
--------------
When errors occur during two-phase commit, the transaction manager
aborts all the resource managers. The specific methods it calls
depend on whether the error occurs before or after the call to
tpc_vote() on that transaction manager.
If the resource manager has not voted, then the resource manager will
have one or more uncommitted objects. There are two cases that lead
to this state; either the transaction manager has not called commit()
for any objects on this resource manager or the call that failed was a
commit() for one of the objects of this resource manager. For each
uncommitted object, including the object that failed in its commit(),
call abort().
Once uncommitted objects are aborted, tpc_abort() or abort_sub() is
called on each resource manager.
Synchronization
---------------
You can register sychronization objects (synchronizers) with the
tranasction manager. The synchronizer must implement
beforeCompletion() and afterCompletion() methods. The transaction
manager calls beforeCompletion() when it starts a top-level two-phase
commit. It calls afterCompletion() when a top-level transaction is
committed or aborted. The methods are passed the current Transaction
as their only argument.
"""
import logging
import sys
import thread
import warnings
import weakref
import traceback
from cStringIO import StringIO
from zope import interface
from ZODB.utils import WeakSet
from ZODB.utils import deprecated37, deprecated38
from ZODB.POSException import TransactionFailedError
from ZODB.utils import oid_repr
from transaction import interfaces
_marker = object()
# The point of this is to avoid hiding exceptions (which the builtin
# hasattr() does).
def myhasattr(obj, attr):
return getattr(obj, attr, _marker) is not _marker
class Status:
# ACTIVE is the initial state.
ACTIVE = "Active"
COMMITTING = "Committing"
COMMITTED = "Committed"
DOOMED = "Doomed"
# commit() or commit(True) raised an exception. All further attempts
# to commit or join this transaction will raise TransactionFailedError.
COMMITFAILED = "Commit failed"
class Transaction(object):
interface.implements(interfaces.ITransaction,
interfaces.ITransactionDeprecated)
# Assign an index to each savepoint so we can invalidate later savepoints
# on rollback. The first index assigned is 1, and it goes up by 1 each
# time.
_savepoint_index = 0
# If savepoints are used, keep a weak key dict of them. This maps a
# savepoint to its index (see above).
_savepoint2index = None
# Meta data. ._extension is also metadata, but is initialized to an
# emtpy dict in __init__.
user = ""
description = ""
def __init__(self, synchronizers=None, manager=None):
self.status = Status.ACTIVE
# List of resource managers, e.g. MultiObjectResourceAdapters.
self._resources = []
# Weak set of synchronizer objects to call.
if synchronizers is None:
synchronizers = WeakSet()
self._synchronizers = synchronizers
self._manager = manager
# _adapters: Connection/_p_jar -> MultiObjectResourceAdapter[Sub]
self._adapters = {}
self._voted = {} # id(Connection) -> boolean, True if voted
# _voted and other dictionaries use the id() of the resource
# manager as a key, because we can't guess whether the actual
# resource managers will be safe to use as dict keys.
# The user, description, and _extension attributes are accessed
# directly by storages, leading underscore notwithstanding.
self._extension = {}
self.log = logging.getLogger("txn.%d" % thread.get_ident())
self.log.debug("new transaction")
# If a commit fails, the traceback is saved in _failure_traceback.
# If another attempt is made to commit, TransactionFailedError is
# raised, incorporating this traceback.
self._failure_traceback = None
# List of (hook, args, kws) tuples added by addBeforeCommitHook().
self._before_commit = []
# List of (hook, args, kws) tuples added by addAfterCommitHook().
self._after_commit = []
def isDoomed(self):
return self.status is Status.DOOMED
def doom(self):
if self.status is not Status.DOOMED:
if self.status is not Status.ACTIVE:
# should not doom transactions in the middle,
# or after, a commit
raise AssertionError()
self.status = Status.DOOMED
# Raise TransactionFailedError, due to commit()/join()/register()
# getting called when the current transaction has already suffered
# a commit/savepoint failure.
def _prior_operation_failed(self):
assert self._failure_traceback is not None
raise TransactionFailedError("An operation previously failed, "
"with traceback:\n\n%s" %
self._failure_traceback.getvalue())
def join(self, resource):
if self.status is Status.COMMITFAILED:
self._prior_operation_failed() # doesn't return
if (self.status is not Status.ACTIVE and
self.status is not Status.DOOMED):
# TODO: Should it be possible to join a committing transaction?
# I think some users want it.
raise ValueError("expected txn status %r or %r, but it's %r" % (
Status.ACTIVE, Status.DOOMED, self.status))
# TODO: the prepare check is a bit of a hack, perhaps it would
# be better to use interfaces. If this is a ZODB4-style
# resource manager, it needs to be adapted, too.
if myhasattr(resource, "prepare"):
# TODO: deprecate 3.6
resource = DataManagerAdapter(resource)
self._resources.append(resource)
if self._savepoint2index:
# A data manager has joined a transaction *after* a savepoint
# was created. A couple of things are different in this case:
#
# 1. We need to add its savepoint to all previous savepoints.
# so that if they are rolled back, we roll this one back too.
#
# 2. We don't actually need to ask the data manager for a
# savepoint: because it's just joining, we can just abort it to
# roll back to the current state, so we simply use an
# AbortSavepoint.
datamanager_savepoint = AbortSavepoint(resource, self)
for transaction_savepoint in self._savepoint2index.keys():
transaction_savepoint._savepoints.append(
datamanager_savepoint)
def savepoint(self, optimistic=False):
if self.status is Status.COMMITFAILED:
self._prior_operation_failed() # doesn't return, it raises
try:
savepoint = Savepoint(self, optimistic, *self._resources)
except:
self._cleanup(self._resources)
self._saveAndRaiseCommitishError() # reraises!
if self._savepoint2index is None:
self._savepoint2index = weakref.WeakKeyDictionary()
self._savepoint_index += 1
self._savepoint2index[savepoint] = self._savepoint_index
return savepoint
# Remove and invalidate all savepoints we know about with an index
# larger than `savepoint`'s. This is what's needed when a rollback
# _to_ `savepoint` is done.
def _remove_and_invalidate_after(self, savepoint):
savepoint2index = self._savepoint2index
index = savepoint2index[savepoint]
# use items() to make copy to avoid mutating while iterating
for savepoint, i in savepoint2index.items():
if i > index:
savepoint.transaction = None # invalidate
del savepoint2index[savepoint]
# Invalidate and forget about all savepoints.
def _invalidate_all_savepoints(self):
for savepoint in self._savepoint2index.keys():
savepoint.transaction = None # invalidate
self._savepoint2index.clear()
def register(self, obj):
# The old way of registering transaction participants.
#
# register() is passed either a persisent object or a
# resource manager like the ones defined in ZODB.DB.
# If it is passed a persistent object, that object should
# be stored when the transaction commits. For other
# objects, the object implements the standard two-phase
# commit protocol.
manager = getattr(obj, "_p_jar", obj)
if manager is None:
raise ValueError("Register with no manager")
adapter = self._adapters.get(manager)
if adapter is None:
adapter = MultiObjectResourceAdapter(manager)
adapter.objects.append(obj)
self._adapters[manager] = adapter
self.join(adapter)
else:
# TODO: comment out this expensive assert later
# Use id() to guard against proxies.
assert id(obj) not in map(id, adapter.objects)
adapter.objects.append(obj)
def commit(self):
if self.status is Status.DOOMED:
raise interfaces.DoomedTransaction()
if self._savepoint2index:
self._invalidate_all_savepoints()
if self.status is Status.COMMITFAILED:
self._prior_operation_failed() # doesn't return
self._callBeforeCommitHooks()
self._synchronizers.map(lambda s: s.beforeCompletion(self))
self.status = Status.COMMITTING
try:
self._commitResources()
self.status = Status.COMMITTED
except:
t, v, tb = self._saveAndGetCommitishError()
self._callAfterCommitHooks(status=False)
raise t, v, tb
else:
if self._manager:
self._manager.free(self)
self._synchronizers.map(lambda s: s.afterCompletion(self))
self._callAfterCommitHooks(status=True)
self.log.debug("commit")
def _saveAndGetCommitishError(self):
self.status = Status.COMMITFAILED
# Save the traceback for TransactionFailedError.
ft = self._failure_traceback = StringIO()
t, v, tb = sys.exc_info()
# Record how we got into commit().
traceback.print_stack(sys._getframe(1), None, ft)
# Append the stack entries from here down to the exception.
traceback.print_tb(tb, None, ft)
# Append the exception type and value.
ft.writelines(traceback.format_exception_only(t, v))
return t, v, tb
def _saveAndRaiseCommitishError(self):
t, v, tb = self._saveAndGetCommitishError()
raise t, v, tb
def getBeforeCommitHooks(self):
return iter(self._before_commit)
def addBeforeCommitHook(self, hook, args=(), kws=None):
if kws is None:
kws = {}
self._before_commit.append((hook, tuple(args), kws))
def beforeCommitHook(self, hook, *args, **kws):
deprecated38("Use addBeforeCommitHook instead of beforeCommitHook.")
self.addBeforeCommitHook(hook, args, kws)
def _callBeforeCommitHooks(self):
# Call all hooks registered, allowing further registrations
# during processing. Note that calls to addBeforeCommitHook() may
# add additional hooks while hooks are running, and iterating over a
# growing list is well-defined in Python.
for hook, args, kws in self._before_commit:
hook(*args, **kws)
self._before_commit = []
def getAfterCommitHooks(self):
return iter(self._after_commit)
def addAfterCommitHook(self, hook, args=(), kws=None):
if kws is None:
kws = {}
self._after_commit.append((hook, tuple(args), kws))
def _callAfterCommitHooks(self, status=True):
# Avoid to abort anything at the end if no hooks are registred.
if not self._after_commit:
return
# Call all hooks registered, allowing further registrations
# during processing. Note that calls to addAterCommitHook() may
# add additional hooks while hooks are running, and iterating over a
# growing list is well-defined in Python.
for hook, args, kws in self._after_commit:
# The first argument passed to the hook is a Boolean value,
# true if the commit succeeded, or false if the commit aborted.
try:
hook(status, *args, **kws)
except:
# We need to catch the exceptions if we want all hooks
# to be called
self.log.error("Error in after commit hook exec in %s ",
hook, exc_info=sys.exc_info())
# The transaction is already committed. It must not have
# further effects after the commit.
for rm in self._resources:
try:
rm.abort(self)
except:
# XXX should we take further actions here ?
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
self._after_commit = []
self._before_commit = []
def _commitResources(self):
# Execute the two-phase commit protocol.
L = list(self._resources)
L.sort(rm_cmp)
try:
for rm in L:
rm.tpc_begin(self)
for rm in L:
rm.commit(self)
self.log.debug("commit %r" % rm)
for rm in L:
rm.tpc_vote(self)
self._voted[id(rm)] = True
try:
for rm in L:
rm.tpc_finish(self)
except:
# TODO: do we need to make this warning stronger?
# TODO: It would be nice if the system could be configured
# to stop committing transactions at this point.
self.log.critical("A storage error occurred during the second "
"phase of the two-phase commit. Resources "
"may be in an inconsistent state.")
raise
except:
# If an error occurs committing a transaction, we try
# to revert the changes in each of the resource managers.
t, v, tb = sys.exc_info()
try:
self._cleanup(L)
finally:
self._synchronizers.map(lambda s: s.afterCompletion(self))
raise t, v, tb
def _cleanup(self, L):
# Called when an exception occurs during tpc_vote or tpc_finish.
for rm in L:
if id(rm) not in self._voted:
try:
rm.abort(self)
except Exception:
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
for rm in L:
try:
rm.tpc_abort(self)
except Exception:
self.log.error("Error in tpc_abort() on manager %s",
rm, exc_info=sys.exc_info())
def abort(self):
if self._savepoint2index:
self._invalidate_all_savepoints()
self._synchronizers.map(lambda s: s.beforeCompletion(self))
tb = None
for rm in self._resources:
try:
rm.abort(self)
except:
if tb is None:
t, v, tb = sys.exc_info()
self.log.error("Failed to abort resource manager: %s",
rm, exc_info=sys.exc_info())
if self._manager:
self._manager.free(self)
self._synchronizers.map(lambda s: s.afterCompletion(self))
self.log.debug("abort")
if tb is not None:
raise t, v, tb
def note(self, text):
text = text.strip()
if self.description:
self.description += "\n\n" + text
else:
self.description = text
def setUser(self, user_name, path="/"):
self.user = "%s %s" % (path, user_name)
def setExtendedInfo(self, name, value):
self._extension[name] = value
# TODO: We need a better name for the adapters.
class MultiObjectResourceAdapter(object):
"""Adapt the old-style register() call to the new-style join().
With join(), a resource mananger like a Connection registers with
the transaction manager. With register(), an individual object
is passed to register().
"""
def __init__(self, jar):
self.manager = jar
self.objects = []
self.ncommitted = 0
def __repr__(self):
return "<%s for %s at %s>" % (self.__class__.__name__,
self.manager, id(self))
def sortKey(self):
return self.manager.sortKey()
def tpc_begin(self, txn):
self.manager.tpc_begin(txn)
def tpc_finish(self, txn):
self.manager.tpc_finish(txn)
def tpc_abort(self, txn):
self.manager.tpc_abort(txn)
def commit(self, txn):
for o in self.objects:
self.manager.commit(o, txn)
self.ncommitted += 1
def tpc_vote(self, txn):
self.manager.tpc_vote(txn)
def abort(self, txn):
tb = None
for o in self.objects:
try:
self.manager.abort(o, txn)
except:
# Capture the first exception and re-raise it after
# aborting all the other objects.
if tb is None:
t, v, tb = sys.exc_info()
txn.log.error("Failed to abort object: %s",
object_hint(o), exc_info=sys.exc_info())
if tb is not None:
raise t, v, tb
def rm_cmp(rm1, rm2):
return cmp(rm1.sortKey(), rm2.sortKey())
def object_hint(o):
"""Return a string describing the object.
This function does not raise an exception.
"""
# We should always be able to get __class__.
klass = o.__class__.__name__
# oid would be great, but may this isn't a persistent object.
oid = getattr(o, "_p_oid", _marker)
if oid is not _marker:
oid = oid_repr(oid)
return "%s oid=%s" % (klass, oid)
# TODO: deprecate for 3.6.
class DataManagerAdapter(object):
"""Adapt zodb 4-style data managers to zodb3 style
Adapt transaction.interfaces.IDataManager to
ZODB.interfaces.IPureDatamanager
"""
# Note that it is pretty important that this does not have a _p_jar
# attribute. This object will be registered with a zodb3 TM, which
# will then try to get a _p_jar from it, using it as the default.
# (Objects without a _p_jar are their own data managers.)
def __init__(self, datamanager):
self._datamanager = datamanager
# TODO: I'm not sure why commit() doesn't do anything
def commit(self, transaction):
# We don't do anything here because ZODB4-style data managers
# didn't have a separate commit step
pass
def abort(self, transaction):
self._datamanager.abort(transaction)
def tpc_begin(self, transaction):
# We don't do anything here because ZODB4-style data managers
# didn't have a separate tpc_begin step
pass
def tpc_abort(self, transaction):
self._datamanager.abort(transaction)
def tpc_finish(self, transaction):
self._datamanager.commit(transaction)
def tpc_vote(self, transaction):
self._datamanager.prepare(transaction)
def sortKey(self):
return self._datamanager.sortKey()
class Savepoint:
"""Transaction savepoint.
Transaction savepoints coordinate savepoints for data managers
participating in a transaction.
"""
interface.implements(interfaces.ISavepoint)
valid = property(lambda self: self.transaction is not None)
def __init__(self, transaction, optimistic, *resources):
self.transaction = transaction
self._savepoints = savepoints = []
for datamanager in resources:
try:
savepoint = datamanager.savepoint
except AttributeError:
if not optimistic:
raise TypeError("Savepoints unsupported", datamanager)
savepoint = NoRollbackSavepoint(datamanager)
else:
savepoint = savepoint()
savepoints.append(savepoint)
def rollback(self):
transaction = self.transaction
if transaction is None:
raise interfaces.InvalidSavepointRollbackError
transaction._remove_and_invalidate_after(self)
try:
for savepoint in self._savepoints:
savepoint.rollback()
except:
# Mark the transaction as failed.
transaction._saveAndRaiseCommitishError() # reraises!
class AbortSavepoint:
def __init__(self, datamanager, transaction):
self.datamanager = datamanager
self.transaction = transaction
def rollback(self):
self.datamanager.abort(self.transaction)
class NoRollbackSavepoint:
def __init__(self, datamanager):
self.datamanager = datamanager
def rollback(self):
raise TypeError("Savepoints unsupported", self.datamanager)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Transaction Interfaces
$Id$
"""
import zope.interface
class ITransactionManager(zope.interface.Interface):
"""An object that manages a sequence of transactions.
Applications use transaction managers to establish transaction boundaries.
"""
def begin():
"""Begin a new transaction.
If an existing transaction is in progress, it will be aborted.
The newTransaction() method of registered synchronizers is called,
passing the new transaction object.
"""
def get():
"""Get the current transaction.
"""
def commit():
"""Commit the current transaction.
"""
def abort():
"""Abort the current transaction.
"""
def doom():
"""Doom the current transaction.
"""
def isDoomed():
"""Returns True if the current transaction is doomed, otherwise False.
"""
def savepoint(optimistic=False):
"""Create a savepoint from the current transaction.
If the optimistic argument is true, then data managers that
don't support savepoints can be used, but an error will be
raised if the savepoint is rolled back.
An ISavepoint object is returned.
"""
def registerSynch(synch):
"""Register an ISynchronizer.
Synchronizers are notified about some major events in a transaction's
life. See ISynchronizer for details.
"""
def unregisterSynch(synch):
"""Unregister an ISynchronizer.
Synchronizers are notified about some major events in a transaction's
life. See ISynchronizer for details.
"""
class ITransaction(zope.interface.Interface):
"""Object representing a running transaction.
Objects with this interface may represent different transactions
during their lifetime (.begin() can be called to start a new
transaction using the same instance, although that example is
deprecated and will go away in ZODB 3.6).
"""
user = zope.interface.Attribute(
"""A user name associated with the transaction.
The format of the user name is defined by the application. The value
is of Python type str. Storages record the user value, as meta-data,
when a transaction commits.
A storage may impose a limit on the size of the value; behavior is
undefined if such a limit is exceeded (for example, a storage may
raise an exception, or truncate the value).
""")
description = zope.interface.Attribute(
"""A textual description of the transaction.
The value is of Python type str. Method note() is the intended
way to set the value. Storages record the description, as meta-data,
when a transaction commits.
A storage may impose a limit on the size of the description; behavior
is undefined if such a limit is exceeded (for example, a storage may
raise an exception, or truncate the value).
""")
def commit():
"""Finalize the transaction.
This executes the two-phase commit algorithm for all
IDataManager objects associated with the transaction.
"""
def abort():
"""Abort the transaction.
This is called from the application. This can only be called
before the two-phase commit protocol has been started.
"""
def doom():
"""Doom the transaction.
Dooms the current transaction. This will cause
DoomedTransactionException to be raised on any attempt to commit the
transaction.
Otherwise the transaction will behave as if it was active.
"""
def savepoint(optimistic=False):
"""Create a savepoint.
If the optimistic argument is true, then data managers that don't
support savepoints can be used, but an error will be raised if the
savepoint is rolled back.
An ISavepoint object is returned.
"""
def join(datamanager):
"""Add a data manager to the transaction.
`datamanager` must provide the transactions.interfaces.IDataManager
interface.
"""
def note(text):
"""Add text to the transaction description.
This modifies the `.description` attribute; see its docs for more
detail. First surrounding whitespace is stripped from `text`. If
`.description` is currently an empty string, then the stripped text
becomes its value, else two newlines and the stripped text are
appended to `.description`.
"""
def setUser(user_name, path="/"):
"""Set the user name.
path should be provided if needed to further qualify the
identified user. This is a convenience method used by Zope.
It sets the .user attribute to str(path) + " " + str(user_name).
This sets the `.user` attribute; see its docs for more detail.
"""
def setExtendedInfo(name, value):
"""Add extension data to the transaction.
name is the name of the extension property to set, of Python type
str; value must be picklable. Multiple calls may be made to set
multiple extension properties, provided the names are distinct.
Storages record the extension data, as meta-data, when a transaction
commits.
A storage may impose a limit on the size of extension data; behavior
is undefined if such a limit is exceeded (for example, a storage may
raise an exception, or remove `<name, value>` pairs).
"""
# deprecated38
def beforeCommitHook(__hook, *args, **kws):
"""Register a hook to call before the transaction is committed.
THIS IS DEPRECATED IN ZODB 3.6. Use addBeforeCommitHook() instead.
The specified hook function will be called after the transaction's
commit method has been called, but before the commit process has been
started. The hook will be passed the specified positional and keyword
arguments.
Multiple hooks can be registered and will be called in the order they
were registered (first registered, first called). This method can
also be called from a hook: an executing hook can register more
hooks. Applications should take care to avoid creating infinite loops
by recursively registering hooks.
Hooks are called only for a top-level commit. A savepoint
does not call any hooks. If the transaction is aborted, hooks
are not called, and are discarded. Calling a hook "consumes" its
registration too: hook registrations do not persist across
transactions. If it's desired to call the same hook on every
transaction commit, then beforeCommitHook() must be called with that
hook during every transaction; in such a case consider registering a
synchronizer object via a TransactionManager's registerSynch() method
instead.
"""
def addBeforeCommitHook(hook, args=(), kws=None):
"""Register a hook to call before the transaction is committed.
The specified hook function will be called after the transaction's
commit method has been called, but before the commit process has been
started. The hook will be passed the specified positional (`args`)
and keyword (`kws`) arguments. `args` is a sequence of positional
arguments to be passed, defaulting to an empty tuple (no positional
arguments are passed). `kws` is a dictionary of keyword argument
names and values to be passed, or the default None (no keyword
arguments are passed).
Multiple hooks can be registered and will be called in the order they
were registered (first registered, first called). This method can
also be called from a hook: an executing hook can register more
hooks. Applications should take care to avoid creating infinite loops
by recursively registering hooks.
Hooks are called only for a top-level commit. A
savepoint creation does not call any hooks. If the
transaction is aborted, hooks are not called, and are discarded.
Calling a hook "consumes" its registration too: hook registrations
do not persist across transactions. If it's desired to call the same
hook on every transaction commit, then addBeforeCommitHook() must be
called with that hook during every transaction; in such a case
consider registering a synchronizer object via a TransactionManager's
registerSynch() method instead.
"""
def getBeforeCommitHooks():
"""Return iterable producing the registered addBeforeCommit hooks.
A triple (hook, args, kws) is produced for each registered hook.
The hooks are produced in the order in which they would be invoked
by a top-level transaction commit.
"""
def addAfterCommitHook(hook, args=(), kws=None):
"""Register a hook to call after a transaction commit attempt.
The specified hook function will be called after the transaction
commit succeeds or aborts. The first argument passed to the hook
is a Boolean value, true if the commit succeeded, or false if the
commit aborted. `args` specifies additional positional, and `kws`
keyword, arguments to pass to the hook. `args` is a sequence of
positional arguments to be passed, defaulting to an empty tuple
(only the true/false success argument is passed). `kws` is a
dictionary of keyword argument names and values to be passed, or
the default None (no keyword arguments are passed).
Multiple hooks can be registered and will be called in the order they
were registered (first registered, first called). This method can
also be called from a hook: an executing hook can register more
hooks. Applications should take care to avoid creating infinite loops
by recursively registering hooks.
Hooks are called only for a top-level commit. A
savepoint creation does not call any hooks. Calling a
hook "consumes" its registration: hook registrations do not
persist across transactions. If it's desired to call the same
hook on every transaction commit, then addAfterCommitHook() must be
called with that hook during every transaction; in such a case
consider registering a synchronizer object via a TransactionManager's
registerSynch() method instead.
"""
def getAfterCommitHooks():
"""Return iterable producing the registered addAfterCommit hooks.
A triple (hook, args, kws) is produced for each registered hook.
The hooks are produced in the order in which they would be invoked
by a top-level transaction commit.
"""
class ITransactionDeprecated(zope.interface.Interface):
"""Deprecated parts of the transaction API."""
def begin(info=None):
"""Begin a new transaction.
If the transaction is in progress, it is aborted and a new
transaction is started using the same transaction object.
"""
# TODO: deprecate this for 3.6.
def register(object):
"""Register the given object for transaction control."""
class IDataManager(zope.interface.Interface):
"""Objects that manage transactional storage.
These objects may manage data for other objects, or they may manage
non-object storages, such as relational databases. For example,
a ZODB.Connection.
Note that when some data is modified, that data's data manager should
join a transaction so that data can be committed when the user commits
the transaction.
"""
transaction_manager = zope.interface.Attribute(
"""The transaction manager (TM) used by this data manager.
This is a public attribute, intended for read-only use. The value
is an instance of ITransactionManager, typically set by the data
manager's constructor.
""")
def abort(transaction):
"""Abort a transaction and forget all changes.
Abort must be called outside of a two-phase commit.
Abort is called by the transaction manager to abort transactions
that are not yet in a two-phase commit.
"""
# Two-phase commit protocol. These methods are called by the ITransaction
# object associated with the transaction being committed. The sequence
# of calls normally follows this regular expression:
# tpc_begin commit tpc_vote (tpc_finish | tpc_abort)
def tpc_begin(transaction):
"""Begin commit of a transaction, starting the two-phase commit.
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def commit(transaction):
"""Commit modifications to registered objects.
Save changes to be made persistent if the transaction commits (if
tpc_finish is called later). If tpc_abort is called later, changes
must not persist.
This includes conflict detection and handling. If no conflicts or
errors occur, the data manager should be prepared to make the
changes persist when tpc_finish is called.
"""
def tpc_vote(transaction):
"""Verify that a data manager can commit the transaction.
This is the last chance for a data manager to vote 'no'. A
data manager votes 'no' by raising an exception.
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def tpc_finish(transaction):
"""Indicate confirmation that the transaction is done.
Make all changes to objects modified by this transaction persist.
transaction is the ITransaction instance associated with the
transaction being committed.
This should never fail. If this raises an exception, the
database is not expected to maintain consistency; it's a
serious error.
"""
def tpc_abort(transaction):
"""Abort a transaction.
This is called by a transaction manager to end a two-phase commit on
the data manager. Abandon all changes to objects modified by this
transaction.
transaction is the ITransaction instance associated with the
transaction being committed.
This should never fail.
"""
def sortKey():
"""Return a key to use for ordering registered DataManagers.
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
# Alternate version:
#"""Return a consistent sort key for this connection.
#
#This allows ordering multiple connections that use the same storage in
#a consistent manner. This is unique for the lifetime of a connection,
#which is good enough to avoid ZEO deadlocks.
#"""
class ISavepointDataManager(IDataManager):
def savepoint():
"""Return a data-manager savepoint (IDataManagerSavepoint).
"""
class IDataManagerSavepoint(zope.interface.Interface):
"""Savepoint for data-manager changes for use in transaction savepoints.
Datamanager savepoints are used by, and only by, transaction savepoints.
Note that data manager savepoints don't have any notion of, or
responsibility for, validity. It isn't the responsibility of
data-manager savepoints to prevent multiple rollbacks or rollbacks after
transaction termination. Preventing invalid savepoint rollback is the
responsibility of transaction rollbacks. Application code should never
use data-manager savepoints.
"""
def rollback():
"""Rollback any work done since the savepoint.
"""
class ISavepoint(zope.interface.Interface):
"""A transaction savepoint.
"""
def rollback():
"""Rollback any work done since the savepoint.
InvalidSavepointRollbackError is raised if the savepoint isn't valid.
"""
valid = zope.interface.Attribute(
"Boolean indicating whether the savepoint is valid")
class InvalidSavepointRollbackError(Exception):
"""Attempt to rollback an invalid savepoint.
A savepoint may be invalid because:
- The surrounding transaction has committed or aborted.
- An earlier savepoint in the same transaction has been rolled back.
"""
class ISynchronizer(zope.interface.Interface):
"""Objects that participate in the transaction-boundary notification API.
"""
def beforeCompletion(transaction):
"""Hook that is called by the transaction at the start of a commit.
"""
def afterCompletion(transaction):
"""Hook that is called by the transaction after completing a commit.
"""
def newTransaction(transaction):
"""Hook that is called at the start of a transaction.
This hook is called when, and only when, a transaction manager's
begin() method is called explictly.
"""
class DoomedTransaction(Exception):
"""A commit was attempted on a transaction that was doomed."""
Savepoints
==========
Savepoints provide a way to save to disk intermediate work done during
a transaction allowing:
- partial transaction (subtransaction) rollback (abort)
- state of saved objects to be freed, freeing on-line memory for other
uses
Savepoints make it possible to write atomic subroutines that don't
make top-level transaction commitments.
Applications
------------
To demonstrate how savepoints work with transactions, we've provided a sample
data manager implementation that provides savepoint support. The primary
purpose of this data manager is to provide code that can be read to understand
how savepoints work. The secondary purpose is to provide support for
demonstrating the correct operation of savepoint support within the
transaction system. This data manager is very simple. It provides flat
storage of named immutable values, like strings and numbers.
>>> import transaction.tests.savepointsample
>>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> dm['name'] = 'bob'
As with other data managers, we can commit changes:
>>> transaction.commit()
>>> dm['name']
'bob'
and abort changes:
>>> dm['name'] = 'sally'
>>> dm['name']
'sally'
>>> transaction.abort()
>>> dm['name']
'bob'
Now, let's look at an application that manages funds for people. It allows
deposits and debits to be entered for multiple people. It accepts a sequence
of entries and generates a sequence of status messages. For each entry, it
applies the change and then validates the user's account. If the user's
account is invalid, we roll back the change for that entry. The success or
failure of an entry is indicated in the output status. First we'll initialize
some accounts:
>>> dm['bob-balance'] = 0.0
>>> dm['bob-credit'] = 0.0
>>> dm['sally-balance'] = 0.0
>>> dm['sally-credit'] = 100.0
>>> transaction.commit()
Now, we'll define a validation function to validate an account:
>>> def validate_account(name):
... if dm[name+'-balance'] + dm[name+'-credit'] < 0:
... raise ValueError('Overdrawn', name)
And a function to apply entries. If the function fails in some unexpected
way, it rolls back all of its changes and prints the error:
>>> def apply_entries(entries):
... savepoint = transaction.savepoint()
... try:
... for name, amount in entries:
... entry_savepoint = transaction.savepoint()
... try:
... dm[name+'-balance'] += amount
... validate_account(name)
... except ValueError, error:
... entry_savepoint.rollback()
... print 'Error', str(error)
... else:
... print 'Updated', name
... except Exception, error:
... savepoint.rollback()
... print 'Unexpected exception', error
Now let's try applying some entries:
>>> apply_entries([
... ('bob', 10.0),
... ('sally', 10.0),
... ('bob', 20.0),
... ('sally', 10.0),
... ('bob', -100.0),
... ('sally', -100.0),
... ])
Updated bob
Updated sally
Updated bob
Updated sally
Error ('Overdrawn', 'bob')
Updated sally
>>> dm['bob-balance']
30.0
>>> dm['sally-balance']
-80.0
If we provide entries that cause an unexpected error:
>>> apply_entries([
... ('bob', 10.0),
... ('sally', 10.0),
... ('bob', '20.0'),
... ('sally', 10.0),
... ])
Updated bob
Updated sally
Unexpected exception unsupported operand type(s) for +=: 'float' and 'str'
Because the apply_entries used a savepoint for the entire function, it was
able to rollback the partial changes without rolling back changes made in the
previous call to ``apply_entries``:
>>> dm['bob-balance']
30.0
>>> dm['sally-balance']
-80.0
If we now abort the outer transactions, the earlier changes will go
away:
>>> transaction.abort()
>>> dm['bob-balance']
0.0
>>> dm['sally-balance']
0.0
Savepoint invalidation
----------------------
A savepoint can be used any number of times:
>>> dm['bob-balance'] = 100.0
>>> dm['bob-balance']
100.0
>>> savepoint = transaction.savepoint()
>>> dm['bob-balance'] = 200.0
>>> dm['bob-balance']
200.0
>>> savepoint.rollback()
>>> dm['bob-balance']
100.0
>>> savepoint.rollback() # redundant, but should be harmless
>>> dm['bob-balance']
100.0
>>> dm['bob-balance'] = 300.0
>>> dm['bob-balance']
300.0
>>> savepoint.rollback()
>>> dm['bob-balance']
100.0
However, using a savepoint invalidates any savepoints that come after it:
>>> dm['bob-balance'] = 200.0
>>> dm['bob-balance']
200.0
>>> savepoint1 = transaction.savepoint()
>>> dm['bob-balance'] = 300.0
>>> dm['bob-balance']
300.0
>>> savepoint2 = transaction.savepoint()
>>> savepoint.rollback()
>>> dm['bob-balance']
100.0
>>> savepoint2.rollback()
Traceback (most recent call last):
...
InvalidSavepointRollbackError
>>> savepoint1.rollback()
Traceback (most recent call last):
...
InvalidSavepointRollbackError
>>> transaction.abort()
Databases without savepoint support
-----------------------------------
Normally it's an error to use savepoints with databases that don't support
savepoints:
>>> dm_no_sp = transaction.tests.savepointsample.SampleDataManager()
>>> dm_no_sp['name'] = 'bob'
>>> transaction.commit()
>>> dm_no_sp['name'] = 'sally'
>>> savepoint = transaction.savepoint()
Traceback (most recent call last):
...
TypeError: ('Savepoints unsupported', {'name': 'bob'})
>>> transaction.abort()
However, a flag can be passed to the transaction savepoint method to indicate
that databases without savepoint support should be tolerated until a savepoint
is rolled back. This allows transactions to proceed if there are no reasons
to roll back:
>>> dm_no_sp['name'] = 'sally'
>>> savepoint = transaction.savepoint(1)
>>> dm_no_sp['name'] = 'sue'
>>> transaction.commit()
>>> dm_no_sp['name']
'sue'
>>> dm_no_sp['name'] = 'sam'
>>> savepoint = transaction.savepoint(1)
>>> savepoint.rollback()
Traceback (most recent call last):
...
TypeError: ('Savepoints unsupported', {'name': 'sam'})
Failures
--------
If a failure occurs when creating or rolling back a savepoint, the transaction
state will be uncertain and the transaction will become uncommitable. From
that point on, most transaction operations, including commit, will fail until
the transaction is aborted.
In the previous example, we got an error when we tried to rollback the
savepoint. If we try to commit the transaction, the commit will fail:
>>> transaction.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
TransactionFailedError: An operation previously failed, with traceback:
...
TypeError: ('Savepoints unsupported', {'name': 'sam'})
<BLANKLINE>
We have to abort it to make any progress:
>>> transaction.abort()
Similarly, in our earlier example, where we tried to take a savepoint with a
data manager that didn't support savepoints:
>>> dm_no_sp['name'] = 'sally'
>>> dm['name'] = 'sally'
>>> savepoint = transaction.savepoint()
Traceback (most recent call last):
...
TypeError: ('Savepoints unsupported', {'name': 'sue'})
>>> transaction.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
TransactionFailedError: An operation previously failed, with traceback:
...
TypeError: ('Savepoints unsupported', {'name': 'sue'})
<BLANKLINE>
>>> transaction.abort()
After clearing the transaction with an abort, we can get on with new
transactions:
>>> dm_no_sp['name'] = 'sally'
>>> dm['name'] = 'sally'
>>> transaction.commit()
>>> dm_no_sp['name']
'sally'
>>> dm['name']
'sally'
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test cases for objects implementing IDataManager.
This is a combo test between Connection and DB, since the two are
rather incestuous and the DB Interface is not defined that I was
able to find.
To do a full test suite one would probably want to write a dummy
storage that will raise errors as needed for testing.
I started this test suite to reproduce a very simple error (tpc_abort
had an error and wouldn't even run if called). So it is *very*
incomplete, and even the tests that exist do not make sure that
the data actually gets written/not written to the storge.
Obviously this test suite should be expanded.
$Id$
"""
from unittest import TestCase
class IDataManagerTests(TestCase, object):
def setUp(self):
self.datamgr = None # subclass should override
self.obj = None # subclass should define Persistent object
self.txn_factory = None
def get_transaction(self):
return self.txn_factory()
################################
# IDataManager interface tests #
################################
def testCommitObj(self):
tran = self.get_transaction()
self.datamgr.prepare(tran)
self.datamgr.commit(tran)
def testAbortTran(self):
tran = self.get_transaction()
self.datamgr.prepare(tran)
self.datamgr.abort(tran)
Dooming Transactions
====================
A doomed transaction behaves exactly the same way as an active transaction but
raises an error on any attempt to commit it, thus forcing an abort.
Doom is useful in places where abort is unsafe and an exception cannot be
raised. This occurs when the programmer wants the code following the doom to
run but not commit. It is unsafe to abort in these circumstances as a following
get() may implicitly open a new transaction.
Any attempt to commit a doomed transaction will raise a DoomedTransaction
exception.
An example of such a use case can be found in
zope/app/form/browser/editview.py. Here a form validation failure must doom
the transaction as committing the transaction may have side-effects. However,
the form code must continue to calculate a form containing the error messages
to return.
For Zope in general, code running within a request should always doom
transactions rather than aborting them. It is the responsibilty of the
publication to either abort() or commit() the transaction. Application code can
use savepoints and doom() safely.
To see how it works we first need to create a stub data manager:
>>> from transaction.interfaces import IDataManager
>>> from zope.interface import implements
>>> class DataManager:
... implements(IDataManager)
... def __init__(self):
... self.attr_counter = {}
... def __getattr__(self, name):
... def f(transaction):
... self.attr_counter[name] = self.attr_counter.get(name, 0) + 1
... return f
... def total(self):
... count = 0
... for access_count in self.attr_counter.values():
... count += access_count
... return count
... def sortKey(self):
... return 1
Start a new transaction:
>>> import transaction
>>> txn = transaction.begin()
>>> dm = DataManager()
>>> txn.join(dm)
We can ask a transaction if it is doomed to avoid expensive operations. An
example of a use case is an object-relational mapper where a pre-commit hook
sends all outstanding SQL to a relational database for objects changed during
the transaction. This expensive operation is not necessary if the transaction
has been doomed. A non-doomed transaction should return False:
>>> txn.isDoomed()
False
We can doom a transaction by calling .doom() on it:
>>> txn.doom()
>>> txn.isDoomed()
True
We can doom it again if we like:
>>> txn.doom()
The data manager is unchanged at this point:
>>> dm.total()
0
Attempting to commit a doomed transaction any number of times raises a
DoomedTransaction:
>>> txn.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
DoomedTransaction
>>> txn.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
DoomedTransaction
But still leaves the data manager unchanged:
>>> dm.total()
0
But the doomed transaction can be aborted:
>>> txn.abort()
Which aborts the data manager:
>>> dm.total()
1
>>> dm.attr_counter['abort']
1
Dooming the current transaction can also be done directly from the transaction
module. We can also begin a new transaction directly after dooming the old one:
>>> txn = transaction.begin()
>>> transaction.isDoomed()
False
>>> transaction.doom()
>>> transaction.isDoomed()
True
>>> txn = transaction.begin()
After committing a transaction we get an assertion error if we try to doom the
transaction. This could be made more specific, but trying to doom a transaction
after it's been committed is probably a programming error:
>>> txn = transaction.begin()
>>> txn.commit()
>>> txn.doom()
Traceback (most recent call last):
...
AssertionError
A doomed transaction should act the same as an active transaction, so we should
be able to join it:
>>> txn = transaction.begin()
>>> txn.doom()
>>> dm2 = DataManager()
>>> txn.join(dm2)
Clean up:
>>> txn = transaction.begin()
>>> txn.abort()
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Savepoint data manager implementation example.
Sample data manager implementation that illustrates how to implement
savepoints.
See savepoint.txt in the transaction package.
$Id$
"""
import UserDict
from zope import interface
import transaction.interfaces
class SampleDataManager(UserDict.DictMixin):
"""Sample implementation of data manager that doesn't support savepoints
This data manager stores named simple values, like strings and numbers.
"""
interface.implements(transaction.interfaces.IDataManager)
def __init__(self, transaction_manager=None):
if transaction_manager is None:
# Use the thread-local transaction manager if none is provided:
transaction_manager = transaction.manager
self.transaction_manager = transaction_manager
# Our committed and uncommitted data:
self.committed = {}
self.uncommitted = self.committed.copy()
# Our transaction state:
#
# If our uncommitted data is modified, we'll join a transaction
# and keep track of the transaction we joined. Any commit
# related messages we get should be for this same transaction
self.transaction = None
# What phase, if any, of two-phase commit we are in:
self.tpc_phase = None
#######################################################################
# Provide a mapping interface to uncommitted data. We provide
# a basic subset of the interface. DictMixin does the rest.
def __getitem__(self, name):
return self.uncommitted[name]
def __setitem__(self, name, value):
self._join() # join the current transaction, if we haven't already
self.uncommitted[name] = value
def __delitem__(self, name):
self._join() # join the current transaction, if we haven't already
del self.uncommitted[name]
def keys(self):
return self.uncommitted.keys()
#
#######################################################################
#######################################################################
# Transaction methods
def _join(self):
# If this is the first change in the transaction, join the transaction
if self.transaction is None:
self.transaction = self.transaction_manager.get()
self.transaction.join(self)
def _resetTransaction(self):
self.transaction = None
self.tpc_phase = None
def abort(self, transaction):
"""Throw away changes made before the commit process has started
"""
assert ((transaction is self.transaction) or (self.transaction is None)
), "Must not change transactions"
assert self.tpc_phase is None, "Must be called outside of tpc"
self.uncommitted = self.committed.copy()
self._resetTransaction()
def tpc_begin(self, transaction):
"""Enter two-phase commit
"""
assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase is None, "Must be called outside of tpc"
self.tpc_phase = 1
def commit(self, transaction):
"""Record data modified during the transaction
"""
assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase == 1, "Must be called in first phase of tpc"
# In our simple example, we don't need to do anything.
# A more complex data manager would typically write to some sort
# of log.
def tpc_vote(self, transaction):
assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase == 1, "Must be called in first phase of tpc"
# This particular data manager is always ready to vote.
# Real data managers will usually need to take some steps to
# make sure that the finish will succeed
self.tpc_phase = 2
def tpc_finish(self, transaction):
assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase == 2, "Must be called in second phase of tpc"
self.committed = self.uncommitted.copy()
self._resetTransaction()
def tpc_abort(self, transaction):
assert transaction is self.transaction, "Must not change transactions"
assert self.tpc_phase is not None, "Must be called inside of tpc"
self.uncommitted = self.committed.copy()
self._resetTransaction()
#
#######################################################################
#######################################################################
# Other data manager methods
def sortKey(self):
# Commit operations on multiple data managers are performed in
# sort key order. This important to avoid deadlock when data
# managers are shared among multiple threads or processes and
# use locks to manage that sharing. We aren't going to bother
# with that here.
return str(id(self))
#
#######################################################################
class SampleSavepointDataManager(SampleDataManager):
"""Sample implementation of a savepoint-supporting data manager
This extends the basic data manager with savepoint support.
"""
interface.implements(transaction.interfaces.ISavepointDataManager)
def savepoint(self):
# When we create the savepoint, we save the existing database state.
return SampleSavepoint(self, self.uncommitted.copy())
def _rollback_savepoint(self, savepoint):
# When we rollback the savepoint, we restore the saved data.
# Caution: without the copy(), further changes to the database
# could reflect in savepoint.data, and then `savepoint` would no
# longer contain the originally saved data, and so `savepoint`
# couldn't restore the original state if a rollback to this
# savepoint was done again. IOW, copy() is necessary.
self.uncommitted = savepoint.data.copy()
class SampleSavepoint:
interface.implements(transaction.interfaces.IDataManagerSavepoint)
def __init__(self, data_manager, data):
self.data_manager = data_manager
self.data = data
def rollback(self):
self.data_manager._rollback_savepoint(self)
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Sample objects for use in tests
$Id$
"""
class DataManager(object):
"""Sample data manager
This class provides a trivial data-manager implementation and doc
strings to illustrate the the protocol and to provide a tool for
writing tests.
Our sample data manager has state that is updated through an inc
method and through transaction operations.
When we create a sample data manager:
>>> dm = DataManager()
It has two bits of state, state:
>>> dm.state
0
and delta:
>>> dm.delta
0
Both of which are initialized to 0. state is meant to model
committed state, while delta represents tentative changes within a
transaction. We change the state by calling inc:
>>> dm.inc()
which updates delta:
>>> dm.delta
1
but state isn't changed until we commit the transaction:
>>> dm.state
0
To commit the changes, we use 2-phase commit. We execute the first
stage by calling prepare. We need to pass a transation. Our
sample data managers don't really use the transactions for much,
so we'll be lazy and use strings for transactions:
>>> t1 = '1'
>>> dm.prepare(t1)
The sample data manager updates the state when we call prepare:
>>> dm.state
1
>>> dm.delta
1
This is mainly so we can detect some affect of calling the methods.
Now if we call commit:
>>> dm.commit(t1)
Our changes are"permanent". The state reflects the changes and the
delta has been reset to 0.
>>> dm.state
1
>>> dm.delta
0
"""
def __init__(self):
self.state = 0
self.sp = 0
self.transaction = None
self.delta = 0
self.prepared = False
def inc(self, n=1):
self.delta += n
def prepare(self, transaction):
"""Prepare to commit data
>>> dm = DataManager()
>>> dm.inc()
>>> t1 = '1'
>>> dm.prepare(t1)
>>> dm.commit(t1)
>>> dm.state
1
>>> dm.inc()
>>> t2 = '2'
>>> dm.prepare(t2)
>>> dm.abort(t2)
>>> dm.state
1
It is en error to call prepare more than once without an intervening
commit or abort:
>>> dm.prepare(t1)
>>> dm.prepare(t1)
Traceback (most recent call last):
...
TypeError: Already prepared
>>> dm.prepare(t2)
Traceback (most recent call last):
...
TypeError: Already prepared
>>> dm.abort(t1)
If there was a preceeding savepoint, the transaction must match:
>>> rollback = dm.savepoint(t1)
>>> dm.prepare(t2)
Traceback (most recent call last):
,,,
TypeError: ('Transaction missmatch', '2', '1')
>>> dm.prepare(t1)
"""
if self.prepared:
raise TypeError('Already prepared')
self._checkTransaction(transaction)
self.prepared = True
self.transaction = transaction
self.state += self.delta
def _checkTransaction(self, transaction):
if (transaction is not self.transaction
and self.transaction is not None):
raise TypeError("Transaction missmatch",
transaction, self.transaction)
def abort(self, transaction):
"""Abort a transaction
The abort method can be called before two-phase commit to
throw away work done in the transaction:
>>> dm = DataManager()
>>> dm.inc()
>>> dm.state, dm.delta
(0, 1)
>>> t1 = '1'
>>> dm.abort(t1)
>>> dm.state, dm.delta
(0, 0)
The abort method also throws away work done in savepoints:
>>> dm.inc()
>>> r = dm.savepoint(t1)
>>> dm.inc()
>>> r = dm.savepoint(t1)
>>> dm.state, dm.delta
(0, 2)
>>> dm.abort(t1)
>>> dm.state, dm.delta
(0, 0)
If savepoints are used, abort must be passed the same
transaction:
>>> dm.inc()
>>> r = dm.savepoint(t1)
>>> t2 = '2'
>>> dm.abort(t2)
Traceback (most recent call last):
...
TypeError: ('Transaction missmatch', '2', '1')
>>> dm.abort(t1)
The abort method is also used to abort a two-phase commit:
>>> dm.inc()
>>> dm.state, dm.delta
(0, 1)
>>> dm.prepare(t1)
>>> dm.state, dm.delta
(1, 1)
>>> dm.abort(t1)
>>> dm.state, dm.delta
(0, 0)
Of course, the transactions passed to prepare and abort must
match:
>>> dm.prepare(t1)
>>> dm.abort(t2)
Traceback (most recent call last):
...
TypeError: ('Transaction missmatch', '2', '1')
>>> dm.abort(t1)
"""
self._checkTransaction(transaction)
if self.transaction is not None:
self.transaction = None
if self.prepared:
self.state -= self.delta
self.prepared = False
self.delta = 0
def commit(self, transaction):
"""Complete two-phase commit
>>> dm = DataManager()
>>> dm.state
0
>>> dm.inc()
We start two-phase commit by calling prepare:
>>> t1 = '1'
>>> dm.prepare(t1)
We complete it by calling commit:
>>> dm.commit(t1)
>>> dm.state
1
It is an error ro call commit without calling prepare first:
>>> dm.inc()
>>> t2 = '2'
>>> dm.commit(t2)
Traceback (most recent call last):
...
TypeError: Not prepared to commit
>>> dm.prepare(t2)
>>> dm.commit(t2)
If course, the transactions given to prepare and commit must
be the same:
>>> dm.inc()
>>> t3 = '3'
>>> dm.prepare(t3)
>>> dm.commit(t2)
Traceback (most recent call last):
...
TypeError: ('Transaction missmatch', '2', '3')
"""
if not self.prepared:
raise TypeError('Not prepared to commit')
self._checkTransaction(transaction)
self.delta = 0
self.transaction = None
self.prepared = False
def savepoint(self, transaction):
"""Provide the ability to rollback transaction state
Savepoints provide a way to:
- Save partial transaction work. For some data managers, this
could allow resources to be used more efficiently.
- Provide the ability to revert state to a point in a
transaction without aborting the entire transaction. In
other words, savepoints support partial aborts.
Savepoints don't use two-phase commit. If there are errors in
setting or rolling back to savepoints, the application should
abort the containing transaction. This is *not* the
responsibility of the data manager.
Savepoints are always associated with a transaction. Any work
done in a savepoint's transaction is tentative until the
transaction is committed using two-phase commit.
>>> dm = DataManager()
>>> dm.inc()
>>> t1 = '1'
>>> r = dm.savepoint(t1)
>>> dm.state, dm.delta
(0, 1)
>>> dm.inc()
>>> dm.state, dm.delta
(0, 2)
>>> r.rollback()
>>> dm.state, dm.delta
(0, 1)
>>> dm.prepare(t1)
>>> dm.commit(t1)
>>> dm.state, dm.delta
(1, 0)
Savepoints must have the same transaction:
>>> r1 = dm.savepoint(t1)
>>> dm.state, dm.delta
(1, 0)
>>> dm.inc()
>>> dm.state, dm.delta
(1, 1)
>>> t2 = '2'
>>> r2 = dm.savepoint(t2)
Traceback (most recent call last):
...
TypeError: ('Transaction missmatch', '2', '1')
>>> r2 = dm.savepoint(t1)
>>> dm.inc()
>>> dm.state, dm.delta
(1, 2)
If we rollback to an earlier savepoint, we discard all work
done later:
>>> r1.rollback()
>>> dm.state, dm.delta
(1, 0)
and we can no longer rollback to the later savepoint:
>>> r2.rollback()
Traceback (most recent call last):
...
TypeError: ('Attempt to roll back to invalid save point', 3, 2)
We can roll back to a savepoint as often as we like:
>>> r1.rollback()
>>> r1.rollback()
>>> r1.rollback()
>>> dm.state, dm.delta
(1, 0)
>>> dm.inc()
>>> dm.inc()
>>> dm.inc()
>>> dm.state, dm.delta
(1, 3)
>>> r1.rollback()
>>> dm.state, dm.delta
(1, 0)
But we can't rollback to a savepoint after it has been
committed:
>>> dm.prepare(t1)
>>> dm.commit(t1)
>>> r1.rollback()
Traceback (most recent call last):
...
TypeError: Attempt to rollback stale rollback
"""
if self.prepared:
raise TypeError("Can't get savepoint during two-phase commit")
self._checkTransaction(transaction)
self.transaction = transaction
self.sp += 1
return Rollback(self)
class Rollback(object):
def __init__(self, dm):
self.dm = dm
self.sp = dm.sp
self.delta = dm.delta
self.transaction = dm.transaction
def rollback(self):
if self.transaction is not self.dm.transaction:
raise TypeError("Attempt to rollback stale rollback")
if self.dm.sp < self.sp:
raise TypeError("Attempt to roll back to invalid save point",
self.sp, self.dm.sp)
self.dm.sp = self.sp
self.dm.delta = self.delta
def test_suite():
from zope.testing.doctest import DocTestSuite
return DocTestSuite()
if __name__ == '__main__':
unittest.main()
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Sample objects for use in tests
$Id$
"""
class ResourceManager(object):
"""Sample resource manager.
This class provides a trivial resource-manager implementation and doc
strings to illustrate the protocol and to provide a tool for writing
tests.
Our sample resource manager has state that is updated through an inc
method and through transaction operations.
When we create a sample resource manager:
>>> rm = ResourceManager()
It has two pieces state, state and delta, both initialized to 0:
>>> rm.state
0
>>> rm.delta
0
state is meant to model committed state, while delta represents
tentative changes within a transaction. We change the state by
calling inc:
>>> rm.inc()
which updates delta:
>>> rm.delta
1
but state isn't changed until we commit the transaction:
>>> rm.state
0
To commit the changes, we use 2-phase commit. We execute the first
stage by calling prepare. We need to pass a transation. Our
sample resource managers don't really use the transactions for much,
so we'll be lazy and use strings for transactions. The sample
resource manager updates the state when we call tpc_vote:
>>> t1 = '1'
>>> rm.tpc_begin(t1)
>>> rm.state, rm.delta
(0, 1)
>>> rm.tpc_vote(t1)
>>> rm.state, rm.delta
(1, 1)
Now if we call tpc_finish:
>>> rm.tpc_finish(t1)
Our changes are "permanent". The state reflects the changes and the
delta has been reset to 0.
>>> rm.state, rm.delta
(1, 0)
"""
def __init__(self):
self.state = 0
self.sp = 0
self.transaction = None
self.delta = 0
self.txn_state = None
def _check_state(self, *ok_states):
if self.txn_state not in ok_states:
raise ValueError("txn in state %r but expected one of %r" %
(self.txn_state, ok_states))
def _checkTransaction(self, transaction):
if (transaction is not self.transaction
and self.transaction is not None):
raise TypeError("Transaction missmatch",
transaction, self.transaction)
def inc(self, n=1):
self.delta += n
def tpc_begin(self, transaction):
"""Prepare to commit data.
>>> rm = ResourceManager()
>>> rm.inc()
>>> t1 = '1'
>>> rm.tpc_begin(t1)
>>> rm.tpc_vote(t1)
>>> rm.tpc_finish(t1)
>>> rm.state
1
>>> rm.inc()
>>> t2 = '2'
>>> rm.tpc_begin(t2)
>>> rm.tpc_vote(t2)
>>> rm.tpc_abort(t2)
>>> rm.state
1
It is an error to call tpc_begin more than once without completing
two-phase commit:
>>> rm.tpc_begin(t1)
>>> rm.tpc_begin(t1)
Traceback (most recent call last):
...
ValueError: txn in state 'tpc_begin' but expected one of (None,)
>>> rm.tpc_abort(t1)
If there was a preceeding savepoint, the transaction must match:
>>> rollback = rm.savepoint(t1)
>>> rm.tpc_begin(t2)
Traceback (most recent call last):
,,,
TypeError: ('Transaction missmatch', '2', '1')
>>> rm.tpc_begin(t1)
"""
self._checkTransaction(transaction)
self._check_state(None)
self.transaction = transaction
self.txn_state = 'tpc_begin'
def tpc_vote(self, transaction):
"""Verify that a data manager can commit the transaction.
This is the last chance for a data manager to vote 'no'. A
data manager votes 'no' by raising an exception.
transaction is the ITransaction instance associated with the
transaction being committed.
"""
self._checkTransaction(transaction)
self._check_state('tpc_begin')
self.state += self.delta
self.txn_state = 'tpc_vote'
def tpc_finish(self, transaction):
"""Complete two-phase commit
>>> rm = ResourceManager()
>>> rm.state
0
>>> rm.inc()
We start two-phase commit by calling prepare:
>>> t1 = '1'
>>> rm.tpc_begin(t1)
>>> rm.tpc_vote(t1)
We complete it by calling tpc_finish:
>>> rm.tpc_finish(t1)
>>> rm.state
1
It is an error ro call tpc_finish without calling tpc_vote:
>>> rm.inc()
>>> t2 = '2'
>>> rm.tpc_begin(t2)
>>> rm.tpc_finish(t2)
Traceback (most recent call last):
...
ValueError: txn in state 'tpc_begin' but expected one of ('tpc_vote',)
>>> rm.tpc_abort(t2) # clean slate
>>> rm.tpc_begin(t2)
>>> rm.tpc_vote(t2)
>>> rm.tpc_finish(t2)
Of course, the transactions given to tpc_begin and tpc_finish must
be the same:
>>> rm.inc()
>>> t3 = '3'
>>> rm.tpc_begin(t3)
>>> rm.tpc_vote(t3)
>>> rm.tpc_finish(t2)
Traceback (most recent call last):
...
TypeError: ('Transaction missmatch', '2', '3')
"""
self._checkTransaction(transaction)
self._check_state('tpc_vote')
self.delta = 0
self.transaction = None
self.prepared = False
self.txn_state = None
def tpc_abort(self, transaction):
"""Abort a transaction
The abort method can be called before two-phase commit to
throw away work done in the transaction:
>>> rm = ResourceManager()
>>> rm.inc()
>>> rm.state, rm.delta
(0, 1)
>>> t1 = '1'
>>> rm.tpc_abort(t1)
>>> rm.state, rm.delta
(0, 0)
The abort method also throws away work done in savepoints:
>>> rm.inc()
>>> r = rm.savepoint(t1)
>>> rm.inc()
>>> r = rm.savepoint(t1)
>>> rm.state, rm.delta
(0, 2)
>>> rm.tpc_abort(t1)
>>> rm.state, rm.delta
(0, 0)
If savepoints are used, abort must be passed the same
transaction:
>>> rm.inc()
>>> r = rm.savepoint(t1)
>>> t2 = '2'
>>> rm.tpc_abort(t2)
Traceback (most recent call last):
...
TypeError: ('Transaction missmatch', '2', '1')
>>> rm.tpc_abort(t1)
The abort method is also used to abort a two-phase commit:
>>> rm.inc()
>>> rm.state, rm.delta
(0, 1)
>>> rm.tpc_begin(t1)
>>> rm.state, rm.delta
(0, 1)
>>> rm.tpc_vote(t1)
>>> rm.state, rm.delta
(1, 1)
>>> rm.tpc_abort(t1)
>>> rm.state, rm.delta
(0, 0)
Of course, the transactions passed to prepare and abort must
match:
>>> rm.tpc_begin(t1)
>>> rm.tpc_abort(t2)
Traceback (most recent call last):
...
TypeError: ('Transaction missmatch', '2', '1')
>>> rm.tpc_abort(t1)
This should never fail.
"""
self._checkTransaction(transaction)
if self.transaction is not None:
self.transaction = None
if self.txn_state == 'tpc_vote':
self.state -= self.delta
self.txn_state = None
self.delta = 0
def savepoint(self, transaction):
"""Provide the ability to rollback transaction state
Savepoints provide a way to:
- Save partial transaction work. For some resource managers, this
could allow resources to be used more efficiently.
- Provide the ability to revert state to a point in a
transaction without aborting the entire transaction. In
other words, savepoints support partial aborts.
Savepoints don't use two-phase commit. If there are errors in
setting or rolling back to savepoints, the application should
abort the containing transaction. This is *not* the
responsibility of the resource manager.
Savepoints are always associated with a transaction. Any work
done in a savepoint's transaction is tentative until the
transaction is committed using two-phase commit.
>>> rm = ResourceManager()
>>> rm.inc()
>>> t1 = '1'
>>> r = rm.savepoint(t1)
>>> rm.state, rm.delta
(0, 1)
>>> rm.inc()
>>> rm.state, rm.delta
(0, 2)
>>> r.rollback()
>>> rm.state, rm.delta
(0, 1)
>>> rm.tpc_begin(t1)
>>> rm.tpc_vote(t1)
>>> rm.tpc_finish(t1)
>>> rm.state, rm.delta
(1, 0)
Savepoints must have the same transaction:
>>> r1 = rm.savepoint(t1)
>>> rm.state, rm.delta
(1, 0)
>>> rm.inc()
>>> rm.state, rm.delta
(1, 1)
>>> t2 = '2'
>>> r2 = rm.savepoint(t2)
Traceback (most recent call last):
...
TypeError: ('Transaction missmatch', '2', '1')
>>> r2 = rm.savepoint(t1)
>>> rm.inc()
>>> rm.state, rm.delta
(1, 2)
If we rollback to an earlier savepoint, we discard all work
done later:
>>> r1.rollback()
>>> rm.state, rm.delta
(1, 0)
and we can no longer rollback to the later savepoint:
>>> r2.rollback()
Traceback (most recent call last):
...
TypeError: ('Attempt to roll back to invalid save point', 3, 2)
We can roll back to a savepoint as often as we like:
>>> r1.rollback()
>>> r1.rollback()
>>> r1.rollback()
>>> rm.state, rm.delta
(1, 0)
>>> rm.inc()
>>> rm.inc()
>>> rm.inc()
>>> rm.state, rm.delta
(1, 3)
>>> r1.rollback()
>>> rm.state, rm.delta
(1, 0)
But we can't rollback to a savepoint after it has been
committed:
>>> rm.tpc_begin(t1)
>>> rm.tpc_vote(t1)
>>> rm.tpc_finish(t1)
>>> r1.rollback()
Traceback (most recent call last):
...
TypeError: Attempt to rollback stale rollback
"""
if self.txn_state is not None:
raise TypeError("Can't get savepoint during two-phase commit")
self._checkTransaction(transaction)
self.transaction = transaction
self.sp += 1
return SavePoint(self)
def discard(self, transaction):
pass
class SavePoint(object):
def __init__(self, rm):
self.rm = rm
self.sp = rm.sp
self.delta = rm.delta
self.transaction = rm.transaction
def rollback(self):
if self.transaction is not self.rm.transaction:
raise TypeError("Attempt to rollback stale rollback")
if self.rm.sp < self.sp:
raise TypeError("Attempt to roll back to invalid save point",
self.sp, self.rm.sp)
self.rm.sp = self.sp
self.rm.delta = self.delta
def discard(self):
pass
def test_suite():
from doctest import DocTestSuite
return DocTestSuite()
if __name__ == '__main__':
unittest.main()
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test backwards compatibility for resource managers using register().
The transaction package supports several different APIs for resource
managers. The original ZODB3 API was implemented by ZODB.Connection.
The Connection passed persistent objects to a Transaction's register()
method. It's possible that third-party code also used this API, hence
these tests that the code that adapts the old interface to the current
API works.
These tests use a TestConnection object that implements the old API.
They check that the right methods are called and in roughly the right
order.
Common cases
------------
First, check that a basic transaction commit works.
>>> cn = TestConnection()
>>> cn.register(Object())
>>> cn.register(Object())
>>> cn.register(Object())
>>> transaction.commit()
>>> len(cn.committed)
3
>>> len(cn.aborted)
0
>>> cn.calls
['begin', 'vote', 'finish']
Second, check that a basic transaction abort works. If the
application calls abort(), then the transaction never gets into the
two-phase commit. It just aborts each object.
>>> cn = TestConnection()
>>> cn.register(Object())
>>> cn.register(Object())
>>> cn.register(Object())
>>> transaction.abort()
>>> len(cn.committed)
0
>>> len(cn.aborted)
3
>>> cn.calls
[]
Error handling
--------------
The tricky part of the implementation is recovering from an error that
occurs during the two-phase commit. We override the commit() and
abort() methods of Object to cause errors during commit.
Note that the implementation uses lists internally, so that objects
are committed in the order they are registered. (In the presence of
multiple resource managers, objects from a single resource manager are
committed in order. I'm not sure if this is an accident of the
implementation or a feature that should be supported by any
implementation.)
The order of resource managers depends on sortKey().
>>> cn = TestConnection()
>>> cn.register(Object())
>>> cn.register(CommitError())
>>> cn.register(Object())
>>> transaction.commit()
Traceback (most recent call last):
...
RuntimeError: commit
>>> len(cn.committed)
1
>>> len(cn.aborted)
3
Clean up:
>>> transaction.abort()
"""
import transaction
class Object(object):
def commit(self):
pass
def abort(self):
pass
class CommitError(Object):
def commit(self):
raise RuntimeError("commit")
class AbortError(Object):
def abort(self):
raise RuntimeError("abort")
class BothError(CommitError, AbortError):
pass
class TestConnection:
def __init__(self):
self.committed = []
self.aborted = []
self.calls = []
def register(self, obj):
obj._p_jar = self
transaction.get().register(obj)
def sortKey(self):
return str(id(self))
def tpc_begin(self, txn):
self.calls.append("begin")
def tpc_vote(self, txn):
self.calls.append("vote")
def tpc_finish(self, txn):
self.calls.append("finish")
def tpc_abort(self, txn):
self.calls.append("abort")
def commit(self, obj, txn):
obj.commit()
self.committed.append(obj)
def abort(self, obj, txn):
obj.abort()
self.aborted.append(obj)
from zope.testing import doctest
def test_suite():
return doctest.DocTestSuite()
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of savepoint feature
$Id$
"""
import unittest
from zope.testing import doctest
def testRollbackRollsbackDataManagersThatJoinedLater():
"""
A savepoint needs to not just rollback it's savepoints, but needs to
rollback savepoints for data managers that joined savepoints after the
savepoint:
>>> import transaction.tests.savepointsample
>>> dm = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> dm['name'] = 'bob'
>>> sp1 = transaction.savepoint()
>>> dm['job'] = 'geek'
>>> sp2 = transaction.savepoint()
>>> dm['salary'] = 'fun'
>>> dm2 = transaction.tests.savepointsample.SampleSavepointDataManager()
>>> dm2['name'] = 'sally'
>>> 'name' in dm
True
>>> 'job' in dm
True
>>> 'salary' in dm
True
>>> 'name' in dm2
True
>>> sp1.rollback()
>>> 'name' in dm
True
>>> 'job' in dm
False
>>> 'salary' in dm
False
>>> 'name' in dm2
False
"""
def test_suite():
return unittest.TestSuite((
doctest.DocFileSuite('../savepoint.txt'),
doctest.DocTestSuite(),
))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
##############################################################################
#
# Copyright (c) 2001, 2002, 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test transaction behavior for variety of cases.
I wrote these unittests to investigate some odd transaction
behavior when doing unittests of integrating non sub transaction
aware objects, and to insure proper txn behavior. these
tests test the transaction system independent of the rest of the
zodb.
you can see the method calls to a jar by passing the
keyword arg tracing to the modify method of a dataobject.
the value of the arg is a prefix used for tracing print calls
to that objects jar.
the number of times a jar method was called can be inspected
by looking at an attribute of the jar that is the method
name prefixed with a c (count/check).
i've included some tracing examples for tests that i thought
were illuminating as doc strings below.
TODO
add in tests for objects which are modified multiple times,
for example an object that gets modified in multiple sub txns.
$Id$
"""
import unittest
import warnings
import transaction
from ZODB.utils import positive_id
from ZODB.tests.warnhook import WarningsHook
class TransactionTests(unittest.TestCase):
def setUp(self):
mgr = self.transaction_manager = transaction.TransactionManager()
self.sub1 = DataObject(mgr)
self.sub2 = DataObject(mgr)
self.sub3 = DataObject(mgr)
self.nosub1 = DataObject(mgr, nost=1)
# basic tests with two sub trans jars
# really we only need one, so tests for
# sub1 should identical to tests for sub2
def testTransactionCommit(self):
self.sub1.modify()
self.sub2.modify()
self.transaction_manager.commit()
assert self.sub1._p_jar.ccommit_sub == 0
assert self.sub1._p_jar.ctpc_finish == 1
def testTransactionAbort(self):
self.sub1.modify()
self.sub2.modify()
self.transaction_manager.abort()
assert self.sub2._p_jar.cabort == 1
def testTransactionNote(self):
t = self.transaction_manager.get()
t.note('This is a note.')
self.assertEqual(t.description, 'This is a note.')
t.note('Another.')
self.assertEqual(t.description, 'This is a note.\n\nAnother.')
t.abort()
# repeat adding in a nonsub trans jars
def testNSJTransactionCommit(self):
self.nosub1.modify()
self.transaction_manager.commit()
assert self.nosub1._p_jar.ctpc_finish == 1
def testNSJTransactionAbort(self):
self.nosub1.modify()
self.transaction_manager.abort()
assert self.nosub1._p_jar.ctpc_finish == 0
assert self.nosub1._p_jar.cabort == 1
### Failure Mode Tests
#
# ok now we do some more interesting
# tests that check the implementations
# error handling by throwing errors from
# various jar methods
###
# first the recoverable errors
def testExceptionInAbort(self):
self.sub1._p_jar = BasicJar(errors='abort')
self.nosub1.modify()
self.sub1.modify(nojar=1)
self.sub2.modify()
try:
self.transaction_manager.abort()
except TestTxnException: pass
assert self.nosub1._p_jar.cabort == 1
assert self.sub2._p_jar.cabort == 1
def testExceptionInCommit(self):
self.sub1._p_jar = BasicJar(errors='commit')
self.nosub1.modify()
self.sub1.modify(nojar=1)
try:
self.transaction_manager.commit()
except TestTxnException: pass
assert self.nosub1._p_jar.ctpc_finish == 0
assert self.nosub1._p_jar.ccommit == 1
assert self.nosub1._p_jar.ctpc_abort == 1
def testExceptionInTpcVote(self):
self.sub1._p_jar = BasicJar(errors='tpc_vote')
self.nosub1.modify()
self.sub1.modify(nojar=1)
try:
self.transaction_manager.commit()
except TestTxnException: pass
assert self.nosub1._p_jar.ctpc_finish == 0
assert self.nosub1._p_jar.ccommit == 1
assert self.nosub1._p_jar.ctpc_abort == 1
assert self.sub1._p_jar.ctpc_abort == 1
def testExceptionInTpcBegin(self):
"""
ok this test reveals a bug in the TM.py
as the nosub tpc_abort there is ignored.
nosub calling method tpc_begin
nosub calling method commit
sub calling method tpc_begin
sub calling method abort
sub calling method tpc_abort
nosub calling method tpc_abort
"""
self.sub1._p_jar = BasicJar(errors='tpc_begin')
self.nosub1.modify()
self.sub1.modify(nojar=1)
try:
self.transaction_manager.commit()
except TestTxnException:
pass
assert self.nosub1._p_jar.ctpc_abort == 1
assert self.sub1._p_jar.ctpc_abort == 1
def testExceptionInTpcAbort(self):
self.sub1._p_jar = BasicJar(errors=('tpc_abort', 'tpc_vote'))
self.nosub1.modify()
self.sub1.modify(nojar=1)
try:
self.transaction_manager.commit()
except TestTxnException:
pass
assert self.nosub1._p_jar.ctpc_abort == 1
# last test, check the hosing mechanism
## def testHoserStoppage(self):
## # It's hard to test the "hosed" state of the database, where
## # hosed means that a failure occurred in the second phase of
## # the two phase commit. It's hard because the database can
## # recover from such an error if it occurs during the very first
## # tpc_finish() call of the second phase.
## for obj in self.sub1, self.sub2:
## j = HoserJar(errors='tpc_finish')
## j.reset()
## obj._p_jar = j
## obj.modify(nojar=1)
## try:
## transaction.commit()
## except TestTxnException:
## pass
## self.assert_(Transaction.hosed)
## self.sub2.modify()
## try:
## transaction.commit()
## except Transaction.POSException.TransactionError:
## pass
## else:
## self.fail("Hosed Application didn't stop commits")
class DataObject:
def __init__(self, transaction_manager, nost=0):
self.transaction_manager = transaction_manager
self.nost = nost
self._p_jar = None
def modify(self, nojar=0, tracing=0):
if not nojar:
if self.nost:
self._p_jar = BasicJar(tracing=tracing)
else:
self._p_jar = BasicJar(tracing=tracing)
self.transaction_manager.get().join(self._p_jar)
class TestTxnException(Exception):
pass
class BasicJar:
def __init__(self, errors=(), tracing=0):
if not isinstance(errors, tuple):
errors = errors,
self.errors = errors
self.tracing = tracing
self.cabort = 0
self.ccommit = 0
self.ctpc_begin = 0
self.ctpc_abort = 0
self.ctpc_vote = 0
self.ctpc_finish = 0
self.cabort_sub = 0
self.ccommit_sub = 0
def __repr__(self):
return "<%s %X %s>" % (self.__class__.__name__,
positive_id(self),
self.errors)
def sortKey(self):
# All these jars use the same sort key, and Python's list.sort()
# is stable. These two
return self.__class__.__name__
def check(self, method):
if self.tracing:
print '%s calling method %s'%(str(self.tracing),method)
if method in self.errors:
raise TestTxnException("error %s" % method)
## basic jar txn interface
def abort(self, *args):
self.check('abort')
self.cabort += 1
def commit(self, *args):
self.check('commit')
self.ccommit += 1
def tpc_begin(self, txn, sub=0):
self.check('tpc_begin')
self.ctpc_begin += 1
def tpc_vote(self, *args):
self.check('tpc_vote')
self.ctpc_vote += 1
def tpc_abort(self, *args):
self.check('tpc_abort')
self.ctpc_abort += 1
def tpc_finish(self, *args):
self.check('tpc_finish')
self.ctpc_finish += 1
class HoserJar(BasicJar):
# The HoserJars coordinate their actions via the class variable
# committed. The check() method will only raise its exception
# if committed > 0.
committed = 0
def reset(self):
# Calling reset() on any instance will reset the class variable.
HoserJar.committed = 0
def check(self, method):
if HoserJar.committed > 0:
BasicJar.check(self, method)
def tpc_finish(self, *args):
self.check('tpc_finish')
self.ctpc_finish += 1
HoserJar.committed += 1
def test_join():
"""White-box test of the join method
The join method is provided for "backward-compatability" with ZODB 4
data managers.
The argument to join must be a zodb4 data manager,
transaction.interfaces.IDataManager.
>>> from ZODB.tests.sampledm import DataManager
>>> from transaction._transaction import DataManagerAdapter
>>> t = transaction.Transaction()
>>> dm = DataManager()
>>> t.join(dm)
The end result is that a data manager adapter is one of the
transaction's objects:
>>> isinstance(t._resources[0], DataManagerAdapter)
True
>>> t._resources[0]._datamanager is dm
True
"""
def hook():
pass
# deprecated38; remove this then
def test_beforeCommitHook():
"""Test beforeCommitHook.
Let's define a hook to call, and a way to see that it was called.
>>> log = []
>>> def reset_log():
... del log[:]
>>> def hook(arg='no_arg', kw1='no_kw1', kw2='no_kw2'):
... log.append("arg %r kw1 %r kw2 %r" % (arg, kw1, kw2))
beforeCommitHook is deprecated, so we need cruft to suppress the
warnings.
>>> whook = WarningsHook()
>>> whook.install()
Fool the warnings module into delivering the warnings despite that
they've been seen before; this is needed in case this test is run
more than once.
>>> import warnings
>>> warnings.filterwarnings("always", category=DeprecationWarning)
Now register the hook with a transaction.
>>> import transaction
>>> t = transaction.begin()
>>> t.beforeCommitHook(hook, '1')
Make sure it triggered a deprecation warning:
>>> len(whook.warnings)
1
>>> message, category, filename, lineno = whook.warnings[0]
>>> print message
This will be removed in ZODB 3.8:
Use addBeforeCommitHook instead of beforeCommitHook.
>>> category.__name__
'DeprecationWarning'
>>> whook.clear()
We can see that the hook is indeed registered.
>>> [(hook.func_name, args, kws)
... for hook, args, kws in t.getBeforeCommitHooks()]
[('hook', ('1',), {})]
When transaction commit starts, the hook is called, with its
arguments.
>>> log
[]
>>> t.commit()
>>> log
["arg '1' kw1 'no_kw1' kw2 'no_kw2'"]
>>> reset_log()
A hook's registration is consumed whenever the hook is called. Since
the hook above was called, it's no longer registered:
>>> len(list(t.getBeforeCommitHooks()))
0
>>> transaction.commit()
>>> log
[]
The hook is only called for a full commit, not for a savepoint.
>>> t = transaction.begin()
>>> t.beforeCommitHook(hook, 'A', kw1='B')
>>> dummy = t.savepoint()
>>> log
[]
>>> t.commit()
>>> log
["arg 'A' kw1 'B' kw2 'no_kw2'"]
>>> reset_log()
If a transaction is aborted, no hook is called.
>>> t = transaction.begin()
>>> t.beforeCommitHook(hook, "OOPS!")
>>> transaction.abort()
>>> log
[]
>>> transaction.commit()
>>> log
[]
The hook is called before the commit does anything, so even if the
commit fails the hook will have been called. To provoke failures in
commit, we'll add failing resource manager to the transaction.
>>> class CommitFailure(Exception):
... pass
>>> class FailingDataManager:
... def tpc_begin(self, txn, sub=False):
... raise CommitFailure
... def abort(self, txn):
... pass
>>> t = transaction.begin()
>>> t.join(FailingDataManager())
>>> t.beforeCommitHook(hook, '2')
>>> t.commit()
Traceback (most recent call last):
...
CommitFailure
>>> log
["arg '2' kw1 'no_kw1' kw2 'no_kw2'"]
>>> reset_log()
Let's register several hooks.
>>> t = transaction.begin()
>>> t.beforeCommitHook(hook, '4', kw1='4.1')
>>> t.beforeCommitHook(hook, '5', kw2='5.2')
They are returned in the same order by getBeforeCommitHooks.
>>> [(hook.func_name, args, kws) #doctest: +NORMALIZE_WHITESPACE
... for hook, args, kws in t.getBeforeCommitHooks()]
[('hook', ('4',), {'kw1': '4.1'}),
('hook', ('5',), {'kw2': '5.2'})]
And commit also calls them in this order.
>>> t.commit()
>>> len(log)
2
>>> log #doctest: +NORMALIZE_WHITESPACE
["arg '4' kw1 '4.1' kw2 'no_kw2'",
"arg '5' kw1 'no_kw1' kw2 '5.2'"]
>>> reset_log()
While executing, a hook can itself add more hooks, and they will all
be called before the real commit starts.
>>> def recurse(txn, arg):
... log.append('rec' + str(arg))
... if arg:
... txn.beforeCommitHook(hook, '-')
... txn.beforeCommitHook(recurse, txn, arg-1)
>>> t = transaction.begin()
>>> t.beforeCommitHook(recurse, t, 3)
>>> transaction.commit()
>>> log #doctest: +NORMALIZE_WHITESPACE
['rec3',
"arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec2',
"arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec1',
"arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec0']
>>> reset_log()
We have to uninstall the warnings hook so that other warnings don't get
lost.
>>> whook.uninstall()
Obscure: There is no API call for removing the filter we added, but
filters appears to be a public variable.
>>> del warnings.filters[0]
"""
def test_addBeforeCommitHook():
"""Test addBeforeCommitHook.
Let's define a hook to call, and a way to see that it was called.
>>> log = []
>>> def reset_log():
... del log[:]
>>> def hook(arg='no_arg', kw1='no_kw1', kw2='no_kw2'):
... log.append("arg %r kw1 %r kw2 %r" % (arg, kw1, kw2))
Now register the hook with a transaction.
>>> import transaction
>>> t = transaction.begin()
>>> t.addBeforeCommitHook(hook, '1')
We can see that the hook is indeed registered.
>>> [(hook.func_name, args, kws)
... for hook, args, kws in t.getBeforeCommitHooks()]
[('hook', ('1',), {})]
When transaction commit starts, the hook is called, with its
arguments.
>>> log
[]
>>> t.commit()
>>> log
["arg '1' kw1 'no_kw1' kw2 'no_kw2'"]
>>> reset_log()
A hook's registration is consumed whenever the hook is called. Since
the hook above was called, it's no longer registered:
>>> len(list(t.getBeforeCommitHooks()))
0
>>> transaction.commit()
>>> log
[]
The hook is only called for a full commit, not for a savepoint.
>>> t = transaction.begin()
>>> t.addBeforeCommitHook(hook, 'A', dict(kw1='B'))
>>> dummy = t.savepoint()
>>> log
[]
>>> t.commit()
>>> log
["arg 'A' kw1 'B' kw2 'no_kw2'"]
>>> reset_log()
If a transaction is aborted, no hook is called.
>>> t = transaction.begin()
>>> t.addBeforeCommitHook(hook, ["OOPS!"])
>>> transaction.abort()
>>> log
[]
>>> transaction.commit()
>>> log
[]
The hook is called before the commit does anything, so even if the
commit fails the hook will have been called. To provoke failures in
commit, we'll add failing resource manager to the transaction.
>>> class CommitFailure(Exception):
... pass
>>> class FailingDataManager:
... def tpc_begin(self, txn, sub=False):
... raise CommitFailure
... def abort(self, txn):
... pass
>>> t = transaction.begin()
>>> t.join(FailingDataManager())
>>> t.addBeforeCommitHook(hook, '2')
>>> t.commit()
Traceback (most recent call last):
...
CommitFailure
>>> log
["arg '2' kw1 'no_kw1' kw2 'no_kw2'"]
>>> reset_log()
Let's register several hooks.
>>> t = transaction.begin()
>>> t.addBeforeCommitHook(hook, '4', dict(kw1='4.1'))
>>> t.addBeforeCommitHook(hook, '5', dict(kw2='5.2'))
They are returned in the same order by getBeforeCommitHooks.
>>> [(hook.func_name, args, kws) #doctest: +NORMALIZE_WHITESPACE
... for hook, args, kws in t.getBeforeCommitHooks()]
[('hook', ('4',), {'kw1': '4.1'}),
('hook', ('5',), {'kw2': '5.2'})]
And commit also calls them in this order.
>>> t.commit()
>>> len(log)
2
>>> log #doctest: +NORMALIZE_WHITESPACE
["arg '4' kw1 '4.1' kw2 'no_kw2'",
"arg '5' kw1 'no_kw1' kw2 '5.2'"]
>>> reset_log()
While executing, a hook can itself add more hooks, and they will all
be called before the real commit starts.
>>> def recurse(txn, arg):
... log.append('rec' + str(arg))
... if arg:
... txn.addBeforeCommitHook(hook, '-')
... txn.addBeforeCommitHook(recurse, (txn, arg-1))
>>> t = transaction.begin()
>>> t.addBeforeCommitHook(recurse, (t, 3))
>>> transaction.commit()
>>> log #doctest: +NORMALIZE_WHITESPACE
['rec3',
"arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec2',
"arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec1',
"arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec0']
>>> reset_log()
When modifing persitent objects within before commit hooks
modifies the objects, of course :)
Start a new transaction
>>> t = transaction.begin()
Create a DB instance and add a IOBTree within
>>> from ZODB.tests.util import DB
>>> from ZODB.tests.util import P
>>> db = DB()
>>> con = db.open()
>>> root = con.root()
>>> root['p'] = P('julien')
>>> p = root['p']
>>> p.name
'julien'
This hook will get the object from the `DB` instance and change
the flag attribute.
>>> def hookmodify(status, arg=None, kw1='no_kw1', kw2='no_kw2'):
... p.name = 'jul'
Now register this hook and commit.
>>> t.addBeforeCommitHook(hookmodify, (p, 1))
>>> transaction.commit()
Nothing should have changed since it should have been aborted.
>>> p.name
'jul'
>>> db.close()
"""
def test_addAfterCommitHook():
"""Test addAfterCommitHook.
Let's define a hook to call, and a way to see that it was called.
>>> log = []
>>> def reset_log():
... del log[:]
>>> def hook(status, arg='no_arg', kw1='no_kw1', kw2='no_kw2'):
... log.append("%r arg %r kw1 %r kw2 %r" % (status, arg, kw1, kw2))
Now register the hook with a transaction.
>>> import transaction
>>> t = transaction.begin()
>>> t.addAfterCommitHook(hook, '1')
We can see that the hook is indeed registered.
>>> [(hook.func_name, args, kws)
... for hook, args, kws in t.getAfterCommitHooks()]
[('hook', ('1',), {})]
When transaction commit is done, the hook is called, with its
arguments.
>>> log
[]
>>> t.commit()
>>> log
["True arg '1' kw1 'no_kw1' kw2 'no_kw2'"]
>>> reset_log()
A hook's registration is consumed whenever the hook is called. Since
the hook above was called, it's no longer registered:
>>> len(list(t.getAfterCommitHooks()))
0
>>> transaction.commit()
>>> log
[]
The hook is only called after a full commit, not for a savepoint.
>>> t = transaction.begin()
>>> t.addAfterCommitHook(hook, 'A', dict(kw1='B'))
>>> dummy = t.savepoint()
>>> log
[]
>>> t.commit()
>>> log
["True arg 'A' kw1 'B' kw2 'no_kw2'"]
>>> reset_log()
If a transaction is aborted, no hook is called.
>>> t = transaction.begin()
>>> t.addAfterCommitHook(hook, ["OOPS!"])
>>> transaction.abort()
>>> log
[]
>>> transaction.commit()
>>> log
[]
The hook is called after the commit is done, so even if the
commit fails the hook will have been called. To provoke failures in
commit, we'll add failing resource manager to the transaction.
>>> class CommitFailure(Exception):
... pass
>>> class FailingDataManager:
... def tpc_begin(self, txn):
... raise CommitFailure
... def abort(self, txn):
... pass
>>> t = transaction.begin()
>>> t.join(FailingDataManager())
>>> t.addAfterCommitHook(hook, '2')
>>> t.commit()
Traceback (most recent call last):
...
CommitFailure
>>> log
["False arg '2' kw1 'no_kw1' kw2 'no_kw2'"]
>>> reset_log()
Let's register several hooks.
>>> t = transaction.begin()
>>> t.addAfterCommitHook(hook, '4', dict(kw1='4.1'))
>>> t.addAfterCommitHook(hook, '5', dict(kw2='5.2'))
They are returned in the same order by getAfterCommitHooks.
>>> [(hook.func_name, args, kws) #doctest: +NORMALIZE_WHITESPACE
... for hook, args, kws in t.getAfterCommitHooks()]
[('hook', ('4',), {'kw1': '4.1'}),
('hook', ('5',), {'kw2': '5.2'})]
And commit also calls them in this order.
>>> t.commit()
>>> len(log)
2
>>> log #doctest: +NORMALIZE_WHITESPACE
["True arg '4' kw1 '4.1' kw2 'no_kw2'",
"True arg '5' kw1 'no_kw1' kw2 '5.2'"]
>>> reset_log()
While executing, a hook can itself add more hooks, and they will all
be called before the real commit starts.
>>> def recurse(status, txn, arg):
... log.append('rec' + str(arg))
... if arg:
... txn.addAfterCommitHook(hook, '-')
... txn.addAfterCommitHook(recurse, (txn, arg-1))
>>> t = transaction.begin()
>>> t.addAfterCommitHook(recurse, (t, 3))
>>> transaction.commit()
>>> log #doctest: +NORMALIZE_WHITESPACE
['rec3',
"True arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec2',
"True arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec1',
"True arg '-' kw1 'no_kw1' kw2 'no_kw2'",
'rec0']
>>> reset_log()
If an after commit hook is raising an exception then it will log a
message at error level so that if other hooks are registered they
can be executed. We don't support execution dependencies at this level.
>>> mgr = transaction.TransactionManager()
>>> do = DataObject(mgr)
>>> def hookRaise(status, arg='no_arg', kw1='no_kw1', kw2='no_kw2'):
... raise TypeError("Fake raise")
>>> t = transaction.begin()
>>> t.addAfterCommitHook(hook, ('-', 1))
>>> t.addAfterCommitHook(hookRaise, ('-', 2))
>>> t.addAfterCommitHook(hook, ('-', 3))
>>> transaction.commit()
>>> log
["True arg '-' kw1 1 kw2 'no_kw2'", "True arg '-' kw1 3 kw2 'no_kw2'"]
>>> reset_log()
Test that the associated transaction manager has been cleanup when
after commit hooks are registered
>>> mgr = transaction.TransactionManager()
>>> do = DataObject(mgr)
>>> t = transaction.begin()
>>> len(t._manager._txns)
1
>>> t.addAfterCommitHook(hook, ('-', 1))
>>> transaction.commit()
>>> log
["True arg '-' kw1 1 kw2 'no_kw2'"]
>>> len(t._manager._txns)
0
>>> reset_log()
The transaction is already committed when the after commit hooks
will be executed. Executing the hooks must not have further
effects on persistent objects.
Start a new transaction
>>> t = transaction.begin()
Create a DB instance and add a IOBTree within
>>> from ZODB.tests.util import DB
>>> from ZODB.tests.util import P
>>> db = DB()
>>> con = db.open()
>>> root = con.root()
>>> root['p'] = P('julien')
>>> p = root['p']
>>> p.name
'julien'
This hook will get the object from the `DB` instance and change
the flag attribute.
>>> def badhook(status, arg=None, kw1='no_kw1', kw2='no_kw2'):
... p.name = 'jul'
Now register this hook and commit.
>>> t.addAfterCommitHook(badhook, (p, 1))
>>> transaction.commit()
Nothing should have changed since it should have been aborted.
>>> p.name
'julien'
>>> db.close()
"""
def test_suite():
from zope.testing.doctest import DocTestSuite, DocFileSuite
return unittest.TestSuite((
DocFileSuite('doom.txt'),
DocTestSuite(),
unittest.makeSuite(TransactionTests),
))
if __name__ == '__main__':
unittest.TextTestRunner().run(test_suite())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment