diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index cd44cd9970da0bb25f1aee0d7c3b16252bb6314f..75c5a33cec516652d08ea9f8b7ef6a27e2f246b0 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -67,45 +67,53 @@ class SQLDict(RAMDict): result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) if len(result) == 0: # If empty, take any message - result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=None) + priority = None + result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) if len(result) > 0: line = result[0] path = line.path method_id = line.method_id + uid_list = activity_tool.SQLDict_readUidList( path=path, method_id= method_id, processing_node = None ) + uid_list = map(lambda x:x.uid, uid_list) # Make sure message can not be processed anylonger - activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node = processing_node) + if len(uid_list) > 0: + activity_tool.SQLDict_processMessage(uid = uid_list) get_transaction().commit() # Release locks before starting a potentially long calculation m = self.loadMessage(line.message) # Make sure object exists if not m.validate(self, activity_tool): if line.priority > MAX_PRIORITY: # This is an error - activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = VALIDATE_ERROR_STATE) + if len(uid_list) > 0: + activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE) # Assign message back to 'error' state get_transaction().commit() # and commit else: # Lower priority - activity_tool.SQLDict_setPriority(path=path, method_id=method_id, processing_node = processing_node, + if len(uid_list) > 0: + activity_tool.SQLDict_setPriority(uid = uid_list, priority = line.priority + 1) get_transaction().commit() # Release locks before starting a potentially long calculation else: # Try to invoke activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? if m.is_executed: # Make sure message could be invoked - activity_tool.SQLDict_delMessage(path=path, method_id=method_id, - processing_node=processing_node, processing=1) # Delete it + if len(uid_list) > 0: + activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it get_transaction().commit() # If successful, commit else: get_transaction().abort() # If not, abort transaction and start a new one if line.priority > MAX_PRIORITY: # This is an error - activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = INVOKE_ERROR_STATE) + if len(uid_list) > 0: + activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE) # Assign message back to 'error' state get_transaction().commit() # and commit else: # Lower priority - activity_tool.SQLDict_setPriority(path=path, method_id=method_id, processing_node = processing_node, - priority = line.priority + 1) + if len(uid_list) > 0: + activity_tool.SQLDict_setPriority(uid = uid_list, + priority = line.priority + 1) get_transaction().commit() # Release locks before starting a potentially long calculation return 0 get_transaction().commit() # Release locks before starting a potentially long calculation @@ -131,6 +139,8 @@ class SQLDict(RAMDict): NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible """ path = '/'.join(object_path) + uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,processing_node=None) + uid_list = map(lambda x:x.uid, uid_list) # LOG('Flush', 0, str((path, invoke, method_id))) if invoke: result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) @@ -155,7 +165,8 @@ class SQLDict(RAMDict): raise ActivityFlushError, ( 'The document %s does not exist' % path) # Erase all messages in a single transaction - activity_tool.SQLDict_delMessage(path=path, method_id=method_id) # Delete all "old" messages (not -1 processing) + if len(uid_list) > 0: + activity_tool.SQLDict_delMessage(uid = uid_list) # Delete all "old" messages (not -1 processing) def getMessageList(self, activity_tool, processing_node=None): message_list = [] @@ -177,7 +188,7 @@ class SQLDict(RAMDict): if not path_dict.has_key(path): # Only assign once (it would be different for a queue) path_dict[path] = 1 - activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node) + activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None) get_transaction().commit() # Release locks immediately to allow processing of messages processing_node = processing_node + 1 if processing_node > node_count: