From f17a5136caecaaad055e66ea8344300da1a69c6f Mon Sep 17 00:00:00 2001 From: Jean-Paul Smets <jp@nexedi.com> Date: Sun, 11 Apr 2004 10:40:19 +0000 Subject: [PATCH] registration API completed more tests needed git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@662 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/CMFActivity/Activity/Queue.py | 41 ++++++++++----- product/CMFActivity/Activity/RAMDict.py | 46 +++++++++++++++-- product/CMFActivity/Activity/RAMQueue.py | 28 ++++++---- product/CMFActivity/Activity/SQLDict.py | 66 ++++++++++++++++++++---- product/CMFActivity/Activity/SQLQueue.py | 16 ++++-- product/CMFActivity/ActivityBuffer.py | 14 +++-- product/CMFActivity/ActivityTool.py | 6 +-- 7 files changed, 167 insertions(+), 50 deletions(-) diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py index 72071b0c7a..56f092fa63 100755 --- a/product/CMFActivity/Activity/Queue.py +++ b/product/CMFActivity/Activity/Queue.py @@ -79,19 +79,13 @@ class Queue: self.activity_tool = activity_tool self.is_initialized = 1 - def queueMessage(self, activity_tool, m): + def queueMessage(self, activity_tool, m): activity_tool.deferredQueueMessage(self, m) - m.is_queued = 1 def deleteMessage(self, activity_tool, m): - activity_tool.deferredDeleteMessage(self, m) - m.is_deleted = 1 - - def isDeleted(self, m): - return m.is_deleted - - def isQueued(self, m): - return m.is_queued + if not self.isMessageDeleted(activity_tool, m): + activity_tool.deferredDeleteMessage(self, m) + # We must never deleted twice def dequeueMessage(self, activity_tool, processing_node): pass @@ -137,7 +131,7 @@ class Queue: def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw): return 0 - def flush(self, activity_tool, object, **kw): + def flush(self, activity_tool, object, **kw): pass def start(self, active_process=None): @@ -171,4 +165,27 @@ class Queue: def finishDequeueMessage(self, activity_tool, m): pass - \ No newline at end of file + + # Registration Management + def registerActivityBuffer(self, activity_buffer): + class_name = self.__class__.__name__ + if not hasattr(activity_buffer, '_%s_message_list' % class_name): + setattr(activity_buffer, '_%s_message_list' % class_name, []) + + def isMessageRegistered(self, activity_buffer, activity_tool, m): + class_name = self.__class__.__name__ + self.registerActivityBuffer(activity_buffer) + return m in getattr(activity_buffer, '_%s_message_list' % class_name) + + def registerMessage(self, activity_buffer, activity_tool, m): + class_name = self.__class__.__name__ + self.registerActivityBuffer(activity_buffer) + getattr(activity_buffer, '_%s_message_list' % class_name).append(m) + m.is_registered = 1 + + def unregisterMessage(self, activity_buffer, activity_tool, m): + m.is_registered = 0 + + def getRegisteredMessageList(self, activity_buffer, activity_tool): + class_name = self.__class__.__name__ + return filter(lambda m: m.is_registered, getattr(activity_buffer, '_%s_message_list' % class_name)) \ No newline at end of file diff --git a/product/CMFActivity/Activity/RAMDict.py b/product/CMFActivity/Activity/RAMDict.py index 9b63054094..dc5a9e6ef5 100755 --- a/product/CMFActivity/Activity/RAMDict.py +++ b/product/CMFActivity/Activity/RAMDict.py @@ -46,13 +46,32 @@ class RAMDict(Queue): self.dict = {} def finishQueueMessage(self, activity_tool, m): - self.dict[(m.object_path, m.method_id)] = m + if m.is_registered: + self.dict[(m.object_path, m.method_id)] = m def finishDeleteMessage(self, activity_tool, message): for key, m in self.dict.items(): if m.object_path == message.object_path and m.method_id == message.method_id: del self.dict[(m.object_path, m.method_id)] + def registerActivityBuffer(self, activity_buffer): + class_name = self.__class__.__name__ + if not hasattr(activity_buffer, '_%s_message_list' % class_name): + setattr(activity_buffer, '_%s_message_list' % class_name, []) + setattr(activity_buffer, '_%s_uid_dict' % class_name, {}) + + def isMessageRegistered(self, activity_buffer, activity_tool, m): + class_name = self.__class__.__name__ + self.registerActivityBuffer(activity_buffer) + return getattr(activity_buffer, '_%s_uid_dict' % class_name).has_key((m.object_path, m.method_id)) + + def registerMessage(self, activity_buffer, activity_tool, m): + class_name = self.__class__.__name__ + self.registerActivityBuffer(activity_buffer) + getattr(activity_buffer, '_%s_message_list' % class_name).append(m) + getattr(activity_buffer, '_%s_uid_dict' % class_name)[(m.object_path, m.method_id)] = 1 + m.is_registered = 1 + def dequeueMessage(self, activity_tool, processing_node): if len(self.dict.keys()) is 0: return 1 # Go to sleep @@ -70,10 +89,31 @@ class RAMDict(Queue): return 1 return 0 - def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): + def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): + path = '/'.join(object_path) + # LOG('Flush', 0, str((path, invoke, method_id))) + method_dict = {} + # Parse each message in registered + for m in activity_tool.getRegisteredMessageList(self): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + self.unregisterMessage(m) + if not method_dict.has_key(method_id): + if invoke: + # First Validate + if m.validate(self, activity_tool): + activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? + if not m.is_executed: # Make sure message could be invoked + # The message no longer exists + raise ActivityFlushError, ( + 'Could not evaluate %s on %s' % (method_id , path)) + else: + # The message no longer exists + raise ActivityFlushError, ( + 'The document %s does not exist' % path) + # Parse each message in RAM dict for key, m in self.dict.items(): if not m.is_deleted: - if m.object_path == object_path: + if object_path == m.object_path and (method_id is None or method_id == m.method_id): LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) if invoke: activity_tool.invoke(m) self.deleteMessage(m) diff --git a/product/CMFActivity/Activity/RAMQueue.py b/product/CMFActivity/Activity/RAMQueue.py index a63d622628..46e95df4fe 100755 --- a/product/CMFActivity/Activity/RAMQueue.py +++ b/product/CMFActivity/Activity/RAMQueue.py @@ -34,21 +34,22 @@ class RAMQueue(Queue): """ A simple RAM based queue """ - message_queue_id = 0 - def __init__(self): Queue.__init__(self) self.queue = [] - + self.last_uid = 0 + def finishQueueMessage(self, activity_tool, m): - self.message_queue_id = self.message_queue_id + 1 - m.message_queue_id = self.message_queue_id - self.queue.append(m) + if m.is_registered: + # XXX - Some lock is required on this section + self.last_uid = self.last_uid + 1 + m.uid = self.last_uid + self.queue.append(m) def finishDeleteMessage(self, activity_tool, m): i = 0 for my_message in self.queue: - if my_message.message_queue_id == m.message_queue_id: + if my_message.uid == m.uid: del self.queue[i] return i = i + 1 @@ -69,11 +70,16 @@ class RAMQueue(Queue): return 0 def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw): + # Parse each message in registered + for m in activity_tool.getRegisteredMessageList(self): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + if invoke: activity_tool.invoke(m) + self.unregisterMessage(m) + # Parse each message in queue for m in self.queue: - if not m.is_deleted: - if m.object_path == object_path: - if invoke: activity_tool.invoke(m) - self.deleteMessage(m) + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + if invoke: activity_tool.invoke(m) + self.deleteMessage(m) def getMessageList(self, activity_tool, processing_node=None): new_queue = [] diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 787f6c72ba..c878261365 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -51,13 +51,14 @@ class SQLDict(RAMDict): and provide sequentiality. Should not create conflict because use of OOBTree. """ - + # Transaction commit methods def prepareQueueMessage(self, activity_tool, m): - activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , - method_id = m.method_id, - priority = m.activity_kw.get('priority', 1), - message = self.dumpMessage(m)) - # Also store uid of activity + if m.is_registered: + activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , + method_id = m.method_id, + priority = m.activity_kw.get('priority', 1), + message = self.dumpMessage(m)) + # Also store uid of activity def prepareDeleteMessage(self, activity_tool, m): # Erase all messages in a single transaction @@ -65,6 +66,32 @@ class SQLDict(RAMDict): uid_list = map(lambda x:x.uid, uid_list) activity_tool.SQLDict_delMessage(uid = uid_list) + # Registration management + def registerActivityBuffer(self, activity_buffer): + if not hasattr(activity_buffer, '_sqldict_uid_dict'): + activity_buffer._sqldict_uid_dict = {} + activity_buffer._sqldict_message_list = [] + + def isMessageRegistered(self, activity_buffer, activity_tool, m): + self.registerActivityBuffer(activity_buffer) + return activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id)) + + def registerMessage(self, activity_buffer, activity_tool, m): + self.registerActivityBuffer(activity_buffer) + m.is_registered = 1 + activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)] = 1 + activity_buffer._sqldict_message_list.append(m) + + def unregisterMessage(self, activity_buffer, activity_tool, m): + self.registerActivityBuffer(activity_buffer) + m.is_registered = 0 # This prevents from inserting deleted messages into the queue + if activity_buffer._sqldict_uid_dict.has_key((m.object_path, m.method_id)): + del activity_buffer._sqldict_uid_dict[(m.object_path, m.method_id)] + + def getRegisteredMessageList(self, activity_buffer, activity_tool): + return filter(lambda m: m.is_registered, activity_buffer._sqldict_message_list) + + # Queue semantic def dequeueMessage(self, activity_tool, processing_node): priority = random.choice(priority_weight) # Try to find a message at given priority level @@ -84,7 +111,7 @@ class SQLDict(RAMDict): activity_tool.SQLDict_processMessage(uid = uid_list) get_transaction().commit() # Release locks before starting a potentially long calculation # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state - m = self.loadMessage(line.message) + m = self.loadMessage(line.message, uid = line.uid) # Make sure object exists if not m.validate(self, activity_tool): if line.priority > MAX_PRIORITY: @@ -151,16 +178,33 @@ class SQLDict(RAMDict): """ path = '/'.join(object_path) # LOG('Flush', 0, str((path, invoke, method_id))) - result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) method_dict = {} - # Parse each message + # Parse each message in registered + for m in activity_tool.getRegisteredMessageList(self): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + self.unregisterMessage(m) + if not method_dict.has_key(method_id): + if invoke: + # First Validate + if m.validate(self, activity_tool): + activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? + if not m.is_executed: # Make sure message could be invoked + # The message no longer exists + raise ActivityFlushError, ( + 'Could not evaluate %s on %s' % (method_id , path)) + else: + # The message no longer exists + raise ActivityFlushError, ( + 'The document %s does not exist' % path) + # Parse each message in SQL dict + result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) for line in result: path = line.path method_id = line.method_id if not method_dict.has_key(method_id): # Only invoke once (it would be different for a queue) method_dict[method_id] = 1 - m = self.loadMessage(line.message) + m = self.loadMessage(line.message, uid = line.uid) self.deleteMessage(m) if invoke: # First Validate @@ -188,7 +232,7 @@ class SQLDict(RAMDict): message_list = [] result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None) for line in result: - m = self.loadMessage(line.message) + m = self.loadMessage(line.message, uid = line.uid) m.processing_node = line.processing_node m.priority = line.priority message_list.append(m) diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index b4b12edc8d..ac14baed9a 100755 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -53,10 +53,11 @@ class SQLQueue(RAMQueue): """ def prepareQueueMessage(self, activity_tool, m): - activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) , - method_id = m.method_id, - priority = m.activity_kw.get('priority', 1), - message = self.dumpMessage(m)) + if m.is_registered: + activity_tool.SQLQueue_writeMessage(path = '/'.join(m.object_path) , + method_id = m.method_id, + priority = m.activity_kw.get('priority', 1), + message = self.dumpMessage(m)) def prepareDeleteMessage(self, activity_tool, m): # Erase all messages in a single transaction @@ -131,10 +132,15 @@ class SQLQueue(RAMQueue): """ return # Do nothing here to precent overlocking path = '/'.join(object_path) + # Parse each message in registered + for m in activity_tool.getRegisteredMessageList(self): + if object_path == m.object_path and (method_id is None or method_id == m.method_id): + if invoke: activity_tool.invoke(m) + self.unregisterMessage(m) + # Parse each message in SQL queue # LOG('Flush', 0, str((path, invoke, method_id))) result = activity_tool.SQLQueue_readMessageList(path=path, method_id=method_id,processing_node=None) method_dict = {} - # Parse each message for line in result: path = line.path method_id = line.method_id diff --git a/product/CMFActivity/ActivityBuffer.py b/product/CMFActivity/ActivityBuffer.py index 039c177c1b..17917de8d6 100755 --- a/product/CMFActivity/ActivityBuffer.py +++ b/product/CMFActivity/ActivityBuffer.py @@ -46,7 +46,6 @@ class ActivityBuffer(TM): self._tthread = get_ident() self.requires_prepare = 1 try: - LOG("_begin", 0, '') self.queued_activity = [] self.flushed_activity = [] except: @@ -100,11 +99,16 @@ class ActivityBuffer(TM): raise def deferredQueueMessage(self, activity_tool, activity, message): - self._register() - LOG("deferredQueueMessage", 0, '') - self.queued_activity.append((activity, activity_tool, message)) + self._register() + # Activity is called to prevent queuing some messages (useful for example + # to prevent reindexing objects multiple times) + if not activity.isMessageRegistered(self, activity_tool, message): + self.queued_activity.append((activity, activity_tool, message)) + # We register queued messages so that we can + # unregister them + activity.registerMessage(self, activity_tool, message) def deferredDeleteMessage(self, activity_tool, activity, message): self._register() - LOG("deferredDeleteMessage", 0, '') self.flushed_activity.append((activity, activity_tool, message)) + diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 8189ba84c8..07c702186d 100755 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -84,9 +84,6 @@ allow_class(Result) class Message: - is_deleted = 0 - is_queued = 0 - def __init__(self, object, active_process, activity_kw, method_id, args, kw): if type(object) is type('a'): self.object_path = object.split('/') @@ -318,6 +315,9 @@ class ActivityTool (Folder, UniqueObject): def deferredDeleteMessage(self, activity, message): self._v_activity_buffer.deferredDeleteMessage(self, activity, message) + def getRegisteredMessageList(self, activity): + return activity.getRegisteredMessageList(self, self._v_activity_buffer) + def flush(self, object, invoke=0, **kw): global is_initialized if not is_initialized: self.initialize() -- 2.30.9