Commit f38abe79 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Use RAM-based approach for ActivityBuffer instead of a volatile attribute. Clean up some mess.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@13188 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 0e0868f3
...@@ -134,9 +134,9 @@ class ActiveObject(ExtensionClass.Base): ...@@ -134,9 +134,9 @@ class ActiveObject(ExtensionClass.Base):
def recursiveFlushActivity(self, invoke=0, **kw): def recursiveFlushActivity(self, invoke=0, **kw):
# flush all activities related to this object # flush all activities related to this object
self.flushActivity(invoke=invoke, **kw) self.flushActivity(invoke=invoke, **kw)
if hasattr(aq_base(self), 'objectValues'): if getattr(aq_base(self), 'objectValues', None) is not None:
for o in self.objectValues(): for o in self.objectValues():
if hasattr(aq_base(o), 'recursiveFlushActivity'): if getattr(aq_base(o), 'recursiveFlushActivity', None) is not None:
o.recursiveFlushActivity(invoke=invoke, **kw) o.recursiveFlushActivity(invoke=invoke, **kw)
security.declareProtected( permissions.View, 'hasActivity' ) security.declareProtected( permissions.View, 'hasActivity' )
......
...@@ -143,7 +143,7 @@ class Queue: ...@@ -143,7 +143,7 @@ class Queue:
going to be executed going to be executed
""" """
try: try:
if activity_tool.unrestrictedTraverse(message.object_path) is None: if activity_tool.unrestrictedTraverse(message.object_path, None) is None:
# Do not try to call methods on objects which do not exist # Do not try to call methods on objects which do not exist
LOG('WARNING ActivityTool', 0, LOG('WARNING ActivityTool', 0,
'Object %s does not exist' % '/'.join(message.object_path)) 'Object %s does not exist' % '/'.join(message.object_path))
...@@ -214,27 +214,23 @@ class Queue: ...@@ -214,27 +214,23 @@ class Queue:
# Registration Management # Registration Management
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__ pass
setattr(activity_buffer, '_%s_message_list' % class_name, [])
def isMessageRegistered(self, activity_buffer, activity_tool, m): def isMessageRegistered(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ message_list = activity_buffer.getMessageList(self)
return m in getattr(activity_buffer, '_%s_message_list' % class_name) return m in message_list
def registerMessage(self, activity_buffer, activity_tool, m): def registerMessage(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ message_list = activity_buffer.getMessageList(self)
getattr(activity_buffer, '_%s_message_list' % class_name).append(m) message_list.append(m)
m.is_registered = 1 m.is_registered = 1
def unregisterMessage(self, activity_buffer, activity_tool, m): def unregisterMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 0 m.is_registered = 0
def getRegisteredMessageList(self, activity_buffer, activity_tool): def getRegisteredMessageList(self, activity_buffer, activity_tool):
class_name = self.__class__.__name__ message_list = activity_buffer.getMessageList(self)
if hasattr(activity_buffer, '_%s_message_list' % class_name): return [m for m in message_list if m.is_registered]
return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name))
else:
return ()
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, activity_tool, delay): def timeShift(self, activity_tool, delay):
......
...@@ -63,18 +63,17 @@ class RAMDict(Queue): ...@@ -63,18 +63,17 @@ class RAMDict(Queue):
del self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)] del self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)]
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__ pass
setattr(activity_buffer, '_%s_message_list' % class_name, [])
setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
def isMessageRegistered(self, activity_buffer, activity_tool, m): def isMessageRegistered(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ uid_set = activity_buffer.getUidSet(self)
return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((tuple(m.object_path), m.method_id)) return (tuple(m.object_path), m.method_id) in uid_set
def registerMessage(self, activity_buffer, activity_tool, m): def registerMessage(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ message_list = activity_buffer.getMessageList(self)
getattr(activity_buffer, '_%s_message_list' % class_name).append(m) message_list.append(m)
getattr(activity_buffer, '_%s_uid_dict' % class_name)[(tuple(m.object_path), m.method_id)] = 1 uid_set = activity_buffer.getUidSet(self)
uid_set.add((tuple(m.object_path), m.method_id))
m.is_registered = 1 m.is_registered = 1
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
......
...@@ -116,36 +116,28 @@ class SQLDict(RAMDict): ...@@ -116,36 +116,28 @@ class SQLDict(RAMDict):
# Registration management # Registration management
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__ pass
setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
setattr(activity_buffer, '_%s_message_list' % class_name, [])
def isMessageRegistered(self, activity_buffer, activity_tool, m): def isMessageRegistered(self, activity_buffer, activity_tool, m):
class_name = self.__class__.__name__ uid_set = activity_buffer.getUidSet(self)
uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name) return (tuple(m.object_path), m.method_id) in uid_set
return uid_dict.has_key((tuple(m.object_path), m.method_id))
def registerMessage(self, activity_buffer, activity_tool, m): def registerMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 1 m.is_registered = 1
class_name = self.__class__.__name__ uid_set = activity_buffer.getUidSet(self)
uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name) uid_set.add((tuple(m.object_path), m.method_id))
uid_dict[(tuple(m.object_path), m.method_id)] = 1 message_list = activity_buffer.getMessageList(self)
getattr(activity_buffer,'_%s_message_list' % class_name).append(m) message_list.append(m)
def unregisterMessage(self, activity_buffer, activity_tool, m): def unregisterMessage(self, activity_buffer, activity_tool, m):
m.is_registered = 0 # This prevents from inserting deleted messages into the queue m.is_registered = 0 # This prevents from inserting deleted messages into the queue
class_name = self.__class__.__name__ class_name = self.__class__.__name__
uid_dict = getattr(activity_buffer,'_%s_uid_dict' % class_name) uid_set = activity_buffer.getUidSet(self)
if uid_dict.has_key((tuple(m.object_path), m.method_id)): uid_set.discard((tuple(m.object_path), m.method_id))
del uid_dict[(tuple(m.object_path), m.method_id)]
def getRegisteredMessageList(self, activity_buffer, activity_tool): def getRegisteredMessageList(self, activity_buffer, activity_tool):
class_name = self.__class__.__name__ message_list = activity_buffer.getMessageList(self)
if hasattr(activity_buffer,'_%s_message_list' % class_name):
message_list = getattr(activity_buffer,'_%s_message_list' % class_name)
return [m for m in message_list if m.is_registered] return [m for m in message_list if m.is_registered]
else:
return ()
def getOrderValidationText(self, message): def getOrderValidationText(self, message):
# Return an identifier of validators related to ordering. # Return an identifier of validators related to ordering.
...@@ -456,7 +448,9 @@ class SQLDict(RAMDict): ...@@ -456,7 +448,9 @@ class SQLDict(RAMDict):
else: else:
raise ActivityFlushError, ( raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path)) 'Could not validate %s on %s' % (m.method_id , path))
self.deleteMessage(activity_tool, m)
if len(result):
activity_tool.SQLDict_delMessage(uid = [line.uid for line in result])
def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw): def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
# YO: reading all lines might cause a deadlock # YO: reading all lines might cause a deadlock
......
...@@ -72,7 +72,7 @@ class SQLQueue(RAMQueue): ...@@ -72,7 +72,7 @@ class SQLQueue(RAMQueue):
def prepareDeleteMessage(self, activity_tool, m): def prepareDeleteMessage(self, activity_tool, m):
# Erase all messages in a single transaction # Erase all messages in a single transaction
#LOG("prepareDeleteMessage", 0, str(m.__dict__)) #LOG("prepareDeleteMessage", 0, str(m.__dict__))
activity_tool.SQLQueue_delMessage(uid = m.uid) activity_tool.SQLQueue_delMessage(uid = [m.uid])
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None) readMessage = getattr(activity_tool, 'SQLQueue_readMessage', None)
...@@ -147,7 +147,7 @@ class SQLQueue(RAMQueue): ...@@ -147,7 +147,7 @@ class SQLQueue(RAMQueue):
if m.is_executed: if m.is_executed:
activity_tool.SQLQueue_delMessage(uid=line.uid) # Delete it activity_tool.SQLQueue_delMessage(uid=[line.uid]) # Delete it
else: else:
try: try:
# If not, abort transaction and start a new one # If not, abort transaction and start a new one
...@@ -234,7 +234,9 @@ class SQLQueue(RAMQueue): ...@@ -234,7 +234,9 @@ class SQLQueue(RAMQueue):
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
self.deleteMessage(activity_tool, m)
if len(result):
activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result])
# def start(self, activity_tool, active_process=None): # def start(self, activity_tool, active_process=None):
# uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process) # uid_list = activity_tool.SQLQueue_readUidList(path=path, active_process=active_process)
......
...@@ -26,23 +26,29 @@ ...@@ -26,23 +26,29 @@
from Shared.DC.ZRDB.TM import TM from Shared.DC.ZRDB.TM import TM
from zLOG import LOG, ERROR, INFO from zLOG import LOG, ERROR, INFO
import sys import sys
from thread import allocate_lock, get_ident import threading
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
except ImportError: except ImportError:
pass pass
# This variable is used to store thread-local buffered information.
# This must be RAM-based, because the use of a volatile attribute does
# not guarantee that the information persists until the end of a
# transaction, but we need to assure that the information is accessible
# for flushing activities. So the approach here is that information is
# stored in RAM, and removed at _finish and _abort, so that the information
# would not span over transactions.
buffer_dict_lock = threading.Lock()
buffer_dict = {}
class ActivityBuffer(TM): class ActivityBuffer(TM):
_p_oid=_p_changed=_registered=None _p_oid=_p_changed=_registered=None
def __init__(self, activity_tool=None): def __init__(self, activity_tool=None):
self._use_TM = self._transactions = 1 self.requires_prepare = 0
if self._use_TM:
self._tlock = allocate_lock()
self._tthread = None
self._lock = allocate_lock()
# Directly store the activity tool as an attribute. At the beginning # Directly store the activity tool as an attribute. At the beginning
# the activity tool was stored as a part of the key in queued_activity and # the activity tool was stored as a part of the key in queued_activity and
...@@ -57,18 +63,51 @@ class ActivityBuffer(TM): ...@@ -57,18 +63,51 @@ class ActivityBuffer(TM):
# so store only the required information. # so store only the required information.
self._activity_tool_path = activity_tool.getPhysicalPath() self._activity_tool_path = activity_tool.getPhysicalPath()
try:
buffer_dict_lock.acquire()
if self._activity_tool_path not in buffer_dict:
buffer_dict[self._activity_tool_path] = threading.local()
finally:
buffer_dict_lock.release()
# Create attributes only if they are not present.
buffer = self._getBuffer()
if not hasattr(buffer, 'queued_activity'):
buffer.queued_activity = []
buffer.flushed_activity = []
buffer.message_list_dict = {}
buffer.uid_set_dict = {}
def _getBuffer(self):
return buffer_dict[self._activity_tool_path]
def _clearBuffer(self):
buffer = self._getBuffer()
del buffer.queued_activity[:]
del buffer.flushed_activity[:]
buffer.message_list_dict.clear()
buffer.uid_set_dict.clear()
def getMessageList(self, activity):
buffer = self._getBuffer()
return buffer.message_list_dict.setdefault(activity, [])
def getUidSet(self, activity):
buffer = self._getBuffer()
return buffer.uid_set_dict.setdefault(activity, set())
# Keeps a list of messages to add and remove # Keeps a list of messages to add and remove
# at end of transaction # at end of transaction
def _begin(self, *ignored): def _begin(self, *ignored):
LOG('ActivityBuffer', 0, '_begin %r' % (self,))
from ActivityTool import activity_list from ActivityTool import activity_list
self._tlock.acquire()
self._tthread = get_ident()
self.requires_prepare = 1 self.requires_prepare = 1
try: try:
self.queued_activity = []
self.flushed_activity = [] # Reset registration for each transaction.
for activity in activity_list: # Reset registration for each transaction for activity in activity_list:
activity.registerActivityBuffer(self) activity.registerActivityBuffer(self)
# In Zope 2.8 (ZODB 3.4), use beforeCommitHook instead of # In Zope 2.8 (ZODB 3.4), use beforeCommitHook instead of
# patching Trasaction. # patching Trasaction.
transaction = get_transaction() transaction = get_transaction()
...@@ -79,56 +118,46 @@ class ActivityBuffer(TM): ...@@ -79,56 +118,46 @@ class ActivityBuffer(TM):
except: except:
LOG('ActivityBuffer', ERROR, "exception during _begin", LOG('ActivityBuffer', ERROR, "exception during _begin",
error=sys.exc_info()) error=sys.exc_info())
self._tlock.release()
raise raise
def _finish(self, *ignored): def _finish(self, *ignored):
if not self._tlock.locked() or self._tthread != get_ident(): LOG('ActivityBuffer', 0, '_finish %r' % (self,))
LOG('ActivityBuffer', INFO, "ignoring _finish")
return
try: try:
try: try:
# Try to push / delete all messages # Try to push / delete all messages
for (activity, message) in self.flushed_activity: buffer = self._getBuffer()
#LOG('ActivityBuffer finishDeleteMessage', ERROR, str(message.method_id)) for (activity, message) in buffer.flushed_activity:
activity.finishDeleteMessage(self._activity_tool_path, message) activity.finishDeleteMessage(self._activity_tool_path, message)
for (activity, message) in self.queued_activity: for (activity, message) in buffer.queued_activity:
#LOG('ActivityBuffer finishQueueMessage', ERROR, str(message.method_id))
activity.finishQueueMessage(self._activity_tool_path, message) activity.finishQueueMessage(self._activity_tool_path, message)
except: except:
LOG('ActivityBuffer', ERROR, "exception during _finish", LOG('ActivityBuffer', ERROR, "exception during _finish",
error=sys.exc_info()) error=sys.exc_info())
raise raise
finally: finally:
self._tlock.release() self._clearBuffer()
def _abort(self, *ignored): def _abort(self, *ignored):
if not self._tlock.locked() or self._tthread != get_ident(): self._clearBuffer()
LOG('ActivityBuffer', 0, "ignoring _abort")
return
self._tlock.release()
def tpc_prepare(self, transaction, sub=None): def tpc_prepare(self, transaction, sub=None):
if sub is not None: # Do nothing if it is a subtransaction # Do nothing if it is a subtransaction
if sub is not None:
return return
if not self.requires_prepare: return
self.requires_prepare = 0 if not self.requires_prepare:
if not self._tlock.locked() or self._tthread != get_ident():
LOG('ActivityBuffer', 0, "ignoring tpc_prepare")
return return
self.requires_prepare = 0
try: try:
# Try to push / delete all messages # Try to push / delete all messages
for (activity, message) in self.flushed_activity: buffer = self._getBuffer()
#LOG('ActivityBuffer prepareDeleteMessage', ERROR, str(message.method_id)) for (activity, message) in buffer.flushed_activity:
activity.prepareDeleteMessage(self._activity_tool, message) activity.prepareDeleteMessage(self._activity_tool, message)
activity_dict = {} activity_dict = {}
for (activity, message) in self.queued_activity: for (activity, message) in buffer.queued_activity:
key = activity activity_dict.setdefault(activity, []).append(message)
if key not in activity_dict: for activity, message_list in activity_dict.iteritems():
activity_dict[key] = []
activity_dict[key].append(message)
for key, message_list in activity_dict.items():
activity = key
if hasattr(activity, 'prepareQueueMessageList'): if hasattr(activity, 'prepareQueueMessageList'):
activity.prepareQueueMessageList(self._activity_tool, message_list) activity.prepareQueueMessageList(self._activity_tool, message_list)
else: else:
...@@ -144,11 +173,13 @@ class ActivityBuffer(TM): ...@@ -144,11 +173,13 @@ class ActivityBuffer(TM):
# Activity is called to prevent queuing some messages (useful for example # Activity is called to prevent queuing some messages (useful for example
# to prevent reindexing objects multiple times) # to prevent reindexing objects multiple times)
if not activity.isMessageRegistered(self, activity_tool, message): if not activity.isMessageRegistered(self, activity_tool, message):
self.queued_activity.append((activity, message)) buffer = self._getBuffer()
buffer.queued_activity.append((activity, message))
# We register queued messages so that we can # We register queued messages so that we can
# unregister them # unregister them
activity.registerMessage(self, activity_tool, message) activity.registerMessage(self, activity_tool, message)
def deferredDeleteMessage(self, activity_tool, activity, message): def deferredDeleteMessage(self, activity_tool, activity, message):
self._register() self._register()
self.flushed_activity.append((activity, message)) buffer = self._getBuffer()
buffer.flushed_activity.append((activity, message))
...@@ -641,15 +641,15 @@ class ActivityTool (Folder, UniqueObject): ...@@ -641,15 +641,15 @@ class ActivityTool (Folder, UniqueObject):
self._v_activity_buffer._register() # Required if called by flush, outside activate self._v_activity_buffer._register() # Required if called by flush, outside activate
return activity.unregisterMessage(self._v_activity_buffer, self, message) return activity.unregisterMessage(self._v_activity_buffer, self, message)
def flush(self, object, invoke=0, **kw): def flush(self, obj, invoke=0, **kw):
global is_initialized global is_initialized
if not is_initialized: self.initialize() if not is_initialized: self.initialize()
if getattr(self, '_v_activity_buffer', None) is None: if getattr(self, '_v_activity_buffer', None) is None:
self._v_activity_buffer = ActivityBuffer(activity_tool=self) self._v_activity_buffer = ActivityBuffer(activity_tool=self)
if type(object) is TupleType: if isinstance(obj, tuple):
object_path = object object_path = obj
else: else:
object_path = object.getPhysicalPath() object_path = obj.getPhysicalPath()
for activity in activity_list: for activity in activity_list:
activity.flush(self, object_path, invoke=invoke, **kw) activity.flush(self, object_path, invoke=invoke, **kw)
......
...@@ -11,4 +11,5 @@ class_file: ...@@ -11,4 +11,5 @@ class_file:
DELETE FROM DELETE FROM
message_queue message_queue
WHERE WHERE
uid = <dtml-sqlvar uid type="int"> <dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if>
</dtml-in>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment