diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 987e3d6ed67cbd7ebf7480ed8d8d18330df07641..5c44ea43fe5905b2d81939e5d1b4a58fc0b30330 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -31,6 +31,8 @@ from RAMDict import RAMDict from zLOG import LOG +MAX_RETRY = 10 + DISTRIBUTABLE_STATE = -1 INVOKE_ERROR_STATE = -2 VALIDATE_ERROR_STATE = -3 @@ -56,7 +58,14 @@ class SQLDict(RAMDict): get_transaction().commit() # Release locks before starting a potentially long calculation m = self.loadMessage(line.message) if m.validate(self, activity_tool): - activity_tool.invoke(m) # Try to invoke the message + retry = 0 + while retry < MAX_RETRY: + activity_tool.invoke(m) # Try to invoke the message + if m.is_executed: + retry=MAX_RETRY + else: + get_transaction().abort() # Abort and retry + retry = retry + 1 if m.is_executed: # Make sure message could be invoked activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=processing_node) # Delete it get_transaction().commit() # If successful, commit @@ -104,7 +113,14 @@ class SQLDict(RAMDict): method_dict[method_id] = 1 m = self.loadMessage(line.message) if m.validate(self, activity_tool): - activity_tool.invoke(m) # Try to invoke the message + retry = 0 + while retry < MAX_RETRY: + activity_tool.invoke(m) # Try to invoke the message + if m.is_executed: + retry=MAX_RETRY + else: + get_transaction().abort() # Abort and retry + retry = retry + 1 if m.is_executed: # Make sure message could be invoked activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=processing_node) # Delete it if commit: get_transaction().commit() # If successful, commit