Commit 47a925b9 authored by Yusei Tahara's avatar Yusei Tahara

Fix a bug. serialization tag guarantees that tagged messages are processed...

Fix a bug. serialization tag guarantees that tagged messages are processed serially and not parallelly.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@28706 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 66f65bd7
...@@ -612,6 +612,7 @@ class SQLDict(RAMDict, SQLBase): ...@@ -612,6 +612,7 @@ class SQLDict(RAMDict, SQLBase):
now_date = self.getNow(activity_tool) now_date = self.getNow(activity_tool)
result = readMessageList(path=None, method_id=None, processing_node=-1, result = readMessageList(path=None, method_id=None, processing_node=-1,
to_date=now_date, include_processing=0, offset=offset, count=READ_MESSAGE_LIMIT) to_date=now_date, include_processing=0, offset=offset, count=READ_MESSAGE_LIMIT)
validated_count = 0 validated_count = 0
while len(result) and validated_count < MAX_VALIDATED_LIMIT: while len(result) and validated_count < MAX_VALIDATED_LIMIT:
get_transaction().commit() get_transaction().commit()
...@@ -623,6 +624,51 @@ class SQLDict(RAMDict, SQLBase): ...@@ -623,6 +624,51 @@ class SQLDict(RAMDict, SQLBase):
order_validation_text = line.order_validation_text) order_validation_text = line.order_validation_text)
self.getExecutableMessageList(activity_tool, message, message_dict, self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date) validation_text_dict, now_date=now_date)
if message_dict:
message_unique_set = set()
deletable_uid_list = []
# remove duplicates
# SQLDict considers object_path, method_id, tag to unify activities,
# but ignores method arguments. They are outside of semantics.
for key in message_dict.keys():
message = message_dict[key]
unique_key = (tuple(message.object_path), message.method_id,
message.activity_kw.get('tag'),
message.activity_kw.get('group_id'),
)
if unique_key in message_unique_set:
deletable_uid_list.append(message.uid)
del message_dict[message.uid]
else:
message_unique_set.add(unique_key)
# don't let through if there is the same serialization tag in the
# message dict. if there is the same serialization tag, only one can
# be validated and others must wait.
# but messages with group_method_id are exceptions. serialization_tag
# does not stop simultaneous validation. because such messages will
# be processed together at once.
serialization_tag_set = set()
serialization_tag_group_method_id_dict = {}
for key in message_dict.keys():
message = message_dict[key]
# serialize messages with serialization_tag.
serialization_tag = message.activity_kw.get('serialization_tag')
group_method_id = message.activity_kw.get('group_method_id')
if serialization_tag is not None:
if serialization_tag in serialization_tag_set:
if group_method_id is not None:
if serialization_tag_group_method_id_dict[serialization_tag]!=group_method_id:
del message_dict[message.uid]
else:
del message_dict[message.uid]
else:
serialization_tag_set.add(serialization_tag)
if group_method_id is not None:
serialization_tag_group_method_id_dict[serialization_tag] = group_method_id
if deletable_uid_list:
activity_tool.SQLDict_delMessage(uid=deletable_uid_list)
distributable_count = len(message_dict) distributable_count = len(message_dict)
if distributable_count: if distributable_count:
activity_tool.SQLDict_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()]) activity_tool.SQLDict_assignMessage(processing_node=0, uid=[message.uid for message in message_dict.itervalues()])
......
...@@ -2819,6 +2819,12 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -2819,6 +2819,12 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEqual(len(result), 2) self.assertEqual(len(result), 2)
activity_tool.distribute() activity_tool.distribute()
result = activity_tool.getMessageList() result = activity_tool.getMessageList()
# If activity is SQLDict, serialization tag prevents validating the same
# serialization tagged messages simultaneously.
# If activity is SQLQueue, this does not happen.
if activity=='SQLDict':
self.assertEqual(len([x for x in result if x.processing_node == 0]), 1)
else:
self.assertEqual(len([x for x in result if x.processing_node == 0]), 2) self.assertEqual(len([x for x in result if x.processing_node == 0]), 2)
self.tic() self.tic()
result = activity_tool.getMessageList() result = activity_tool.getMessageList()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment