Commit 4b35b558 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: skip distribution for message with no dependency

parent a6583a23
......@@ -62,14 +62,21 @@ class SQLDict(SQLBase):
active_process_uid_list = [m.active_process_uid for m in message_list]
method_id_list = [m.method_id for m in message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in message_list]
dumped_message_list = map(self.dumpMessage, message_list)
date_list = [m.activity_kw.get('at_date') for m in message_list]
group_method_id_list = [m.getGroupId() for m in message_list]
tag_list = [m.activity_kw.get('tag', '') for m in message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
for m in message_list]
order_validation_text_list = map(self.getOrderValidationText,
message_list)
order_validation_text_list = []
processing_node_list = []
for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m)
# BBB: 'order_validation_text' SQL column is now useless.
# If we remove it, 'message' & 'message_queue' can have the same
# schema, and much code can be merged into SQLBase.
order_validation_text_list.append(x)
processing_node_list.append(0 if x == 'none' else -1)
dumped_message_list = map(self.dumpMessage, message_list)
# The uid_list also is store in the ZODB
uid_list = activity_tool.getPortalObject().portal_ids.generateNewIdList(
id_generator='uid', id_group='portal_activity',
......@@ -85,7 +92,7 @@ class SQLDict(SQLBase):
group_method_id_list=group_method_id_list,
tag_list=tag_list,
serialization_tag_list=serialization_tag_list,
processing_node_list=None,
processing_node_list=processing_node_list,
order_validation_text_list=order_validation_text_list)
def generateMessageUID(self, m):
......@@ -225,10 +232,10 @@ class SQLDict(SQLBase):
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
if len(result):
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
order_validation_text=None)
if len(uid_list)>0:
if result:
uid_list = activity_tool.SQLDict_readUidList(
path=path, method_id=method_id)
if uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table,
uid=[x.uid for x in uid_list])
......@@ -260,8 +267,9 @@ class SQLDict(SQLBase):
validation_text_dict = {'none': 1}
message_dict = {}
for line in result:
message = self.loadMessage(line.message, uid=line.uid, line=line,
order_validation_text=line.order_validation_text)
message = self.loadMessage(line.message, uid=line.uid, line=line)
if not hasattr(message, 'order_validation_text'): # BBB
m.order_validation_text = line.order_validation_text
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date)
......@@ -341,8 +349,9 @@ class SQLDict(SQLBase):
line=line,
uid=line.uid,
date=line.date,
processing_node=line.processing_node,
order_validation_text=line.order_validation_text)
processing_node=line.processing_node)
if not hasattr(m, 'order_validation_text'): # BBB
m.order_validation_text = line.order_validation_text
message_list.append(m)
return message_list
else:
......
......@@ -70,6 +70,10 @@ class SQLQueue(SQLBase):
tag_list = [m.activity_kw.get('tag', '') for m in message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
for m in message_list]
processing_node_list = []
for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1)
dumped_message_list = map(self.dumpMessage, message_list)
activity_tool.SQLQueue_writeMessageList(
uid_list=uid_list,
......@@ -81,7 +85,7 @@ class SQLQueue(SQLBase):
group_method_id_list=group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=None,
processing_node_list=processing_node_list,
serialization_tag_list=serialization_tag_list)
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
......@@ -206,6 +210,7 @@ class SQLQueue(SQLBase):
message_dict = {}
for line in result:
message = self.loadMessage(line.message, uid=line.uid, line=line)
if not hasattr(message, 'order_validation_text'): # BBB
message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict, now_date=now_date)
......@@ -263,6 +268,7 @@ class SQLQueue(SQLBase):
uid=line.uid,
date=line.date,
processing_node=line.processing_node)
if not hasattr(m, 'order_validation_text'): # BBB
m.order_validation_text = self.getOrderValidationText(m)
message_list.append(m)
return message_list
......
......@@ -10,7 +10,6 @@ class_file:
<params>
method_id
path
order_validation_text
</params>
SELECT uid FROM
message
......@@ -18,4 +17,3 @@ WHERE
processing = 0
<dtml-if expr="method_id is not None"> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
<dtml-if expr="path is not None"> AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if expr="order_validation_text is not None"> AND order_validation_text = <dtml-sqlvar order_validation_text type="string"> </dtml-if>
......@@ -30,7 +30,7 @@ VALUES
<dtml-sqlvar expr="active_process_uid_list[loop_item]" type="int" optional>,
<dtml-if expr="date_list is not None"><dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime"><dtml-else>UTC_TIMESTAMP()</dtml-if><dtml-else>UTC_TIMESTAMP()</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-if expr="processing_node_list is not None"><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
<dtml-sqlvar expr="processing_node_list[loop_item]" type="int">,
0,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
......
......@@ -30,7 +30,7 @@ VALUES
<dtml-sqlvar expr="active_process_uid_list[loop_item]" type="int" optional>,
<dtml-if expr="date_list is not None"><dtml-if expr="date_list[loop_item] is not None"><dtml-sqlvar expr="date_list[loop_item]" type="datetime"><dtml-else>UTC_TIMESTAMP()</dtml-if><dtml-else>UTC_TIMESTAMP()</dtml-if>,
<dtml-sqlvar expr="method_id_list[loop_item]" type="string">,
<dtml-if expr="processing_node_list is not None"><dtml-sqlvar expr="processing_node_list[loop_item]" type="int"><dtml-else>-1</dtml-if>,
<dtml-sqlvar expr="processing_node_list[loop_item]" type="int">,
0,
<dtml-sqlvar expr="priority_list[loop_item]" type="int">,
<dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">,
......
......@@ -181,7 +181,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Needed so that the message are commited into the queue
self.commit()
self.assertEquals(self.title1,organisation.getTitle())
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(self.title2,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
......@@ -211,7 +210,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),1)
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(1,organisation.getFoobar())
message_list = portal.portal_activities.getMessageList()
......@@ -264,7 +262,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
organisation.activate(activity=activity).DeferredSetTitle(self.title2)
organisation.flushActivity(invoke=1)
self.commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.commit()
message_list = portal.portal_activities.getMessageList()
......@@ -347,9 +344,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.commit()
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
self.assertEquals(organisation.getTitle(),self.title1)
......@@ -361,7 +355,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
"""
portal = self.getPortal()
def crashThisActivity(self):
self.IWillCrach()
self.IWillCrash()
organisation = portal.organisation._getOb(self.company_id)
Organisation.crashThisActivity = crashThisActivity
organisation.activate(activity=activity).crashThisActivity()
......@@ -370,7 +364,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list = portal.portal_activities.getMessageList()
LOG('Before MessageWithErrorOnActivityFails, message_list',0,[x.__dict__ for x in message_list])
self.assertEquals(len(message_list),1)
portal.portal_activities.distribute()
portal.portal_activities.tic()
# XXX HERE WE SHOULD USE TIME SHIFT IN ORDER TO SIMULATE MULTIPLE TICS
# Test if there is still the message after it crashed
......@@ -396,7 +389,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.commit()
self.assertEquals(self.title1,organisation.getTitle())
self.assertRaises(ActivityPendingError,organisation.edit,id=self.company_id2)
portal.portal_activities.distribute()
portal.portal_activities.tic()
def TryActiveProcess(self, activity):
......@@ -413,8 +405,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(self.title1,organisation.getTitle())
result = active_process.getResultList()[0]
self.assertEquals(result.method_id , 'getTitle')
......@@ -1602,7 +1592,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),5)
portal.portal_activities.distribute()
portal.portal_activities.tic()
expected = dict(SQLDict=1, SQLQueue=5)[activity]
self.assertEquals(expected, organisation.getFoobar())
......@@ -1615,7 +1604,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),5)
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(expected * 2, organisation.getFoobar())
......@@ -1635,7 +1623,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),20)
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(dict(SQLDict=11, SQLQueue=60)[activity],
organisation.getFoobar())
......@@ -2352,29 +2339,27 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message = '\nCheck similarities are not deleted before execution of original message (SQLDict)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation')
self.tic()
activity_tool = self.getActivityTool()
check_result_dict = {}
def checkActivityCount(self, other_tag):
if len(check_result_dict) == 0:
check_result_dict['done'] = activity_tool.countMessage(tag=other_tag)
marker = []
def doSomething(self, other_tag):
marker.append(self.countMessage(tag=other_tag))
activity_tool.__class__.doSomething = doSomething
try:
Organisation.checkActivityCount = checkActivityCount
# Adds two similar but not the same activities.
organisation.activate(activity='SQLDict', tag='a').checkActivityCount(other_tag='b')
organisation.activate(activity='SQLDict', tag='b').checkActivityCount(other_tag='a')
activity_tool.activate(activity='SQLDict', after_tag='foo',
tag='a').doSomething(other_tag='b')
activity_tool.activate(activity='SQLDict', after_tag='bar',
tag='b').doSomething(other_tag='a')
self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.tic() # make sure distribution phase was not skipped
activity_tool.distribute()
# after distribute, similarities are still there.
self.assertEqual(len(activity_tool.getMessageList()), 2)
self.tic()
activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 0)
self.assertEqual(len(check_result_dict), 1)
self.assertEqual(check_result_dict['done'], 1)
self.assertEqual(marker, [1])
finally:
delattr(Organisation, 'checkActivityCount')
del activity_tool.__class__.doSomething
def test_102_2_CheckSQLDictDeleteDuplicatesBeforeExecution(self, quiet=0, run=run_all_test):
"""
......@@ -2386,32 +2371,30 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message = '\nCheck duplicates are deleted before execution of original message (SQLDict)'
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
organisation = self.getPortal().organisation_module.newContent(portal_type='Organisation')
self.tic()
activity_tool = self.getActivityTool()
check_result_dict = {}
def checkActivityCount(self, other_tag):
if len(check_result_dict) == 0:
check_result_dict['done'] = activity_tool.countMessage(tag=other_tag)
marker = []
def doSomething(self, other_tag):
marker.append(self.countMessage(tag=other_tag))
activity_tool.__class__.doSomething = doSomething
try:
Organisation.checkActivityCount = checkActivityCount
# Adds two same activities.
organisation.activate(activity='SQLDict', tag='a', priority=2).checkActivityCount(other_tag='a')
activity_tool.activate(activity='SQLDict', after_tag='foo', priority=2,
tag='a').doSomething(other_tag='a')
self.commit()
uid1, = [x.uid for x in activity_tool.getMessageList()]
organisation.activate(activity='SQLDict', tag='a', priority=1).checkActivityCount(other_tag='a')
activity_tool.activate(activity='SQLDict', after_tag='bar', priority=1,
tag='a').doSomething(other_tag='a')
self.commit()
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute()
# After distribute, duplicate is deleted.
uid2, = [x.uid for x in activity_tool.getMessageList()]
self.assertNotEqual(uid1, uid2)
self.tic()
activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 0)
self.assertEqual(len(check_result_dict), 1)
self.assertEqual(check_result_dict['done'], 1)
self.assertEqual(marker, [1])
finally:
delattr(Organisation, 'checkActivityCount')
del activity_tool.__class__.doSomething
def test_102_3_CheckSQLDictDistributeWithSerializationTagAndGroupMethodId(
self, quiet=0):
......@@ -3187,7 +3170,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
for activity in 'SQLDict', 'SQLQueue':
activity_tool.activate(activity=activity).doSomething(activity)
self.commit()
activity_tool.distribute()
# Make first commit in dequeueMessage raise
registerFailingTransactionManager()
self.assertRaises(CommitFailed, activity_tool.tic)
......@@ -3438,7 +3420,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.__class__.doSomething = doSomething
activity_tool.activate(activity='SQLQueue').doSomething()
self.commit()
activity_tool.distribute()
activity_tool.tic()
message_list = activity_tool.getMessageList()
self.assertEquals(['doSomething'],[x.method_id for x in message_list])
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment