diff --git a/product/CMFActivity/Activity/RAMDict.py b/product/CMFActivity/Activity/RAMDict.py index 30514923792e1b203ae8e239a934561c4b0fa951..0e81877b1c8282eec6672661050129eea5ee7adb 100755 --- a/product/CMFActivity/Activity/RAMDict.py +++ b/product/CMFActivity/Activity/RAMDict.py @@ -45,14 +45,20 @@ class RAMDict(Queue): Queue.__init__(self) self.dict = {} + def getDict(self, activity_tool): + path = activity_tool.getPhysicalPath() + if not self.queue_dict.has_key(path): + self.queue_dict[path] = {} + return self.queue_dict[path] + def finishQueueMessage(self, activity_tool, m): if m.is_registered: - self.dict[(tuple(m.object_path), m.method_id)] = m + self.getDict(activity_tool)[(tuple(m.object_path), m.method_id)] = m def finishDeleteMessage(self, activity_tool, message): - for key, m in self.dict.items(): + for key, m in self.getDict(activity_tool).items(): if m.object_path == message.object_path and m.method_id == message.method_id: - del self.dict[(tuple(m.object_path), m.method_id)] + del self.getDict(activity_tool)[(tuple(m.object_path), m.method_id)] def registerActivityBuffer(self, activity_buffer): class_name = self.__class__.__name__ @@ -70,20 +76,24 @@ class RAMDict(Queue): m.is_registered = 1 def dequeueMessage(self, activity_tool, processing_node): - if len(self.dict.keys()) is 0: + if len(self.getDict(activity_tool).keys()) is 0: return 1 # Go to sleep - for key, m in self.dict.items(): + for key, m in self.getDict(activity_tool).items(): if m.validate(self, activity_tool): activity_tool.invoke(m) - del self.dict[key] - get_transaction().commit() - return 0 + if m.is_executed: + del self.getDict(activity_tool)[key] + get_transaction().commit() + return 0 + else: + # Start a new transaction and keep on to next message + get_transaction().commit() return 1 def hasActivity(self, activity_tool, object, **kw): if object is not None: object_path = object.getPhysicalPath() - for m in self.dict.values(): + for m in self.getDict(activity_tool).values(): if m.object_path == object_path: return 1 else: @@ -97,8 +107,7 @@ class RAMDict(Queue): # Parse each message in registered for m in activity_tool.getRegisteredMessageList(self): if object_path == m.object_path and (method_id is None or method_id == m.method_id): - activity_tool.unregisterMessage(self, m) - if not method_dict.has_key(method_id): + if not method_dict.has_key(m.method_id): if invoke: # First Validate if m.validate(self, activity_tool): @@ -107,18 +116,41 @@ class RAMDict(Queue): # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (method_id , path)) + else: + method_dict[m.method_id] = 1 + activity_tool.unregisterMessage(self, m) else: # The message no longer exists raise ActivityFlushError, ( - 'The document %s does not exist' % path) + 'The document %s does not exist' % path) + else: + method_dict[m.method_id] = 1 + activity_tool.unregisterMessage(self, m) + else: + method_dict[m.method_id] = 1 + activity_tool.unregisterMessage(self, m) # Parse each message in RAM dict - for key, m in self.dict.items(): + for key, m in self.getDict(activity_tool).items(): if object_path == m.object_path and (method_id is None or method_id == m.method_id): - LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) - if invoke: activity_tool.invoke(m) - self.deleteMessage(activity_tool, m) + if not method_dict.has_key(m.method_id): + LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) + if invoke: + activity_tool.invoke(m) + if m.is_executed: + method_dict[m.method_id] = 1 + self.deleteMessage(activity_tool, m) + else: + method_dict[m.method_id] = 1 + self.deleteMessage(activity_tool, m) + else: + self.deleteMessage(activity_tool, m) def getMessageList(self, activity_tool, processing_node=None): - return self.dict.values() - + new_queue = [] + for m in self.getDict(activity_tool).values(): + m.processing_node = 1 + m.priority = 0 + new_queue.append(m) + return new_queue + registerActivity(RAMDict)