Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
erp5
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Tatuya Kamada
erp5
Commits
8455256c
Commit
8455256c
authored
Mar 01, 2018
by
Vincent Pelletier
Committed by
Hardik Juneja
Mar 06, 2018
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
testCMFActivity: Get rid quiet & run test method arguments
parent
bf597a3a
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
87 additions
and
498 deletions
+87
-498
product/CMFActivity/tests/testCMFActivity.py
product/CMFActivity/tests/testCMFActivity.py
+87
-498
No files found.
product/CMFActivity/tests/testCMFActivity.py
View file @
8455256c
...
...
@@ -69,7 +69,6 @@ def registerFailingTransactionManager(*args, **kw):
class
TestCMFActivity
(
ERP5TypeTestCase
,
LogInterceptor
):
run_all_test
=
1
# Different variables used for this test
company_id
=
'Nexedi'
title1
=
'title1'
...
...
@@ -122,7 +121,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
o1
=
organisation_module
.
newContent
(
id
=
self
.
company_id
)
self
.
tic
()
def
login
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
login
(
self
):
uf
=
self
.
getPortal
().
acl_users
uf
.
_doAddUser
(
'seb'
,
''
,
[
'Manager'
],
[])
uf
.
_doAddUser
(
'ERP5TypeTestCase'
,
''
,
[
'Manager'
],
[])
...
...
@@ -696,258 +695,118 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool
.
manageClearActivities
()
self
.
commit
()
def
test_01_DeferredSetTitleSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_01_DeferredSetTitleSQLDict
(
self
):
# Test if we can add a complete sales order
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test Deferred Set Title SQLDict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
DeferredSetTitleActivity
(
'SQLDict'
)
def
test_02_DeferredSetTitleSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_02_DeferredSetTitleSQLQueue
(
self
):
# Test if we can add a complete sales order
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test Deferred Set Title SQLQueue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
DeferredSetTitleActivity
(
'SQLQueue'
)
def
test_05_InvokeAndCancelSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_05_InvokeAndCancelSQLDict
(
self
):
# Test if we can add a complete sales order
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test Invoke And Cancel SQLDict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
InvokeAndCancelActivity
(
'SQLDict'
)
def
test_06_InvokeAndCancelSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_06_InvokeAndCancelSQLQueue
(
self
):
# Test if we can add a complete sales order
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test Invoke And Cancel SQLQueue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
InvokeAndCancelActivity
(
'SQLQueue'
)
def
test_09_CallOnceWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_09_CallOnceWithSQLDict
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Call Once With SQL Dict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
CallOnceWithActivity
(
'SQLDict'
)
def
test_10_CallOnceWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_10_CallOnceWithSQLQueue
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Call Once With SQL Queue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
CallOnceWithActivity
(
'SQLQueue'
)
def
test_13_TryMessageWithErrorOnSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_13_TryMessageWithErrorOnSQLDict
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Message With Error On SQL Dict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryMessageWithErrorOnActivity
(
'SQLDict'
)
def
test_14_TryMessageWithErrorOnSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_14_TryMessageWithErrorOnSQLQueue
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Message With Error On SQL Queue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryMessageWithErrorOnActivity
(
'SQLQueue'
)
def
test_17_TryFlushActivityWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_17_TryFlushActivityWithSQLDict
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Flush Activity With SQL Dict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryFlushActivity
(
'SQLDict'
)
def
test_18_TryFlushActivityWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_18_TryFlushActivityWithSQLQueue
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Flush Activity With SQL Queue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryFlushActivity
(
'SQLQueue'
)
def
test_21_TryActivateInsideFlushWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_21_TryActivateInsideFlushWithSQLDict
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Activate Inside Flush With SQL Dict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryActivateInsideFlush
(
'SQLDict'
)
def
test_22_TryActivateInsideFlushWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_22_TryActivateInsideFlushWithSQLQueue
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Activate Inside Flush With SQL Queue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryActivateInsideFlush
(
'SQLQueue'
)
def
test_25_TryTwoMethodsWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_25_TryTwoMethodsWithSQLDict
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Two Methods With SQL Dict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryTwoMethods
(
'SQLDict'
)
def
test_26_TryTwoMethodsWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_26_TryTwoMethodsWithSQLQueue
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Two Methods With SQL Queue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryTwoMethods
(
'SQLQueue'
)
def
test_29_TryTwoMethodsAndFlushThemWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_29_TryTwoMethodsAndFlushThemWithSQLDict
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Two Methods And Flush Them With SQL Dict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryTwoMethodsAndFlushThem
(
'SQLDict'
)
def
test_30_TryTwoMethodsAndFlushThemWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_30_TryTwoMethodsAndFlushThemWithSQLQueue
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Two Methods And Flush Them With SQL Queue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryTwoMethodsAndFlushThem
(
'SQLQueue'
)
def
test_33_TryActivateFlushActivateTicWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_33_TryActivateFlushActivateTicWithSQLDict
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Activate Flush Activate Tic With SQL Dict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryActivateFlushActivateTic
(
'SQLDict'
)
def
test_34_TryActivateFlushActivateTicWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_34_TryActivateFlushActivateTicWithSQLQueue
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Activate Flush Activate Tic With SQL Queue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryActivateFlushActivateTic
(
'SQLQueue'
)
def
test_37_TryActivateFlushActivateTicWithMultipleActivities
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_37_TryActivateFlushActivateTicWithMultipleActivities
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Activate Flush Activate Tic With MultipleActivities '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryActivateFlushActivateTic
(
'SQLQueue'
,
second
=
'SQLDict'
)
self
.
TryActivateFlushActivateTic
(
'SQLDict'
,
second
=
'SQLQueue'
)
def
test_38_TryCommitSubTransactionWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_38_TryCommitSubTransactionWithSQLDict
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Commit Sub Transaction With SQL Dict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryActivateFlushActivateTic
(
'SQLDict'
,
commit_sub
=
1
)
def
test_39_TryCommitSubTransactionWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_39_TryCommitSubTransactionWithSQLQueue
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Commit Sub Transaction With SQL Queue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryActivateFlushActivateTic
(
'SQLQueue'
,
commit_sub
=
1
)
def
test_42_TryRenameObjectWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_42_TryRenameObjectWithSQLDict
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Rename Object With SQL Dict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
DeferredSetTitleWithRenamedObject
(
'SQLDict'
)
def
test_43_TryRenameObjectWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_43_TryRenameObjectWithSQLQueue
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Rename Object With SQL Queue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
DeferredSetTitleWithRenamedObject
(
'SQLQueue'
)
def
test_46_TryActiveProcessWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_46_TryActiveProcessWithSQLDict
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Active Process With SQL Dict '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryActiveProcess
(
'SQLDict'
)
def
test_47_TryActiveProcessWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_47_TryActiveProcessWithSQLQueue
(
self
):
# Test if we call methods only once
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Active Process With SQL Queue '
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryActiveProcess
(
'SQLQueue'
)
def
test_54_TryAfterMethodIdWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_54_TryAfterMethodIdWithSQLDict
(
self
):
# Test if after_method_id can be used
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Active Method After Another Activate Method With SQLDict'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryMethodAfterMethod
(
'SQLDict'
)
def
test_55_TryAfterMethodIdWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_55_TryAfterMethodIdWithSQLQueue
(
self
):
# Test if after_method_id can be used
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Active Method After Another Activate Method With SQLQueue'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryMethodAfterMethod
(
'SQLQueue'
)
def
test_56_TryCallActivityWithRightUser
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_56_TryCallActivityWithRightUser
(
self
):
# Test if me execute methods with the right user
# This should be independant of the activity used
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Call Activity With Right User'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
# We are first logged as seb
portal
=
self
.
getPortal
()
organisation
=
portal
.
organisation
.
_getOb
(
self
.
company_id
)
...
...
@@ -968,58 +827,28 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Check if what we did was executed as toto
self
.
assertEqual
(
email
.
getOwnerInfo
()[
'id'
],
'toto'
)
def
test_59_TryAfterTagWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_59_TryAfterTagWithSQLDict
(
self
):
# Test if after_tag can be used
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try After Tag With SQL Dict'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryAfterTag
(
'SQLDict'
)
def
test_60_TryAfterTagWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_60_TryAfterTagWithSQLQueue
(
self
):
# Test if after_tag can be used
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try After Tag With SQL Queue'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryAfterTag
(
'SQLQueue'
)
def
test_61_CheckSchedulingWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_61_CheckSchedulingWithSQLDict
(
self
):
# Test if scheduling is correct with SQLDict
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check Scheduling With SQL Dict'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
CheckScheduling
(
'SQLDict'
)
def
test_62_CheckSchedulingWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_62_CheckSchedulingWithSQLQueue
(
self
):
# Test if scheduling is correct with SQLQueue
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check Scheduling With SQL Queue'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
CheckScheduling
(
'SQLQueue'
)
def
test_61_CheckSchedulingAfterTagListWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_61_CheckSchedulingAfterTagListWithSQLDict
(
self
):
# Test if scheduling is correct with SQLDict
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check Scheduling After Tag List With SQL Dict'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
CheckSchedulingAfterTagList
(
'SQLDict'
)
def
test_62_CheckSchedulingWithAfterTagListSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_62_CheckSchedulingWithAfterTagListSQLQueue
(
self
):
# Test if scheduling is correct with SQLQueue
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check Scheduling After Tag List With SQL Queue'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
CheckSchedulingAfterTagList
(
'SQLQueue'
)
def
flushAllActivities
(
self
,
silent
=
0
,
loop_size
=
1000
):
...
...
@@ -1043,8 +872,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
if
not
silent
:
self
.
fail
(
'flushAllActivities maximum loop count reached'
)
def
test_65_TestMessageValidationAndFailedActivities
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_65_TestMessageValidationAndFailedActivities
(
self
):
"""after_method_id and failed activities.
Tests that if we have an active method scheduled by
...
...
@@ -1055,11 +883,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
was eventually agreed that this was a bug. If an activity fails, all the
activities that depend on it should be block until the first one is
resolved."""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
after_method_id and failed activities'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
activity_tool
=
self
.
getPortal
().
portal_activities
original_title
=
'something'
obj
=
self
.
getPortal
().
organisation_module
.
newContent
(
...
...
@@ -1108,28 +931,18 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
[
'failingMethod'
])
self
.
assertEqual
(
obj
.
getTitle
(),
original_title
)
def
test_66_TestCountMessageWithTagWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_66_TestCountMessageWithTagWithSQLDict
(
self
):
"""
Test new countMessageWithTag function with SQLDict.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check countMessageWithTag'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
CheckCountMessageWithTag
(
'SQLDict'
)
def
test_67_TestCancelFailedActiveObject
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_67_TestCancelFailedActiveObject
(
self
):
"""Cancel an active object to make sure that it does not refer to
a persistent object.
XXX: this test fails if run first
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test if it is possible to safely cancel an active object'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
activity_tool
=
self
.
getPortal
().
portal_activities
activity_tool
.
manageClearActivities
()
...
...
@@ -1170,11 +983,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self
.
commit
()
self
.
assertEqual
(
len
(
activity_tool
.
getMessageList
()),
0
)
def
test_68_RetryMessageExecution
(
self
,
quiet
=
0
):
if
not
quiet
:
message
=
'
\
n
Check number of executions of failing activities'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_68_RetryMessageExecution
(
self
):
activity_tool
=
self
.
portal
.
portal_activities
self
.
assertFalse
(
activity_tool
.
getMessageList
())
exec_count
=
[
0
]
...
...
@@ -1231,73 +1040,37 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
del
activity_tool
.
__class__
.
doSomething
self
.
assertFalse
(
activity_tool
.
getMessageList
())
def
test_70_TestConflictErrorsWhileValidatingWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_70_TestConflictErrorsWhileValidatingWithSQLDict
(
self
):
"""
Test if conflict errors spoil out active objects with SQLDict.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test Conflict Errors While Validating With SQLDict'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryConflictErrorsWhileValidating
(
'SQLDict'
)
def
test_71_TestConflictErrorsWhileValidatingWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_71_TestConflictErrorsWhileValidatingWithSQLQueue
(
self
):
"""
Test if conflict errors spoil out active objects with SQLQueue.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test Conflict Errors While Validating With SQLQueue'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryConflictErrorsWhileValidating
(
'SQLQueue'
)
def
test_72_TestErrorsWhileFinishingCommitDBWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_72_TestErrorsWhileFinishingCommitDBWithSQLDict
(
self
):
"""
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test Errors While Finishing Commit DB With SQLDict'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryErrorsWhileFinishingCommitDB
(
'SQLDict'
)
def
test_73_TestErrorsWhileFinishingCommitDBWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_73_TestErrorsWhileFinishingCommitDBWithSQLQueue
(
self
):
"""
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test Errors While Finishing Commit DB With SQLQueue'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryErrorsWhileFinishingCommitDB
(
'SQLQueue'
)
def
test_74_TryFlushActivityWithAfterTagSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_74_TryFlushActivityWithAfterTagSQLDict
(
self
):
# Test if after_tag can be used
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Flus Activity With After Tag With SQL Dict'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryFlushActivityWithAfterTag
(
'SQLDict'
)
def
test_75_TryFlushActivityWithAfterTagWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_75_TryFlushActivityWithAfterTagWithSQLQueue
(
self
):
# Test if after_tag can be used
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Flush Activity With After Tag With SQL Queue'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryFlushActivityWithAfterTag
(
'SQLQueue'
)
def
test_76_ActivateKwForNewContent
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check reindex message uses activate_kw passed to newContent'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_76_ActivateKwForNewContent
(
self
):
o1
=
self
.
getOrganisationModule
().
newContent
(
activate_kw
=
dict
(
tag
=
'The Tag'
))
self
.
commit
()
...
...
@@ -1308,12 +1081,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self
.
assertEqual
(
m
.
activity_kw
.
get
(
'tag'
),
'The Tag'
)
def
test_77_FlushAfterMultipleActivate
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check all message are flushed in SQLDict'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_77_FlushAfterMultipleActivate
(
self
):
orga_module
=
self
.
getOrganisationModule
()
p
=
orga_module
.
newContent
(
portal_type
=
'Organisation'
)
self
.
tic
()
...
...
@@ -1345,28 +1113,17 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self
.
commit
()
self
.
assertEqual
(
len
(
activity_tool
.
getMessageList
()),
0
)
def
test_78_IsMessageRegisteredSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_78_IsMessageRegisteredSQLDict
(
self
):
"""
This test tests behaviour of IsMessageRegistered method.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test IsMessageRegistered behaviour with SQLDict'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
checkIsMessageRegisteredMethod
(
'SQLDict'
)
def
test_79_AbortTransactionSynchronously
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_79_AbortTransactionSynchronously
(
self
):
"""
This test checks if transaction.abort() synchronizes connections. It
didn't do so back in Zope 2.7
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test Aborting Transaction Synchronizes'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
# Make a new persistent object, and commit it so that an oid gets
# assigned.
module
=
self
.
getOrganisationModule
()
...
...
@@ -1400,13 +1157,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
getattr
(
organisation
,
'uid'
)
def
callWithGroupIdParamater
(
self
,
activity
,
quiet
,
run
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Test Activity with group_id parameter (%s)'
%
activity
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
callWithGroupIdParamater
(
self
,
activity
):
portal
=
self
.
getPortal
()
organisation
=
portal
.
organisation
.
_getOb
(
self
.
company_id
)
# Defined a group method
...
...
@@ -1473,28 +1224,22 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list
=
portal
.
portal_activities
.
getMessageList
()
self
.
assertEqual
(
len
(
message_list
),
0
)
def
test_80a_CallWithGroupIdParamaterSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_80a_CallWithGroupIdParamaterSQLDict
(
self
):
"""
Test that group_id parameter is used to separate execution of the same method
"""
self
.
callWithGroupIdParamater
(
'SQLDict'
,
quiet
=
quiet
,
run
=
run
)
self
.
callWithGroupIdParamater
(
'SQLDict'
)
def
test_80b_CallWithGroupIdParamaterSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_80b_CallWithGroupIdParamaterSQLQueue
(
self
):
"""
Test that group_id parameter is used to separate execution of the same method
"""
self
.
callWithGroupIdParamater
(
'SQLQueue'
,
quiet
=
quiet
,
run
=
run
)
self
.
callWithGroupIdParamater
(
'SQLQueue'
)
def
test_81_ActivateKwForWorkflowTransition
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_81_ActivateKwForWorkflowTransition
(
self
):
"""
Test call of a workflow transition with activate_kw parameter propagate them
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check reindex message uses activate_kw passed to workflow transition'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
o1
=
self
.
getOrganisationModule
().
newContent
()
self
.
tic
()
o1
.
validate
(
activate_kw
=
dict
(
tag
=
'The Tag'
))
...
...
@@ -1505,15 +1250,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
for
m
in
messages_for_o1
:
self
.
assertEqual
(
m
.
activity_kw
.
get
(
'tag'
),
'The Tag'
)
def
test_82_LossOfVolatileAttribute
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_82_LossOfVolatileAttribute
(
self
):
"""
Test that the loss of volatile attribute doesn't loose activities
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check loss of volatile attribute doesn
\
'
t cause message to be lost'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
tic
()
activity_tool
=
self
.
getActivityTool
()
message_list
=
activity_tool
.
getMessageList
()
...
...
@@ -1535,7 +1275,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
message_list
=
activity_tool
.
getMessageList
()
self
.
assertEqual
(
len
(
message_list
),
2
)
def
test_88_ProcessingMultipleMessagesMustRevertIndividualMessagesOnError
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_88_ProcessingMultipleMessagesMustRevertIndividualMessagesOnError
(
self
):
"""
Check that, on queues which support it, processing a batch of multiple
messages doesn't cause failed ones to becommited along with succesful
...
...
@@ -1544,11 +1284,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
Queues supporting message batch processing:
- SQLQueue
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check processing a batch of messages with failures'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
tic
()
activity_tool
=
self
.
getActivityTool
()
obj
=
self
.
getPortal
().
organisation_module
.
newContent
(
portal_type
=
'Organisation'
)
...
...
@@ -1573,18 +1308,13 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally
:
delattr
(
Organisation
,
'appendToTitle'
)
def
test_89_RequestIsolationInsideSameTic
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_89_RequestIsolationInsideSameTic
(
self
):
"""
Check that request information do not leak from one activity to another
inside the same TIC invocation.
This only apply to queues supporting batch processing:
- SQLQueue
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check request isolation between messages of the same batch'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
tic
()
obj
=
self
.
getPortal
().
organisation_module
.
newContent
(
portal_type
=
'Organisation'
,
title
=
'Pending'
)
marker_id
=
'marker_%i'
%
(
random
.
randint
(
1
,
10
),
)
...
...
@@ -1636,28 +1366,18 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally
:
del
Organisation
.
failingMethod
def
test_90_userNotificationOnActivityFailureWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_90_userNotificationOnActivityFailureWithSQLDict
(
self
):
"""
Check that a user notification method is called on message when activity
fails and will not be tried again.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check user notification sent on activity final error (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryUserNotificationOnActivityFailure
(
'SQLDict'
)
def
test_91_userNotificationOnActivityFailureWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_91_userNotificationOnActivityFailureWithSQLQueue
(
self
):
"""
Check that a user notification method is called on message when activity
fails and will not be tried again.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check user notification sent on activity final error (SQLQueue)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryUserNotificationOnActivityFailure
(
'SQLQueue'
)
def
TryUserNotificationRaise
(
self
,
activity
):
...
...
@@ -1681,26 +1401,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
Message
.
notifyUser
=
original_notifyUser
delattr
(
Organisation
,
'failingMethod'
)
def
test_92_userNotificationRaiseWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_92_userNotificationRaiseWithSQLDict
(
self
):
"""
Check that activities are not left with processing=1 when notifyUser raises.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that activities are not left with processing=1 when notifyUser raises (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryUserNotificationRaise
(
'SQLDict'
)
def
test_93_userNotificationRaiseWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_93_userNotificationRaiseWithSQLQueue
(
self
):
"""
Check that activities are not left with processing=1 when notifyUser raises.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that activities are not left with processing=1 when notifyUser raises (SQLQueue)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryUserNotificationRaise
(
'SQLQueue'
)
def
TryActivityRaiseInCommitDoesNotStallActivityConection
(
self
,
activity
):
...
...
@@ -1730,20 +1440,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally
:
delattr
(
Organisation
,
'registerFailingTransactionManager'
)
def
test_96_ActivityRaiseInCommitDoesNotStallActivityConectionSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that raising in commit does not stall cmf activity SQL connection (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_96_ActivityRaiseInCommitDoesNotStallActivityConectionSQLDict
(
self
):
self
.
TryActivityRaiseInCommitDoesNotStallActivityConection
(
'SQLDict'
)
def
test_97_ActivityRaiseInCommitDoesNotStallActivityConectionSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that raising in commit does not stall cmf activity SQL connection (SQLQueue)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_97_ActivityRaiseInCommitDoesNotStallActivityConectionSQLQueue
(
self
):
self
.
TryActivityRaiseInCommitDoesNotStallActivityConection
(
'SQLQueue'
)
def
TryActivityRaiseInCommitDoesNotLooseMessages
(
self
,
activity
):
...
...
@@ -1764,20 +1464,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally
:
delattr
(
Organisation
,
'registerFailingTransactionManager'
)
def
test_98_ActivityRaiseInCommitDoesNotLooseMessagesSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that raising in commit does not loose messages (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_98_ActivityRaiseInCommitDoesNotLooseMessagesSQLDict
(
self
):
self
.
TryActivityRaiseInCommitDoesNotLooseMessages
(
'SQLDict'
)
def
test_99_ActivityRaiseInCommitDoesNotLooseMessagesSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that raising in commit does not loose messages (SQLQueue)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_99_ActivityRaiseInCommitDoesNotLooseMessagesSQLQueue
(
self
):
self
.
TryActivityRaiseInCommitDoesNotLooseMessages
(
'SQLQueue'
)
def
TryChangeSkinInActivity
(
self
,
activity
):
...
...
@@ -1797,32 +1487,17 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally
:
delattr
(
Organisation
,
'changeSkinToNone'
)
def
test_100_TryChangeSkinInActivitySQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try Change Skin In Activity (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_100_TryChangeSkinInActivitySQLDict
(
self
):
self
.
TryChangeSkinInActivity
(
'SQLDict'
)
def
test_101_TryChangeSkinInActivitySQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Try ChangeSkin In Activity (SQLQueue)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_101_TryChangeSkinInActivitySQLQueue
(
self
):
self
.
TryChangeSkinInActivity
(
'SQLQueue'
)
def
test_102_1_CheckSQLDictDoesNotDeleteSimilaritiesBeforeExecution
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_102_1_CheckSQLDictDoesNotDeleteSimilaritiesBeforeExecution
(
self
):
"""
Test that SQLDict does not delete similar messages which have the same
method_id and path but a different tag before execution.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check similarities are not deleted before execution of original message (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
activity_tool
=
self
.
getActivityTool
()
marker
=
[]
def
doSomething
(
self
,
other_tag
):
...
...
@@ -1845,16 +1520,11 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally
:
del
activity_tool
.
__class__
.
doSomething
def
test_102_2_CheckSQLDictDeleteDuplicatesBeforeExecution
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_102_2_CheckSQLDictDeleteDuplicatesBeforeExecution
(
self
):
"""
Test that SQLDict delete the same messages before execution if messages
has the same method_id and path and tag.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check duplicates are deleted before execution of original message (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
activity_tool
=
self
.
getActivityTool
()
marker
=
[]
def
doSomething
(
self
,
other_tag
):
...
...
@@ -1881,7 +1551,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
del
activity_tool
.
__class__
.
doSomething
def
test_102_3_CheckSQLDictDistributeWithSerializationTagAndGroupMethodId
(
self
,
quiet
=
0
):
self
):
"""
Distribuation was at some point buggy with this scenario when there was
activate with the same serialization_tag and one time with a group_method
...
...
@@ -1904,7 +1574,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self
.
tic
()
self
.
assertEqual
(
len
(
activity_tool
.
getMessageList
()),
0
)
def
test_103_interQueuePriorities
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_103_interQueuePriorities
(
self
):
"""
Important note: there is no way to really reliably check that this
feature is correctly implemented, as activity execution order is
...
...
@@ -1912,11 +1582,6 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
The best which can be done is to check that under certain circumstances
the activity exeicution order match expectations.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check inter-queue priorities'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
organisation
=
self
.
getPortal
().
organisation_module
.
newContent
(
portal_type
=
'Organisation'
)
self
.
tic
()
activity_tool
=
self
.
getActivityTool
()
...
...
@@ -1984,20 +1649,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally
:
del
document
.
__class__
.
doSomething
def
test_104_activityRuntimeEnvironmentSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check ActivityRuntimeEnvironment (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_104_activityRuntimeEnvironmentSQLDict
(
self
):
self
.
CheckActivityRuntimeEnvironment
(
'SQLDict'
)
def
test_105_activityRuntimeEnvironmentSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check ActivityRuntimeEnvironment (SQLQueue)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_105_activityRuntimeEnvironmentSQLQueue
(
self
):
self
.
CheckActivityRuntimeEnvironment
(
'SQLQueue'
)
def
CheckSerializationTag
(
self
,
activity
):
...
...
@@ -2051,20 +1706,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self
.
tic
()
self
.
assertEqual
(
len
(
activity_tool
.
getMessageList
()),
0
)
def
test_106_checkSerializationTagSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check serialization tag (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_106_checkSerializationTagSQLDict
(
self
):
self
.
CheckSerializationTag
(
'SQLDict'
)
def
test_107_checkSerializationTagSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check serialization tag (SQLQueue)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_107_checkSerializationTagSQLQueue
(
self
):
self
.
CheckSerializationTag
(
'SQLQueue'
)
def
test_108_testAbsoluteUrl
(
self
):
...
...
@@ -2139,29 +1784,13 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self
.
assertEqual
(
TO_STRING
,
organisation
.
getTitle
())
self
.
assertEqual
(
TO_STRING
,
organisation
.
getDescription
())
def
test_112_checkLocalizerWorksSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check Localizer works (SQLQueue)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_112_checkLocalizerWorksSQLQueue
(
self
):
self
.
CheckLocalizerWorks
(
'SQLQueue'
)
def
test_113_checkLocalizerWorksSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check Localizer works (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_113_checkLocalizerWorksSQLDict
(
self
):
self
.
CheckLocalizerWorks
(
'SQLDict'
)
def
test_114_checkSQLQueueActivitySucceedsAfterActivityChangingSkin
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check SQLQueue activity succeeds after an activity changing skin selection'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_114_checkSQLQueueActivitySucceedsAfterActivityChangingSkin
(
self
):
portal
=
self
.
getPortalObject
()
activity_tool
=
self
.
getActivityTool
()
# Check that a reference script can be reached
...
...
@@ -2199,12 +1828,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
delattr
(
Organisation
,
'firstTest'
)
delattr
(
Organisation
,
'secondTest'
)
def
test_115_checkProcessShutdown
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that no activity is executed after process_shutdown has been called'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_115_checkProcessShutdown
(
self
):
# Thread execution plan for this test:
# main ActivityThread ProcessShutdownThread
# start ActivityThread None None
...
...
@@ -2453,17 +2077,12 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
finally
:
SQLBase
.
MAX_MESSAGE_LIST_SIZE
=
MAX_MESSAGE_LIST_SIZE
def
test_115_TestSerializationTagSQLDictPreventsParallelExecution
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_115_TestSerializationTagSQLDictPreventsParallelExecution
(
self
):
"""
Test if there are multiple activities with the same serialization tag,
then serialization tag guarantees that only one of the same serialization
tagged activities can be processed at the same time.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
from
Products.CMFActivity
import
ActivityTool
portal
=
self
.
getPortal
()
...
...
@@ -2618,26 +2237,16 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
del
Organisation
.
failingMethod
self
.
_ignore_log_errors
()
def
test_118_userNotificationSavedOnEventLogWhenNotifyUserRaisesWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_118_userNotificationSavedOnEventLogWhenNotifyUserRaisesWithSQLDict
(
self
):
"""
Check the error is saved on event log even if the mail notification is not sent.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check the error is saved on event log even if the mail notification is not sent (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryNotificationSavedOnEventLogWhenNotifyUserRaises
(
'SQLDict'
)
def
test_119_userNotificationSavedOnEventLogWhenNotifyUserRaisesWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
def
test_119_userNotificationSavedOnEventLogWhenNotifyUserRaisesWithSQLQueue
(
self
):
"""
Check the error is saved on event log even if the mail notification is not sent.
"""
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check the error is saved on event log even if the mail notification is not sent (SQLQueue)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
self
.
TryNotificationSavedOnEventLogWhenNotifyUserRaises
(
'SQLQueue'
)
def
TryUserMessageContainingNoTracebackIsStillSent
(
self
,
activity
):
...
...
@@ -2672,20 +2281,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
Message
.
notifyUser
=
original_notifyUser
delattr
(
Organisation
,
'failingMethod'
)
def
test_120_sendMessageWithNoTracebackWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that message with no traceback is still sent (SQLQueue)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_120_sendMessageWithNoTracebackWithSQLQueue
(
self
):
self
.
TryUserMessageContainingNoTracebackIsStillSent
(
'SQLQueue'
)
def
test_121_sendMessageWithNoTracebackWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that message with no traceback is still sent (SQLDict)'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_121_sendMessageWithNoTracebackWithSQLDict
(
self
):
self
.
TryUserMessageContainingNoTracebackIsStillSent
(
'SQLDict'
)
def
TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises
(
self
,
activity
):
...
...
@@ -2731,20 +2330,10 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
del
Organisation
.
failingMethod
self
.
_ignore_log_errors
()
def
test_122_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLDict
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that message not saved in site error logger is not lost'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_122_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLDict
(
self
):
self
.
TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises
(
'SQLDict'
)
def
test_123_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLQueue
(
self
,
quiet
=
0
,
run
=
run_all_test
):
if
not
run
:
return
if
not
quiet
:
message
=
'
\
n
Check that message not saved in site error logger is not lost'
ZopeTestCase
.
_print
(
message
)
LOG
(
'Testing... '
,
0
,
message
)
def
test_123_userNotificationSavedOnEventLogWhenSiteErrorLoggerRaisesWithSQLQueue
(
self
):
self
.
TryNotificationSavedOnEventLogWhenSiteErrorLoggerRaises
(
'SQLQueue'
)
def
test_124_checkConflictErrorAndNoRemainingActivities
(
self
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment