From 6efbc340b2724374366e2494bf6e8ad1ff6a520b Mon Sep 17 00:00:00 2001 From: Vincent Pelletier <vincent@nexedi.com> Date: Fri, 4 Apr 2008 15:39:06 +0000 Subject: [PATCH] Use symbolic constants instead of values 0 and 1 for Message.is_executed . Mark messages as not executable when either their path or the method to call on it cannot be retrieved. Make messages marked as not excutable immediately fail with VALIDATE_ERROR_STATE state. git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@20311 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/CMFActivity/Activity/RAMDict.py | 8 +- product/CMFActivity/Activity/RAMQueue.py | 8 +- product/CMFActivity/Activity/SQLDict.py | 27 ++++-- product/CMFActivity/Activity/SQLQueue.py | 26 ++++-- product/CMFActivity/ActivityTool.py | 79 +++++++++++------- product/CMFActivity/tests/testCMFActivity.py | 87 ++++++++++++++++++++ 6 files changed, 182 insertions(+), 53 deletions(-) diff --git a/product/CMFActivity/Activity/RAMDict.py b/product/CMFActivity/Activity/RAMDict.py index b376e035c3..3a1bcce445 100644 --- a/product/CMFActivity/Activity/RAMDict.py +++ b/product/CMFActivity/Activity/RAMDict.py @@ -26,7 +26,7 @@ # ############################################################################## -from Products.CMFActivity.ActivityTool import registerActivity +from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED from Products.CMFActivity.Errors import ActivityFlushError from Queue import Queue, VALID @@ -83,7 +83,7 @@ class RAMDict(Queue): for key, m in self.getDict(path).items(): if m.validate(self, activity_tool) is VALID: activity_tool.invoke(m) - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: del self.getDict(path)[key] get_transaction().commit() return 0 @@ -133,7 +133,7 @@ class RAMDict(Queue): # First Validate if m.validate(self, activity_tool) is VALID: activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? - if not m.is_executed: # Make sure message could be invoked + if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (method_id , path)) @@ -158,7 +158,7 @@ class RAMDict(Queue): LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) if invoke: activity_tool.invoke(m) - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: method_dict[m.method_id] = 1 self.deleteMessage(activity_tool, m) else: diff --git a/product/CMFActivity/Activity/RAMQueue.py b/product/CMFActivity/Activity/RAMQueue.py index 27f52a86fa..d2bb08aad6 100644 --- a/product/CMFActivity/Activity/RAMQueue.py +++ b/product/CMFActivity/Activity/RAMQueue.py @@ -26,7 +26,7 @@ # ############################################################################## -from Products.CMFActivity.ActivityTool import registerActivity +from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_EXECUTED from Queue import Queue, VALID try: @@ -70,7 +70,7 @@ class RAMQueue(Queue): get_transaction().commit() # Start a new transaction return 0 # Keep on ticking activity_tool.invoke(m) - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) get_transaction().commit() # Start a new transaction return 0 # Keep on ticking @@ -117,7 +117,7 @@ class RAMQueue(Queue): else: if invoke: activity_tool.invoke(m) - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: activity_tool.unregisterMessage(self, m) else: activity_tool.unregisterMessage(self, m) @@ -130,7 +130,7 @@ class RAMQueue(Queue): else: if invoke: activity_tool.invoke(m) - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: self.deleteMessage(activity_tool, m) # Only delete if no error happens else: self.deleteMessage(activity_tool, m) diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 8732ca15b2..c6e3b3f8cc 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -26,7 +26,7 @@ # ############################################################################## -from Products.CMFActivity.ActivityTool import registerActivity +from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \ abortTransactionSynchronously from RAMDict import RAMDict @@ -344,9 +344,10 @@ class SQLDict(RAMDict, SQLBase): make_available_uid_list = [] message_with_active_process_list = [] notify_user_list = [] - something_failed = (len([x for x in message_uid_priority_list if not x[1].is_executed]) != 0) + non_executable_message_list = [] + something_failed = (len([x for x in message_uid_priority_list if x[1].is_executed == MESSAGE_NOT_EXECUTED]) != 0) for uid, m, priority in message_uid_priority_list: - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: if something_failed: make_available_uid_list.append(uid) make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, [])) @@ -357,7 +358,7 @@ class SQLDict(RAMDict, SQLBase): # XXX: Bug here: Even if a duplicate message has an active_process, # it won't be called on the duplicate. message_with_active_process_list.append(m) - else: + elif m.is_executed == MESSAGE_NOT_EXECUTED: # Should duplicate messages follow strictly the original message, or # should they be just made available again ? make_available_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, [])) @@ -376,6 +377,11 @@ class SQLDict(RAMDict, SQLBase): except: LOG('SQLDict', WARNING, 'Failed to increase priority of %r' % (uid, ), error=sys.exc_info()) delay_uid_list.append(uid) + else: + # Internal CMFActivity error: the message can not be executed because + # something is missing (context object cannot be found, method cannot + # be accessed on object). + non_executable_message_list.append(uid) if len(deletable_uid_list): try: self._retryOnLockError(activity_tool.SQLDict_delMessage, kw={'uid': deletable_uid_list}) @@ -396,6 +402,11 @@ class SQLDict(RAMDict, SQLBase): processing_node=INVOKE_ERROR_STATE) except: LOG('SQLDict', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info()) + if len(non_executable_message_list): + try: + activity_tool.SQLDict_assignMessage(uid=non_executable_message_list, processing_node=VALIDATE_ERROR_STATE) + except: + LOG('SQLDict', ERROR, 'Failed to set message to invalid path state for %r' % (non_executable_message_list, ), error=sys.exc_info()) if len(make_available_uid_list): try: makeMessageListAvailable(make_available_uid_list) @@ -472,7 +483,7 @@ class SQLDict(RAMDict, SQLBase): else: LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list)) # Abort if something failed. - if len([x for x in message_uid_priority_list if not x[1].is_executed]) != 0: + if len([x for x in message_uid_priority_list if x[1].is_executed == MESSAGE_NOT_EXECUTED]) != 0: endTransaction = abortTransactionSynchronously else: endTransaction = get_transaction().commit @@ -489,7 +500,7 @@ class SQLDict(RAMDict, SQLBase): LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.') raise for x in message_uid_priority_list: - x[1].is_executed = 0 + x[1].is_executed = MESSAGE_NOT_EXECUTED failed_message_uid_list = [x[0] for x in message_uid_priority_list] try: makeMessageListAvailable(failed_message_uid_list, uid_to_duplicate_uid_list_dict) @@ -541,7 +552,7 @@ class SQLDict(RAMDict, SQLBase): validate_value = m.validate(self, activity_tool) if validate_value is VALID: activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? - if not m.is_executed: # Make sure message could be invoked + if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (m.method_id , path)) @@ -572,7 +583,7 @@ class SQLDict(RAMDict, SQLBase): if validate_value is VALID: activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? # LOG('SQLDict.flush m.is_executed',0,m.is_executed) - if not m.is_executed: # Make sure message could be invoked + if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (m.method_id , path)) diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index dda0d2a604..d222636263 100644 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -26,7 +26,7 @@ # ############################################################################## -from Products.CMFActivity.ActivityTool import registerActivity +from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED from RAMQueue import RAMQueue from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \ abortTransactionSynchronously @@ -197,12 +197,13 @@ class SQLQueue(RAMQueue, SQLBase): final_error_uid_list = [] message_with_active_process_list = [] notify_user_list = [] + non_executable_message_list = [] for uid, m, priority in message_uid_priority_list: - if m.is_executed: + if m.is_executed == MESSAGE_EXECUTED: deletable_uid_list.append(uid) if m.active_process: message_with_active_process_list.append(m) - else: + elif m.is_executed == MESSAGE_NOT_EXECUTED: if type(m.exc_type) is ClassType and \ issubclass(m.exc_type, ConflictError): delay_uid_list.append(uid) @@ -224,6 +225,11 @@ class SQLQueue(RAMQueue, SQLBase): LOG('SQLQueue', ERROR, 'Failed to unreserve %r' % (uid, ), error=sys.exc_info()) else: LOG('SQLQueue', TRACE, 'Freed message %r' % (uid, )) + else: + # Internal CMFActivity error: the message can not be executed because + # something is missing (context object cannot be found, method cannot + # be accessed on object). + non_executable_message_list.append(uid) if len(deletable_uid_list): try: self._retryOnLockError(activity_tool.SQLQueue_delMessage, kw={'uid': deletable_uid_list}) @@ -249,6 +255,12 @@ class SQLQueue(RAMQueue, SQLBase): processing_node=INVOKE_ERROR_STATE) except: LOG('SQLQueue', ERROR, 'Failed to set message to error state for %r' % (final_error_uid_list, ), error=sys.exc_info()) + if len(non_executable_message_list): + try: + activity_tool.SQLQueue_assignMessage(uid=non_executable_message_list, + processing_node=VALIDATE_ERROR_STATE) + except: + LOG('SQLQueue', ERROR, 'Failed to set message to invalid path state for %r' % (final_error_uid_list, ), error=sys.exc_info()) try: for m in notify_user_list: m.notifyUser(activity_tool) @@ -287,7 +299,7 @@ class SQLQueue(RAMQueue, SQLBase): # Try to invoke try: activity_tool.invoke(value[1]) - if value[1].is_executed: + if value[1].is_executed != MESSAGE_NOT_EXECUTED: # Commit so that if a message raises it doesn't causes previous # successfull messages to be rolled back. This commit might fail, # so it is protected the same way as activity execution by the @@ -307,7 +319,7 @@ class SQLQueue(RAMQueue, SQLBase): # We must make sure that the message is not set as executed. # It is possible that the message is executed but the commit # of the transaction fails - value[1].is_executed = 0 + value[1].is_executed = MESSAGE_NOT_EXECUTED try: makeMessageListAvailable([value[0]]) except: @@ -368,7 +380,7 @@ class SQLQueue(RAMQueue, SQLBase): validate_value = m.validate(self, activity_tool) if validate_value is VALID: activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? - if not m.is_executed: # Make sure message could be invoked + if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (m.method_id , path)) @@ -391,7 +403,7 @@ class SQLQueue(RAMQueue, SQLBase): validate_value = m.validate(self, activity_tool) if validate_value is VALID: activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? - if not m.is_executed: # Make sure message could be invoked + if m.is_executed != MESSAGE_EXECUTED: # Make sure message could be invoked # The message no longer exists raise ActivityFlushError, ( 'Could not evaluate %s on %s' % (method_id , path)) diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 7cd0a3648e..715928e03c 100644 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -104,6 +104,10 @@ def registerActivity(activity): activity_instance = activity() activity_dict[activity.__name__] = activity_instance +MESSAGE_NOT_EXECUTED = 0 +MESSAGE_EXECUTED = 1 +MESSAGE_NOT_EXECUTABLE = 2 + class Message: """Activity Message Class. @@ -128,7 +132,7 @@ class Message: self.method_id = method_id self.args = args self.kw = kw - self.is_executed = 0 + self.is_executed = MESSAGE_NOT_EXECUTED self.exc_type = None self.exc_value = None self.traceback = None @@ -206,30 +210,41 @@ class Message: def __call__(self, activity_tool): try: obj = self.getObject(activity_tool) - old_security_manager = getSecurityManager() - # Change user if required (TO BE DONE) - # We will change the user only in order to execute this method - user = self.changeUser(self.user_name, activity_tool) + except KeyError: + self.is_executed = MESSAGE_NOT_EXECUTABLE + else: try: - result = getattr(obj, self.method_id)(*self.args, **self.kw) - finally: - setSecurityManager(old_security_manager) - - self.activateResult(activity_tool, result, obj) - self.is_executed = 1 - except: - self.is_executed = 0 - exc_info = sys.exc_info() - self.exc_type = exc_info[0] - self.exc_value = str(exc_info[1]) - self.traceback = ''.join(ExceptionFormatter.format_exception( - *exc_info)) - LOG('ActivityTool', WARNING, - 'Could not call method %s on object %s' % ( - self.method_id, self.object_path), error=exc_info) - # push the error in ZODB error_log - if getattr(activity_tool, 'error_log', None) is not None: - activity_tool.error_log.raising(exc_info) + old_security_manager = getSecurityManager() + # Change user if required (TO BE DONE) + # We will change the user only in order to execute this method + user = self.changeUser(self.user_name, activity_tool) + # XXX: There is no check to see if user is allowed to access + # that method ! + method = getattr(obj, self.method_id, None) + try: + if method is None: + self.is_executed = MESSAGE_NOT_EXECUTABLE + else: + result = method(*self.args, **self.kw) + finally: + setSecurityManager(old_security_manager) + + if method is not None: + self.activateResult(activity_tool, result, obj) + self.is_executed = MESSAGE_EXECUTED + except: + self.is_executed = MESSAGE_NOT_EXECUTED + exc_info = sys.exc_info() + self.exc_type = exc_info[0] + self.exc_value = str(exc_info[1]) + self.traceback = ''.join(ExceptionFormatter.format_exception( + *exc_info)) + LOG('ActivityTool', WARNING, + 'Could not call method %s on object %s' % ( + self.method_id, self.object_path), error=exc_info) + # push the error in ZODB error_log + if getattr(activity_tool, 'error_log', None) is not None: + activity_tool.error_log.raising(exc_info) def validate(self, activity, activity_tool, check_order_validation=1): return activity.validate(activity_tool, self, @@ -846,13 +861,17 @@ class ActivityTool (Folder, UniqueObject): expanded_object_list = [] new_message_list = [] path_dict = {} - # Filter the list of messages. If an object is not available, ignore such a message. + # Filter the list of messages. If an object is not available, mark its message as non-executable. # In addition, expand an object if necessary, and make sure that no duplication happens. for m in message_list: # alternate method is used to segregate objects which cannot be grouped. alternate_method_id = m.activity_kw.get('alternate_method_id') try: obj = m.getObject(self) + except KeyError: + m.is_executed = MESSAGE_NOT_EXECUTABLE + continue + try: i = len(new_message_list) # This is an index of this message in new_message_list. if m.hasExpandMethod(): for subobj in m.getObjectList(self): @@ -890,7 +909,7 @@ class ActivityTool (Folder, UniqueObject): object_list.append(obj) new_message_list.append(m) except: - m.is_executed = 0 + m.is_executed = MESSAGE_NOT_EXECUTED exc_info = sys.exc_info() m.exc_type = exc_info[0] m.exc_value = str(exc_info[1]) @@ -917,7 +936,7 @@ class ActivityTool (Folder, UniqueObject): traceback = ''.join(ExceptionFormatter.format_exception( *exc_info)) for m in new_message_list: - m.is_executed = 0 + m.is_executed = MESSAGE_NOT_EXECUTED m.exc_type = exc_type m.exc_value = exc_value m.traceback = traceback @@ -937,16 +956,16 @@ class ActivityTool (Folder, UniqueObject): object = object_list[i] m = new_message_list[i] if i in failed_message_dict: - m.is_executed = 0 + m.is_executed = MESSAGE_NOT_EXECUTED LOG('ActivityTool', WARNING, 'the method %s partially failed on object %s' % (m.method_id, m.object_path,)) else: try: m.activateResult(self, result, object) - m.is_executed = 1 + m.is_executed = MESSAGE_EXECUTED except: - m.is_executed = 0 + m.is_executed = MESSAGE_NOT_EXECUTED m.exc_type = sys.exc_info()[0] LOG('ActivityTool', WARNING, 'Could not call method %s on object %s' % ( diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index c695cf0c78..69e2cc8f80 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -2826,6 +2826,93 @@ class TestCMFActivity(ERP5TypeTestCase): finally: delattr(Organisation, 'checkAbsoluteUrl') + def CheckMissingActivityContextObject(self, activity): + """ + Check that a message whose context has ben deleted goes to -3 + processing_node. + This must happen on first message execution, without any delay. + """ + readMessageList = getattr(self.getPortalObject(), '%s_readMessageList' % (activity, )) + activity_tool = self.getActivityTool() + container = self.getPortal().organisation_module + organisation = container.newContent(portal_type='Organisation') + get_transaction().commit() + self.tic() + organisation.activate(activity=activity).getTitle() + get_transaction().commit() + self.assertEqual(len(activity_tool.getMessageList()), 1) + # Here, we delete the subobject using most low-level method, to avoid + # pending activity to be removed. + organisation_id = organisation.id + container._delOb(organisation_id) + del organisation # Avoid keeping a reference to a deleted object. + get_transaction().commit() + self.assertEqual(getattr(container, organisation_id, None), None) + self.assertEqual(len(activity_tool.getMessageList()), 1) + activity_tool.distribute() + self.assertEquals(len(readMessageList(processing_node=-3, + include_processing=1)), 0) + activity_tool.tic() + self.assertEquals(len(readMessageList(processing_node=-3, + include_processing=1)), 1) + + def test_109_checkMissingActivityContextObjectSQLDict(self, quiet=0, + run=run_all_test): + if not run: return + if not quiet: + message = '\nCheck missing activity context object (SQLDict)' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + self.CheckMissingActivityContextObject('SQLDict') + + def test_110_checkMissingActivityContextObjectSQLQueue(self, quiet=0, + run=run_all_test): + if not run: return + if not quiet: + message = '\nCheck missing activity context object (SQLQueue)' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + self.CheckMissingActivityContextObject('SQLQueue') + + def test_111_checkMissingActivityContextObjectSQLDict(self, quiet=0, + run=run_all_test): + """ + This is similar to tst 108, but here the object will be missing for an + activity with a group_method_id. + """ + if not run: return + if not quiet: + message = '\nCheck missing activity context object with ' \ + 'group_method_id (SQLDict)' + ZopeTestCase._print(message) + LOG('Testing... ',0,message) + readMessageList = self.getPortalObject().SQLDict_readMessageList + activity_tool = self.getActivityTool() + container = self.getPortalObject().organisation_module + organisation = container.newContent(portal_type='Organisation') + organisation_2 = container.newContent(portal_type='Organisation') + get_transaction().commit() + self.tic() + organisation.reindexObject() + organisation_2.reindexObject() + get_transaction().commit() + self.assertEqual(len(activity_tool.getMessageList()), 2) + # Here, we delete the subobject using most low-level method, to avoid + # pending activity to be removed. + organisation_id = organisation.id + container._delOb(organisation_id) + del organisation # Avoid keeping a reference to a deleted object. + get_transaction().commit() + self.assertEqual(getattr(container, organisation_id, None), None) + self.assertEqual(len(activity_tool.getMessageList()), 2) + activity_tool.distribute() + self.assertEquals(len(readMessageList(processing_node=-3, + include_processing=1)), 0) + activity_tool.tic() + self.assertEquals(len(readMessageList(processing_node=-3, + include_processing=1)), 1) + # The message excuted on "organisation_2" must have succeeded. + self.assertEqual(len(activity_tool.getMessageList()), 1) def test_suite(): suite = unittest.TestSuite() -- 2.30.9