Commit 001122d3 authored by Vincent Pelletier's avatar Vincent Pelletier

Add a method to count pending activities with a given tag. Unit test inclued.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@9392 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 7833d7e5
...@@ -560,13 +560,19 @@ class SQLDict(RAMDict): ...@@ -560,13 +560,19 @@ class SQLDict(RAMDict):
def _validate_after_tag(self, activity_tool, message, value): def _validate_after_tag(self, activity_tool, message, value):
# Count number of occurances of tag # Count number of occurances of tag
if type(value) is StringType: if self.countMessageWithTag(activity_tool, value) > 0:
value = [value]
result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, tag=value)
if result[0].uid_count > 0:
return INVALID_ORDER return INVALID_ORDER
return VALID return VALID
def countMessageWithTag(self, activity_tool, value):
"""
Return the number of message which match the given tag.
"""
if isinstance(value, StringType):
value = [value]
result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, tag=value)
return result[0].uid_count
def _validate_after_tag_and_method_id(self, activity_tool, message, value): def _validate_after_tag_and_method_id(self, activity_tool, message, value):
# Count number of occurances of tag and method_id # Count number of occurances of tag and method_id
if (type(value) != TupleType and type(value) != ListType) or len(value)<2: if (type(value) != TupleType and type(value) != ListType) or len(value)<2:
......
...@@ -867,6 +867,19 @@ class ActivityTool (Folder, UniqueObject): ...@@ -867,6 +867,19 @@ class ActivityTool (Folder, UniqueObject):
LOG('getMessageList, could not get message from Activity:',0,activity) LOG('getMessageList, could not get message from Activity:',0,activity)
return message_list return message_list
security.declarePublic('countMessageWithTag')
def countMessageWithTag(self, value):
"""
Return the number of messages which match the given tag.
"""
message_count = 0
for activity in activity_list:
try:
message_count += activity.countMessageWithTag(self, value)
except AttributeError:
LOG('getMessageList, could not get message from Activity:', 0, activity)
return message_count
security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' ) security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
def newActiveProcess(self, **kw): def newActiveProcess(self, **kw):
from ActiveProcess import addActiveProcess from ActiveProcess import addActiveProcess
......
...@@ -626,6 +626,29 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -626,6 +626,29 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(o.getTitle(), 'foo') self.assertEquals(o.getTitle(), 'foo')
def CheckCountMessageWithTag(self, activity):
"""
Check countMessageWithTag function.
"""
portal = self.getPortal()
portal_activities = portal.portal_activities
organisation_module = self.getOrganisationModule()
if not organisation_module.hasContent(self.company_id):
organisation_module.newContent(id=self.company_id)
o = portal.organisation._getOb(self.company_id)
o.setTitle('?')
get_transaction().commit()
self.tic()
o.activate(tag = 'toto', activity = activity).setTitle('a')
get_transaction().commit()
self.assertEquals(o.getTitle(), '?')
self.assertEquals(portal_activities.countMessageWithTag('toto'), 1)
self.tic()
self.assertEquals(o.getTitle(), 'a')
self.assertEquals(portal_activities.countMessageWithTag('toto'), 0)
def test_01_DeferedSetTitleSQLDict(self, quiet=0, run=run_all_test): def test_01_DeferedSetTitleSQLDict(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order # Test if we can add a complete sales order
if not run: return if not run: return
...@@ -1305,6 +1328,28 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -1305,6 +1328,28 @@ class TestCMFActivity(ERP5TypeTestCase):
# restore notification # restore notification
Message.notifyUser = originalNotifyUser Message.notifyUser = originalNotifyUser
def test_66_TestCountMessageWithTagWithSQLDict(self, quiet=0, run=run_all_test):
"""
Test new countMessageWithTag function with SQLDict.
"""
if not run: return
if not quiet:
message = '\nCheck countMessageWithTag'
ZopeTestCase._print(message)
LOG('Testing... ', 0, message)
self.CheckCountMessageWithTag('SQLDict')
def test_67_TestCountMessageWithTagWithSQLQueue(self, quiet=0, run=run_all_test):
"""
Test new countMessageWithTag function with SQLQueue.
"""
if not run: return
if not quiet:
message = '\nCheck countMessageWithTag'
ZopeTestCase._print(message)
LOG('Testing... ', 0, message)
self.CheckCountMessageWithTag('SQLQueue')
if __name__ == '__main__': if __name__ == '__main__':
framework() framework()
else: else:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment