############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
"""Transaction objects manage resources for an individual activity.

Compatibility issues
--------------------

The implementation of Transaction objects involves two layers of
backwards compatibility, because this version of transaction supports
both ZODB 3 and ZODB 4.  Zope is evolving towards the ZODB4
interfaces.

Transaction has two methods for a resource manager to call to
participate in a transaction -- register() and join().  join() takes a
resource manager and adds it to the list of resources.  register() is
for backwards compatibility.  It takes a persistent object and
registers its _p_jar attribute.  TODO: explain adapter

Subtransactions
---------------

Note: Suntransactions are deprecated!  Use savepoint/rollback instead.

A subtransaction applies the transaction notion recursively.  It
allows a set of modifications within a transaction to be committed or
aborted as a group.  A subtransaction is a strictly local activity;
its changes are not visible to any other database connection until the
top-level transaction commits.  In addition to its use to organize a
large transaction, subtransactions can be used to optimize memory use.
ZODB must keep modified objects in memory until a transaction commits
and it can write the changes to the storage.  A subtransaction uses a
temporary disk storage for its commits, allowing modified objects to
be flushed from memory when the subtransaction commits.

The commit() and abort() methods take an optional subtransaction
argument that defaults to false.  If it is a true, the operation is
performed on a subtransaction.

Subtransactions add a lot of complexity to the transaction
implementation.  Some resource managers support subtransactions, but
they are not required to.  (ZODB Connection is the only standard
resource manager that supports subtransactions.)  Resource managers
that do support subtransactions implement abort_sub() and commit_sub()
methods and support a second argument to tpc_begin().

The second argument to tpc_begin() indicates that a subtransaction
commit is beginning (if it is true).  In a subtransaction, there is no
tpc_vote() call, because sub-transactions don't need 2-phase commit.
If a sub-transaction abort or commit fails, we can abort the outer
transaction.  The tpc_finish() or tpc_abort() call applies just to
that subtransaction.

Once a resource manager is involved in a subtransaction, all
subsequent transactions will be treated as subtransactions until
abort_sub() or commit_sub() is called.  abort_sub() will undo all the
changes of the subtransactions.  commit_sub() will begin a top-level
transaction and store all the changes from subtransactions.  After
commit_sub(), the transaction must still call tpc_vote() and
tpc_finish().

If the resource manager does not support subtransactions, nothing
happens when the subtransaction commits.  Instead, the resource
manager is put on a list of managers to commit when the actual
top-level transaction commits.  If this happens, it will not be
possible to abort subtransactions.

Two-phase commit
----------------

A transaction commit involves an interaction between the transaction
object and one or more resource managers.  The transaction manager
calls the following four methods on each resource manager; it calls
tpc_begin() on each resource manager before calling commit() on any of
them.

    1. tpc_begin(txn)
    2. commit(txn)
    3. tpc_vote(txn)
    4. tpc_finish(txn)

Subtransaction commit
---------------------

Note: Subtransactions are deprecated!

When a subtransaction commits, the protocol is different.

1. tpc_begin() is passed a second argument, which indicates that a
   subtransaction is being committed.
2. tpc_vote() is not called.

Once a subtransaction has been committed, the top-level transaction
commit will start with a commit_sub() call instead of a tpc_begin()
call.

Before-commit hook
---------------

Sometimes, applications want to execute some code when a transaction is
committed.  For example, one might want to delay object indexing until a
transaction commits, rather than indexing every time an object is changed.
Or someone might want to check invariants only after a set of operations.  A
pre-commit hook is available for such use cases, just use beforeCommitHook()
passing it a callable and arguments.  The callable will be called with its
arguments at the start of the commit (but not for substransaction commits).

Error handling
--------------

When errors occur during two-phase commit, the transaction manager
aborts all the resource managers.  The specific methods it calls
depend on whether the error occurs before or after the call to
tpc_vote() on that transaction manager.

If the resource manager has not voted, then the resource manager will
have one or more uncommitted objects.  There are two cases that lead
to this state; either the transaction manager has not called commit()
for any objects on this resource manager or the call that failed was a
commit() for one of the objects of this resource manager.  For each
uncommitted object, including the object that failed in its commit(),
call abort().

Once uncommitted objects are aborted, tpc_abort() or abort_sub() is
called on each resource manager.

Synchronization
---------------

You can register sychronization objects (synchronizers) with the
tranasction manager.  The synchronizer must implement
beforeCompletion() and afterCompletion() methods.  The transaction
manager calls beforeCompletion() when it starts a top-level two-phase
commit.  It calls afterCompletion() when a top-level transaction is
committed or aborted.  The methods are passed the current Transaction
as their only argument.
"""

import logging
import sys
import thread
import warnings
import weakref
import traceback
from cStringIO import StringIO

from zope import interface
from transaction import interfaces

# Sigh.  In the maze of __init__.py's, ZODB.__init__.py takes 'get'
# out of transaction.__init__.py, in order to stuff the 'get_transaction'
# alias in __builtin__.  So here in _transaction.py, we can't import
# exceptions from ZODB.POSException at top level (we're imported by
# our __init__.py, which is imported by ZODB's __init__, so the ZODB
# package isn't well-formed when we're first imported).
# from ZODB.POSException import TransactionError, TransactionFailedError

_marker = object()

# The point of this is to avoid hiding exceptions (which the builtin
# hasattr() does).
def myhasattr(obj, attr):
    return getattr(obj, attr, _marker) is not _marker

class Status:
    # ACTIVE is the initial state.
    ACTIVE       = "Active"

    COMMITTING   = "Committing"
    COMMITTED    = "Committed"

    # commit() or commit(True) raised an exception.  All further attempts
    # to commit or join this transaction will raise TransactionFailedError.
    COMMITFAILED = "Commit failed"

class Transaction(object):

    interface.implements(interfaces.ITransaction,
                         interfaces.ITransactionDeprecated)


    # Assign an index to each savepoint so we can invalidate later savepoints
    # on rollback.  The first index assigned is 1, and it goes up by 1 each
    # time.
    _savepoint_index = 0

    # If savepoints are used, keep a weak key dict of them.  This maps a
    # savepoint to its index (see above).
    _savepoint2index = None

    # Remember the savepoint for the last subtransaction.
    _subtransaction_savepoint = None

    # Meta data.  ._extension is also metadata, but is initialized to an
    # emtpy dict in __init__.
    user = ""
    description = ""

    def __init__(self, synchronizers=None, manager=None):
        self.status = Status.ACTIVE
        # List of resource managers, e.g. MultiObjectResourceAdapters.
        self._resources = []

        # Weak set of synchronizer objects to call.
        if synchronizers is None:
            from ZODB.utils import WeakSet
            synchronizers = WeakSet()
        self._synchronizers = synchronizers

        self._manager = manager

        # _adapters: Connection/_p_jar -> MultiObjectResourceAdapter[Sub]
        self._adapters = {}
        self._voted = {} # id(Connection) -> boolean, True if voted
        # _voted and other dictionaries use the id() of the resource
        # manager as a key, because we can't guess whether the actual
        # resource managers will be safe to use as dict keys.

        # The user, description, and _extension attributes are accessed
        # directly by storages, leading underscore notwithstanding.
        self._extension = {}

        self.log = logging.getLogger("txn.%d" % thread.get_ident())
        self.log.debug("new transaction")

        # If a commit fails, the traceback is saved in _failure_traceback.
        # If another attempt is made to commit, TransactionFailedError is
        # raised, incorporating this traceback.
        self._failure_traceback = None

        # Holds (hook, args, kws) triples added by beforeCommitHook.
        # TODO:  in Python 2.4, change to collections.deque; lists can be
        # inefficient for FIFO access of this kind.
        self._before_commit = []

    # Raise TransactionFailedError, due to commit()/join()/register()
    # getting called when the current transaction has already suffered
    # a commit/savepoint failure.
    def _prior_operation_failed(self):
        from ZODB.POSException import TransactionFailedError
        assert self._failure_traceback is not None
        raise TransactionFailedError("An operation previously failed, "
                "with traceback:\n\n%s" %
                self._failure_traceback.getvalue())

    def join(self, resource):
        if self.status is Status.COMMITFAILED:
            self._prior_operation_failed() # doesn't return

        if self.status is not Status.ACTIVE:
            # TODO: Should it be possible to join a committing transaction?
            # I think some users want it.
            raise ValueError("expected txn status %r, but it's %r" % (
                             Status.ACTIVE, self.status))
        # TODO: the prepare check is a bit of a hack, perhaps it would
        # be better to use interfaces.  If this is a ZODB4-style
        # resource manager, it needs to be adapted, too.
        if myhasattr(resource, "prepare"):
            # TODO: deprecate 3.6
            resource = DataManagerAdapter(resource)
        self._resources.append(resource)

        if self._savepoint2index:
            # A data manager has joined a transaction *after* a savepoint
            # was created.  A couple of things are different in this case:
            #
            # 1. We need to add its savepoint to all previous savepoints.
            # so that if they are rolled back, we roll this one back too.
            #
            # 2. We don't actually need to ask the data manager for a
            # savepoint:  because it's just joining, we can just abort it to
            # roll back to the current state, so we simply use an
            # AbortSavepoint.
            datamanager_savepoint = AbortSavepoint(resource, self)
            for transaction_savepoint in self._savepoint2index.keys():
                transaction_savepoint._savepoints.append(
                    datamanager_savepoint)

    def savepoint(self, optimistic=False):
        if self.status is Status.COMMITFAILED:
            self._prior_operation_failed() # doesn't return, it raises

        try:
            savepoint = Savepoint(self, optimistic, *self._resources)
        except:
            self._cleanup(self._resources)
            self._saveCommitishError() # reraises!

        if self._savepoint2index is None:
            self._savepoint2index = weakref.WeakKeyDictionary()
        self._savepoint_index += 1
        self._savepoint2index[savepoint] = self._savepoint_index

        return savepoint

    # Remove `savepoint` from _savepoint2index, and also remove and invalidate
    # all savepoints we know about with an index larger than `savepoint`'s.
    # This is what's needed when a rollback _to_ `savepoint` is done.
    def _remove_and_invalidate_after(self, savepoint):
        savepoint2index = self._savepoint2index
        index = savepoint2index.pop(savepoint)
        # use items to make copy to avoid mutating while iterating
        for savepoint, i in savepoint2index.items():
            if i > index:
                savepoint.transaction = None # invalidate
                del savepoint2index[savepoint]

    # Invalidate and forget about all savepoints.
    def _invalidate_all_savepoints(self):
        for savepoint in self._savepoint2index.keys():
            savepoint.transaction = None # invalidate
        self._savepoint2index.clear()


    def register(self, obj):
        # The old way of registering transaction participants.
        #
        # register() is passed either a persisent object or a
        # resource manager like the ones defined in ZODB.DB.
        # If it is passed a persistent object, that object should
        # be stored when the transaction commits.  For other
        # objects, the object implements the standard two-phase
        # commit protocol.

        manager = getattr(obj, "_p_jar", obj)
        if manager is None:
            raise ValueError("Register with no manager")
        adapter = self._adapters.get(manager)
        if adapter is None:
            adapter = MultiObjectResourceAdapter(manager)
            adapter.objects.append(obj)
            self._adapters[manager] = adapter
            self.join(adapter)
        else:
            # TODO: comment out this expensive assert later
            # Use id() to guard against proxies.
            assert id(obj) not in map(id, adapter.objects)
            adapter.objects.append(obj)

    def begin(self):
        from ZODB.utils import deprecated36

        deprecated36("Transaction.begin() should no longer be used; use "
                      "the begin() method of a transaction manager.")
        if (self._resources or self._synchronizers):
            self.abort()
        # Else aborting wouldn't do anything, except if _manager is non-None,
        # in which case it would do nothing besides uselessly free() this
        # transaction.

    def commit(self, subtransaction=False):

        if self._savepoint2index:
            self._invalidate_all_savepoints()

        if subtransaction:
            # TODO deprecate subtransactions
            self._subtransaction_savepoint = self.savepoint(1)
            return

        if self.status is Status.COMMITFAILED:
            self._prior_operation_failed() # doesn't return

        self._callBeforeCommitHooks()

        self._synchronizers.map(lambda s: s.beforeCompletion(self))
        self.status = Status.COMMITTING

        try:
            self._commitResources()
        except:
            self._saveCommitishError() # This raises!

        self.status = Status.COMMITTED
        if self._manager:
            self._manager.free(self)
        self._synchronizers.map(lambda s: s.afterCompletion(self))
        self.log.debug("commit")

    def _saveCommitishError(self):
        self.status = Status.COMMITFAILED
        # Save the traceback for TransactionFailedError.
        ft = self._failure_traceback = StringIO()
        t, v, tb = sys.exc_info()
        # Record how we got into commit().
        traceback.print_stack(sys._getframe(1), None, ft)
        # Append the stack entries from here down to the exception.
        traceback.print_tb(tb, None, ft)
        # Append the exception type and value.
        ft.writelines(traceback.format_exception_only(t, v))
        raise t, v, tb

    def getBeforeCommitHooks(self):
        return iter(self._before_commit)

    def beforeCommitHook(self, hook, *args, **kws):
        self._before_commit.append((hook, args, kws))

    def _callBeforeCommitHooks(self):
        # Call all hooks registered, allowing further registrations
        # during processing.
        while self._before_commit:
            hook, args, kws = self._before_commit.pop(0)
            hook(*args, **kws)

    def _commitResources(self):
        # Execute the two-phase commit protocol.

        L = list(self._resources)
        L.sort(rm_cmp)
        try:
            for rm in L:
                rm.tpc_begin(self)
            for rm in L:
                rm.commit(self)
                self.log.debug("commit %r" % rm)
            for rm in L:
                rm.tpc_vote(self)
                self._voted[id(rm)] = True

            try:
                for rm in L:
                    rm.tpc_finish(self)
            except:
                # TODO: do we need to make this warning stronger?
                # TODO: It would be nice if the system could be configured
                # to stop committing transactions at this point.
                self.log.critical("A storage error occured during the second "
                                  "phase of the two-phase commit.  Resources "
                                  "may be in an inconsistent state.")
                raise
        except:
            # If an error occurs committing a transaction, we try
            # to revert the changes in each of the resource managers.
            t, v, tb = sys.exc_info()
            try:
                self._cleanup(L)
            finally:
                self._synchronizers.map(lambda s: s.afterCompletion(self))
            raise t, v, tb

    def _cleanup(self, L):
        # Called when an exception occurs during tpc_vote or tpc_finish.
        for rm in L:
            if id(rm) not in self._voted:
                try:
                    rm.abort(self)
                except Exception:
                    self.log.error("Error in abort() on manager %s",
                                   rm, exc_info=sys.exc_info())
        for rm in L:
            try:
                rm.tpc_abort(self)
            except Exception:
                self.log.error("Error in tpc_abort() on manager %s",
                               rm, exc_info=sys.exc_info())

    def abort(self, subtransaction=False):

        if subtransaction:
            # TODO deprecate subtransactions
            if not self._subtransaction_savepoint:
                raise interfaces.InvalidSavepointRollbackError
            if self._subtransaction_savepoint.valid:
                # We're supposed to be able to call abort(1) multiple
                # times. Sigh.
                self._subtransaction_savepoint.rollback()
            return

        if self._savepoint2index:
            self._invalidate_all_savepoints()

        self._synchronizers.map(lambda s: s.beforeCompletion(self))

        tb = None
        for rm in self._resources:
            try:
                rm.abort(self)
            except:
                if tb is None:
                    t, v, tb = sys.exc_info()
                self.log.error("Failed to abort resource manager: %s",
                               rm, exc_info=sys.exc_info())

        if self._manager:
            self._manager.free(self)

        self._synchronizers.map(lambda s: s.afterCompletion(self))

        self.log.debug("abort")

        if tb is not None:
            raise t, v, tb

    def note(self, text):
        text = text.strip()
        if self.description:
            self.description += "\n\n" + text
        else:
            self.description = text

    def setUser(self, user_name, path="/"):
        self.user = "%s %s" % (path, user_name)

    def setExtendedInfo(self, name, value):
        self._extension[name] = value

# TODO: We need a better name for the adapters.

class MultiObjectResourceAdapter(object):
    """Adapt the old-style register() call to the new-style join().

    With join(), a resource mananger like a Connection registers with
    the transaction manager.  With register(), an individual object
    is passed to register().
    """

    def __init__(self, jar):
        self.manager = jar
        self.objects = []
        self.ncommitted = 0

    def __repr__(self):
        return "<%s for %s at %s>" % (self.__class__.__name__,
                                      self.manager, id(self))

    def sortKey(self):
        return self.manager.sortKey()

    def tpc_begin(self, txn):
        self.manager.tpc_begin(txn)

    def tpc_finish(self, txn):
        self.manager.tpc_finish(txn)

    def tpc_abort(self, txn):
        self.manager.tpc_abort(txn)

    def commit(self, txn):
        for o in self.objects:
            self.manager.commit(o, txn)
            self.ncommitted += 1

    def tpc_vote(self, txn):
        self.manager.tpc_vote(txn)

    def abort(self, txn):
        tb = None
        for o in self.objects:
            try:
                self.manager.abort(o, txn)
            except:
                # Capture the first exception and re-raise it after
                # aborting all the other objects.
                if tb is None:
                    t, v, tb = sys.exc_info()
                txn.log.error("Failed to abort object: %s",
                              object_hint(o), exc_info=sys.exc_info())
        if tb is not None:
            raise t, v, tb

def rm_cmp(rm1, rm2):
    return cmp(rm1.sortKey(), rm2.sortKey())

def object_hint(o):
    """Return a string describing the object.

    This function does not raise an exception.
    """

    from ZODB.utils import oid_repr

    # We should always be able to get __class__.
    klass = o.__class__.__name__
    # oid would be great, but may this isn't a persistent object.
    oid = getattr(o, "_p_oid", _marker)
    if oid is not _marker:
        oid = oid_repr(oid)
    return "%s oid=%s" % (klass, oid)

# TODO: deprecate for 3.6.
class DataManagerAdapter(object):
    """Adapt zodb 4-style data managers to zodb3 style

    Adapt transaction.interfaces.IDataManager to
    ZODB.interfaces.IPureDatamanager
    """

    # Note that it is pretty important that this does not have a _p_jar
    # attribute. This object will be registered with a zodb3 TM, which
    # will then try to get a _p_jar from it, using it as the default.
    # (Objects without a _p_jar are their own data managers.)

    def __init__(self, datamanager):
        self._datamanager = datamanager

    # TODO: I'm not sure why commit() doesn't do anything

    def commit(self, transaction):
        # We don't do anything here because ZODB4-style data managers
        # didn't have a separate commit step
        pass

    def abort(self, transaction):
        self._datamanager.abort(transaction)

    def tpc_begin(self, transaction):
        # We don't do anything here because ZODB4-style data managers
        # didn't have a separate tpc_begin step
        pass

    def tpc_abort(self, transaction):
        self._datamanager.abort(transaction)

    def tpc_finish(self, transaction):
        self._datamanager.commit(transaction)

    def tpc_vote(self, transaction):
        self._datamanager.prepare(transaction)

    def sortKey(self):
        return self._datamanager.sortKey()

class Savepoint:
    """Transaction savepoint.

    Transaction savepoints coordinate savepoints for data managers
    participating in a transaction.
    """
    interface.implements(interfaces.ISavepoint)

    valid = property(lambda self: self.transaction is not None)

    def __init__(self, transaction, optimistic, *resources):
        self.transaction = transaction
        self._savepoints = savepoints = []

        for datamanager in resources:
            try:
                savepoint = datamanager.savepoint
            except AttributeError:
                if not optimistic:
                    raise TypeError("Savepoints unsupported", datamanager)
                savepoint = NoRollbackSavepoint(datamanager)
            else:
                savepoint = savepoint()

            savepoints.append(savepoint)

    def rollback(self):
        transaction = self.transaction
        if transaction is None:
            raise interfaces.InvalidSavepointRollbackError
        self.transaction = None
        transaction._remove_and_invalidate_after(self)

        try:
            for savepoint in self._savepoints:
                savepoint.rollback()
        except:
            # Mark the transaction as failed
            transaction._saveCommitishError() # reraises!

class AbortSavepoint:

    def __init__(self, datamanager, transaction):
        self.datamanager = datamanager
        self.transaction = transaction

    def rollback(self):
        self.datamanager.abort(self.transaction)

class NoRollbackSavepoint:

    def __init__(self, datamanager):
        self.datamanager = datamanager

    def rollback(self):
        raise TypeError("Savepoints unsupported", self.datamanager)