From 033c5022014681b1e01037838e9491c81f617d82 Mon Sep 17 00:00:00 2001 From: Jean-Paul Smets <jp@nexedi.com> Date: Sat, 10 Apr 2004 17:12:05 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@660 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/CMFActivity/ActivityBuffer.py | 110 ++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100755 product/CMFActivity/ActivityBuffer.py diff --git a/product/CMFActivity/ActivityBuffer.py b/product/CMFActivity/ActivityBuffer.py new file mode 100755 index 0000000000..039c177c1b --- /dev/null +++ b/product/CMFActivity/ActivityBuffer.py @@ -0,0 +1,110 @@ +############################################################################## +# +# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved. +# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved. +# Jean-Paul Smets-Solanes <jp@nexedi.com> +# +# WARNING: This program as such is intended to be used by professional +# programmers who take the whole responsability of assessing all potential +# consequences resulting from its eventual inadequacies and bugs +# End users who are looking for a ready-to-use solution with commercial +# garantees and support are strongly adviced to contract a Free Software +# Service Company +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE +# +# +# Based on: db.py in ZMySQLDA +# +############################################################################## + +from Shared.DC.ZRDB.TM import TM +from zLOG import LOG, ERROR, INFO + +class ActivityBuffer(TM): + + _p_oid=_p_changed=_registered=None + + def __init__(self): + from thread import allocate_lock + self._use_TM = self._transactions = 1 + if self._use_TM: + self._tlock = allocate_lock() + self._tthread = None + self._lock = allocate_lock() + + # Keeps a list of messages to add and remove + # at end of transaction + def _begin(self, *ignored): + from thread import get_ident + self._tlock.acquire() + self._tthread = get_ident() + self.requires_prepare = 1 + try: + LOG("_begin", 0, '') + self.queued_activity = [] + self.flushed_activity = [] + except: + LOG('ActivityBuffer', ERROR, "exception during _begin", + error=sys.exc_info()) + self._tlock.release() + raise + + def _finish(self, *ignored): + from thread import get_ident + if not self._tlock.locked() or self._tthread != get_ident(): + LOG('ActivityBuffer', INFO, "ignoring _finish") + return + try: + try: + # Try to push / delete all messages + for (activity, activity_tool, message) in self.flushed_activity: + activity.finishDeleteMessage(activity_tool, message) + for (activity, activity_tool, message) in self.queued_activity: + activity.finishQueueMessage(activity_tool, message) + except: + LOG('ActivityBuffer', ERROR, "exception during _finish", + error=sys.exc_info()) + raise + finally: + self._tlock.release() + + def _abort(self, *ignored): + from thread import get_ident + if not self._tlock.locked() or self._tthread != get_ident(): + LOG('ActivityBuffer', 0, "ignoring _abort") + return + self._tlock.release() + + def tpc_prepare(self, *ignored): + if not self.requires_prepare: return + self.requires_prepare = 0 + from thread import get_ident + if not self._tlock.locked() or self._tthread != get_ident(): + LOG('ActivityBuffer', 0, "ignoring tpc_prepare") + return + try: + # Try to push / delete all messages + for (activity, activity_tool, message) in self.flushed_activity: + activity.prepareDeleteMessage(activity_tool, message) + for (activity, activity_tool, message) in self.queued_activity: + activity.prepareQueueMessage(activity_tool, message) + except: + LOG('ActivityBuffer', ERROR, "exception during tpc_prepare", + error=sys.exc_info()) + raise + + def deferredQueueMessage(self, activity_tool, activity, message): + self._register() + LOG("deferredQueueMessage", 0, '') + self.queued_activity.append((activity, activity_tool, message)) + + def deferredDeleteMessage(self, activity_tool, activity, message): + self._register() + LOG("deferredDeleteMessage", 0, '') + self.flushed_activity.append((activity, activity_tool, message)) -- 2.30.9