diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 9a88305518a57d7d536ff7e93e71c9d8abe89493..ab4064e62656976f0e1be948b505e26cbd3a3e46 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -612,6 +612,7 @@ class SQLDict(RAMDict, SQLBase): now_date = self.getNow(activity_tool) result = readMessageList(path=None, method_id=None, processing_node=-1, to_date=now_date, include_processing=0, offset=offset, count=READ_MESSAGE_LIMIT) + validated_count = 0 while len(result) and validated_count < MAX_VALIDATED_LIMIT: get_transaction().commit() @@ -623,6 +624,51 @@ class SQLDict(RAMDict, SQLBase): order_validation_text = line.order_validation_text) self.getExecutableMessageList(activity_tool, message, message_dict, validation_text_dict, now_date=now_date) + + if message_dict: + message_unique_set = set() + deletable_uid_list = [] + # remove duplicates + # SQLDict considers object_path, method_id, tag to unify activities, + # but ignores method arguments. They are outside of semantics. + for key in message_dict.keys(): + message = message_dict[key] + unique_key = (tuple(message.object_path), message.method_id, + message.activity_kw.get('tag'), + message.activity_kw.get('group_id'), + ) + if unique_key in message_unique_set: + deletable_uid_list.append(message.uid) + del message_dict[message.uid] + else: + message_unique_set.add(unique_key) + # don't let through if there is the same serialization tag in the + # message dict. if there is the same serialization tag, only one can + # be validated and others must wait. + # but messages with group_method_id are exceptions. serialization_tag + # does not stop simultaneous validation. because such messages will + # be processed together at once. + serialization_tag_set = set() + serialization_tag_group_method_id_dict = {} + for key in message_dict.keys(): + message = message_dict[key] + # serialize messages with serialization_tag. + serialization_tag = message.activity_kw.get('serialization_tag') + group_method_id = message.activity_kw.get('group_method_id') + if serialization_tag is not None: + if serialization_tag in serialization_tag_set: + if group_method_id is not None: + if serialization_tag_group_method_id_dict[serialization_tag]!=group_method_id: + del message_dict[message.uid] + else: + del message_dict[message.uid] + else: + serialization_tag_set.add(serialization_tag) + if group_method_id is not None: + serialization_tag_group_method_id_dict[serialization_tag] = group_method_id + if deletable_uid_list: + activity_tool.SQLDict_delMessage(uid=deletable_uid_list) + distributable_count = len(message_dict) if distributable_count: activity_tool.SQLDict_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()]) diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index 277016ed499a99c4395fc24a224fb164c151e723..40ca8ac1c9abf7b9a3c6372e8fb768fb5033f5b7 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -2819,7 +2819,13 @@ class TestCMFActivity(ERP5TypeTestCase): self.assertEqual(len(result), 2) activity_tool.distribute() result = activity_tool.getMessageList() - self.assertEqual(len([x for x in result if x.processing_node == 0]), 2) + # If activity is SQLDict, serialization tag prevents validating the same + # serialization tagged messages simultaneously. + # If activity is SQLQueue, this does not happen. + if activity=='SQLDict': + self.assertEqual(len([x for x in result if x.processing_node == 0]), 1) + else: + self.assertEqual(len([x for x in result if x.processing_node == 0]), 2) self.tic() result = activity_tool.getMessageList() self.assertEqual(len(result), 0)