ActivityBuffer.py 5.89 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#          Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
#
# Based on: db.py in ZMySQLDA
#
##############################################################################

from Shared.DC.ZRDB.TM import TM
from zLOG import LOG, ERROR, INFO
Jean-Paul Smets's avatar
Jean-Paul Smets committed
28
import sys
Jean-Paul Smets's avatar
Jean-Paul Smets committed
29 30

class ActivityBuffer(TM):
31

Jean-Paul Smets's avatar
Jean-Paul Smets committed
32
    _p_oid=_p_changed=_registered=None
33

Jean-Paul Smets's avatar
Jean-Paul Smets committed
34 35 36 37 38 39 40
    def __init__(self):
        from thread import allocate_lock
        self._use_TM = self._transactions = 1
        if self._use_TM:
            self._tlock = allocate_lock()
            self._tthread = None
        self._lock = allocate_lock()
41

Jean-Paul Smets's avatar
Jean-Paul Smets committed
42 43 44 45
    # Keeps a list of messages to add and remove
    # at end of transaction
    def _begin(self, *ignored):
        from thread import get_ident
Jean-Paul Smets's avatar
Jean-Paul Smets committed
46
        from ActivityTool import activity_list
Jean-Paul Smets's avatar
Jean-Paul Smets committed
47 48 49 50 51 52
        self._tlock.acquire()
        self._tthread = get_ident()
        self.requires_prepare = 1
        try:
            self.queued_activity = []
            self.flushed_activity = []
Jean-Paul Smets's avatar
Jean-Paul Smets committed
53 54
            for activity in activity_list:              # Reset registration for each transaction
                activity.registerActivityBuffer(self)
55 56 57 58 59 60 61
            # In Zope 2.8 (ZODB 3.4), use beforeCommitHook instead of
            # patching Trasaction.
            transaction = get_transaction()
            try:
              transaction.beforeCommitHook(self.tpc_prepare, transaction)
            except AttributeError:
              pass
Jean-Paul Smets's avatar
Jean-Paul Smets committed
62 63 64 65 66
        except:
            LOG('ActivityBuffer', ERROR, "exception during _begin",
                error=sys.exc_info())
            self._tlock.release()
            raise
67

Jean-Paul Smets's avatar
Jean-Paul Smets committed
68 69 70 71 72 73 74 75 76
    def _finish(self, *ignored):
        from thread import get_ident
        if not self._tlock.locked() or self._tthread != get_ident():
            LOG('ActivityBuffer', INFO, "ignoring _finish")
            return
        try:
            try:
                # Try to push / delete all messages
                for (activity, activity_tool, message) in self.flushed_activity:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
77
                    #LOG('ActivityBuffer finishDeleteMessage', ERROR, str(message.method_id))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
78 79
                    activity.finishDeleteMessage(activity_tool, message)
                for (activity, activity_tool, message) in self.queued_activity:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
80
                    #LOG('ActivityBuffer finishQueueMessage', ERROR, str(message.method_id))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
                    activity.finishQueueMessage(activity_tool, message)
            except:
                LOG('ActivityBuffer', ERROR, "exception during _finish",
                    error=sys.exc_info())
                raise
        finally:
            self._tlock.release()

    def _abort(self, *ignored):
        from thread import get_ident
        if not self._tlock.locked() or self._tthread != get_ident():
            LOG('ActivityBuffer', 0, "ignoring _abort")
            return
        self._tlock.release()

96 97 98
    def tpc_prepare(self, transaction, sub=None):
        if sub is not None: # Do nothing if it is a subtransaction
          return
Jean-Paul Smets's avatar
Jean-Paul Smets committed
99 100 101 102 103 104 105 106 107
        if not self.requires_prepare: return
        self.requires_prepare = 0
        from thread import get_ident
        if not self._tlock.locked() or self._tthread != get_ident():
            LOG('ActivityBuffer', 0, "ignoring tpc_prepare")
            return
        try:
            # Try to push / delete all messages
            for (activity, activity_tool, message) in self.flushed_activity:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
108
                #LOG('ActivityBuffer prepareDeleteMessage', ERROR, str(message.method_id))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
109
                activity.prepareDeleteMessage(activity_tool, message)
110
            activity_dict = {}
Jean-Paul Smets's avatar
Jean-Paul Smets committed
111
            for (activity, activity_tool, message) in self.queued_activity:
112 113 114 115 116 117 118 119 120 121 122
                key = (activity, activity_tool)
                if key not in activity_dict:
                    activity_dict[key] = []
                activity_dict[key].append(message)
            for key, message_list in activity_dict.items():
                activity, activity_tool = key
                if hasattr(activity, 'prepareQueueMessageList'):
                    activity.prepareQueueMessageList(activity_tool, message_list)
                else:
                  for message in message_list:
                      activity.prepareQueueMessage(activity_tool, message)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
123 124 125 126
        except:
            LOG('ActivityBuffer', ERROR, "exception during tpc_prepare",
                error=sys.exc_info())
            raise
127

Jean-Paul Smets's avatar
Jean-Paul Smets committed
128
    def deferredQueueMessage(self, activity_tool, activity, message):
129
      self._register()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
130 131 132 133
      # Activity is called to prevent queuing some messages (useful for example
      # to prevent reindexing objects multiple times)
      if not activity.isMessageRegistered(self, activity_tool, message):
        self.queued_activity.append((activity, activity_tool, message))
134
        # We register queued messages so that we can
Jean-Paul Smets's avatar
Jean-Paul Smets committed
135 136
        # unregister them
        activity.registerMessage(self, activity_tool, message)
137

Jean-Paul Smets's avatar
Jean-Paul Smets committed
138 139 140
    def deferredDeleteMessage(self, activity_tool, activity, message):
      self._register()
      self.flushed_activity.append((activity, activity_tool, message))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
141