Commit e7b9a2ed authored by Sebastien Robin's avatar Sebastien Robin

added new unit tests in order to test Active Process

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@1192 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent aa86e15f
No related merge requests found
......@@ -57,14 +57,7 @@ class TestCMFActivity(ERP5TypeTestCase):
company_id = 'Nexedi'
title1 = 'title1'
title2 = 'title2'
destination_company_stock = 'site/Stock_MP/Gravelines'
destination_company_group = 'group/Coramy'
destination_company_id = 'Coramy'
component_id = 'brick'
sales_order_id = '1'
purchase_order_id = '1'
quantity = 10
base_price = 0.7832
company_id2 = 'Coramy'
def getBusinessTemplateList(self):
"""
......@@ -114,6 +107,9 @@ class TestCMFActivity(ERP5TypeTestCase):
newSecurityManager(None, user)
def InvokeAndCancelActivity(self, activity):
"""
Simple test where we invoke and cancel an activity
"""
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
......@@ -142,6 +138,10 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(len(message_list),0)
def DeferedSetTitleActivity(self, activity):
"""
We check that the title is changed only after that
the activity was called
"""
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
......@@ -157,6 +157,10 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(len(message_list),0)
def CallOnceWithActivity(self, activity):
"""
With this test we can check if methods are called
only once (sometimes it was twice !!!)
"""
portal = self.getPortal()
def setFoobar(self):
if hasattr(self,'foobar'):
......@@ -195,6 +199,9 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(2,organisation.getFoobar())
def TryFlushActivity(self, activity):
"""
Check the method flush
"""
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
......@@ -206,6 +213,9 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(organisation.getTitle(),self.title2)
def TryActivateInsideFlush(self, activity):
"""
Create a new activity inside a flush action
"""
portal = self.getPortal()
def DeferredSetTitle(self,value):
self.activate(activity=activity).setTitle(value)
......@@ -224,6 +234,9 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(organisation.getTitle(),self.title2)
def TryTwoMethods(self, activity):
"""
Try several activities
"""
portal = self.getPortal()
def DeferredSetDescription(self,value):
self.setDescription(value)
......@@ -247,6 +260,9 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(organisation.getDescription(),self.title1)
def TryTwoMethodsAndFlushThem(self, activity):
"""
make sure flush works with several activities
"""
portal = self.getPortal()
def DeferredSetTitle(self,value):
self.activate(activity=activity).setTitle(value)
......@@ -271,6 +287,9 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(organisation.getDescription(),self.title1)
def TryActivateFlushActivateTic(self, activity,second=None,commit_sub=0):
"""
try to commit sub transactions
"""
portal = self.getPortal()
def DeferredSetTitle(self,value,commit_sub=0):
if commit_sub:
......@@ -302,6 +321,9 @@ class TestCMFActivity(ERP5TypeTestCase):
self.assertEquals(organisation.getDescription(),self.title1)
def TryMessageWithErrorOnActivity(self, activity):
"""
Make sure that message with errors are not deleted
"""
portal = self.getPortal()
def crashThisActivity(self):
self.IWillCrach()
......@@ -324,6 +346,81 @@ class TestCMFActivity(ERP5TypeTestCase):
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
def DeferedSetTitleWithRenamedObject(self, activity):
"""
make sure that an activity is flushed before we rename
the object
"""
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
self.assertEquals(self.title1,organisation.getTitle())
organisation.activate(activity=activity).setTitle(self.title2)
# Needed so that the message are commited into the queue
get_transaction().commit()
self.assertEquals(self.title1,organisation.getTitle())
organisation.edit(id=self.company_id2)
get_transaction().commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(self.title2,organisation.getTitle())
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
organisation.edit(id=self.company_id)
get_transaction().commit()
def TryActiveProcess(self, activity):
"""
Try to store the result inside an active process
"""
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
active_process = portal.portal_activities.newActiveProcess()
self.assertEquals(self.title1,organisation.getTitle())
organisation.activate(activity=activity,active_process=active_process).getTitle()
# Needed so that the message are commited into the queue
get_transaction().commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
self.assertEquals(self.title1,organisation.getTitle())
result = active_process.getResultList()[0]
self.assertEquals(result.method_id , 'getTitle')
self.assertEquals(result.result , self.title1)
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
def TryActiveProcessInsideActivity(self, activity):
"""
Try two levels avec active_process, we create one first
activity with an acitive process, then this new activity
uses another active process
"""
portal = self.getPortal()
organisation = portal.organisation._getOb(self.company_id)
organisation.setTitle(self.title1)
def Organisation_test(self):
active_process = self.portal_activities.newActiveProcess()
self.activate(active_process=active_process).getTitle()
return active_process
from Products.ERP5Type.Document.Organisation import Organisation
Organisation.Organisation_test = Organisation_test
active_process = portal.portal_activities.newActiveProcess()
organisation.activate(activity=activity,active_process=active_process).Organisation_test()
# Needed so that the message are commited into the queue
get_transaction().commit()
portal.portal_activities.distribute()
portal.portal_activities.tic()
portal.portal_activities.distribute()
portal.portal_activities.tic()
sub_active_process = active_process.getResultList()[0].result
LOG('TryActiveProcessInsideActivity, sub_active_process',0,sub_active_process)
#result = active_process.getResultList()[0]
self.assertEquals(sub_active_process.method_id , 'getTitle')
self.assertEquals(sub_active_process.result , self.title1)
message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0)
def test_01_DeferedSetTitleSQLDict(self, quiet=0, run=run_all_test):
# Test if we can add a complete sales order
if not run: return
......@@ -694,12 +791,86 @@ class TestCMFActivity(ERP5TypeTestCase):
LOG('Testing... ',0,message)
self.TryActivateFlushActivateTic('RAMQueue',commit_sub=1)
def test_42_TryRenameObjectWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Rename Object With SQL Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.DeferedSetTitleWithRenamedObject('SQLDict')
def test_43_TryRenameObjectWithSQLQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Rename Object With SQL Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.DeferedSetTitleWithRenamedObject('SQLQueue')
def test_44_TryRenameObjectWithRAMDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Rename Object With RAM Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.DeferedSetTitleWithRenamedObject('RAMDict')
def test_45_TryRenameObjectWithRAMQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Rename Object With RAM Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.DeferedSetTitleWithRenamedObject('RAMQueue')
def test_46_TryActiveProcessWithSQLDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Active Process With SQL Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActiveProcess('SQLDict')
def test_47_TryActiveProcessWithSQLQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Active Process With SQL Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActiveProcess('SQLQueue')
def test_48_TryActiveProcessWithRAMDict(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Active Process With RAM Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActiveProcess('RAMDict')
def test_49_TryActiveProcessWithRAMQueue(self, quiet=0, run=run_all_test):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Active Process With RAM Queue '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActiveProcess('RAMQueue')
def test_50_TryActiveProcessInsideActivityWithSQLDict(self, quiet=0, run=0):
# Test if we call methods only once
if not run: return
if not quiet:
message = '\nTry Active Process With SQL Dict '
ZopeTestCase._print(message)
LOG('Testing... ',0,message)
self.TryActiveProcessInsideActivity('SQLDict')
if __name__ == '__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment