Commit a0dfb228 authored by Jérome Perrin's avatar Jérome Perrin

Merge remote-tracking branch 'upstream/master' into zope4py2

parents f1216db4 164e4631
Pipeline #24625 failed with stage
in 0 seconds
......@@ -2,4 +2,4 @@
because only validated accounts are displayed in the cache.
"""
if sci['object'].getValidationState() == 'validated':
container.Account_flushAccountListCache(sci)
container.script_Account_flushAccountListCache(sci)
......@@ -13,7 +13,7 @@ section_portal_type_list = ['Person', 'Organisation']
invalid_state_list = ['invalidated', 'deleted']
# first of all, validate the transaction itself
container.validateTransaction(state_change)
container.script_validateTransaction(state_change)
# Check that all lines uses open accounts, and doesn't use invalid third
......
return sci.getPortal().portal_workflow.accounting_workflow.scripts[script.getId()](sci)
return sci.getPortal().portal_workflow.accounting_workflow[script.getId()](sci)
state_change['object'].AccountingTransaction_setDefaultMirrorAccountList()
return container.setReferences(state_change)
return container.script_setReferences(state_change)
return sci.getPortal().portal_workflow.accounting_workflow.scripts[script.getId()](sci)
return sci.getPortal().portal_workflow.accounting_workflow[script.getId()](sci)
return state_change.getPortal().portal_workflow.accounting_workflow.scripts[script.getId()](state_change)
return state_change.getPortal().portal_workflow.accounting_workflow[script.getId()](state_change)
return state_change.getPortal().portal_workflow.accounting_workflow.scripts[script.getId()](state_change)
return state_change.getPortal().portal_workflow.accounting_workflow[script.getId()](state_change)
......@@ -7,4 +7,4 @@ if old_state.getId() == 'draft':
if internal_invoice.InternalInvoiceTransaction_getAuthenticatedUserSection() == internal_invoice.getDestinationSection():
raise ValidationFailed(translateString("Your entity should not be destination."))
return state_change.getPortal().portal_workflow.accounting_workflow.scripts[script.getId()](state_change)
return state_change.getPortal().portal_workflow.accounting_workflow[script.getId()](state_change)
......@@ -29,7 +29,7 @@
##############################################################################
from AccessControl import ClassSecurityInfo
from Products.CMFCore.WorkflowCore import WorkflowAction
from Products.ERP5Type.Base import WorkflowMethod
from Products.ERP5Type import Permissions, PropertySheet
from Products.ERP5Type.XMLObject import XMLObject
......@@ -64,7 +64,7 @@ class ApparelMeasurement(XMLObject, XMLMatrix, Image):
# Inheritance
_edit = Image._edit
security.declareProtected(Permissions.ModifyPortalContent, 'edit' )
edit = WorkflowAction( _edit )
edit = WorkflowMethod( _edit )
security.declareProtected(Permissions.View, 'index_html')
index_html = Image.index_html
......
......@@ -24,4 +24,4 @@ for simulation_movement in simulation_movement_list:
if simulation_movement.getOrder() == delivery_movement.getRelativeUrl():
simulation_movement.setOrder(None)
context.DeliveryMovement_updateSimulation(state_change)
container.script_DeliveryMovement_updateSimulation(state_change)
......@@ -31,7 +31,7 @@ import os
import random
import unittest
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.DCWorkflow.DCWorkflow import ValidationFailed
from Products.ERP5Type.Core.Workflow import ValidationFailed
from AccessControl import Unauthorized
class TestCertificateAuthority(ERP5TypeTestCase):
......
......@@ -966,19 +966,23 @@ class TestBase(ERP5TypeTestCase, ZopeTestCase.Functional):
# Add a non-existent workflow.
pw = self.getWorkflowTool()
dummy_simulation_worlflow_id = 'fake_simulation_workflow'
dummy_validation_worlflow_id = 'fake_validation_workflow'
dummy_simulation_workflow_id = 'fake_simulation_workflow'
dummy_validation_workflow_id = 'fake_validation_workflow'
#Assume that erp5_styles workflow Manage permissions with acquired Role by default
addWorkflowByType(pw, 'erp5_workflow', dummy_simulation_worlflow_id)
addWorkflowByType(pw, 'erp5_workflow', dummy_validation_worlflow_id)
dummy_simulation_worlflow = pw[dummy_simulation_worlflow_id]
dummy_validation_worlflow = pw[dummy_validation_worlflow_id]
dummy_validation_worlflow.variables.setStateVar('validation_state')
dummy_simulation_workflow = pw.newContent(
portal_type='Workflow',
reference=dummy_simulation_workflow_id,
)
dummy_validation_workflow = pw.newContent(
portal_type='Workflow',
reference=dummy_validation_workflow_id,
state_variable='validation_state',
)
organisation_type = portal.portal_types.getTypeInfo(portal_type)
organisation_initial_workflow_list = organisation_type.getTypeWorkflowList()
organisation_type.setTypeWorkflowList([dummy_validation_worlflow_id,
dummy_simulation_worlflow_id])
permission_list = list(dummy_simulation_worlflow.permissions)
organisation_type.setTypeWorkflowList([dummy_validation_workflow_id,
dummy_simulation_workflow_id])
permission_list = dummy_simulation_workflow.getWorkflowManagedPermissionList()
manager_has_permission = {}
for permission in permission_list:
manager_has_permission[permission] = ('Manager',)
......@@ -989,7 +993,7 @@ class TestBase(ERP5TypeTestCase, ZopeTestCase.Functional):
user = getSecurityManager().getUser()
try:
self.assertTrue(permission_list)
self.assertFalse(dummy_simulation_worlflow.states.draft.permission_roles)
self.assertFalse(dummy_simulation_workflow['state_draft'].getStatePermissionRoleListDict())
#1
obj = module.newContent(portal_type=portal_type)
#No role is defined by default on workflow
......@@ -999,28 +1003,28 @@ class TestBase(ERP5TypeTestCase, ZopeTestCase.Functional):
for permission in permission_list:
self.assertTrue(user.has_permission(permission, obj))
#2 Now configure both workflow with same configuration
dummy_simulation_worlflow.states.draft.permission_roles = manager_has_permission.copy()
dummy_validation_worlflow.states.draft.permission_roles = manager_has_permission.copy()
dummy_simulation_worlflow.updateRoleMappingsFor(obj)
dummy_validation_worlflow.updateRoleMappingsFor(obj)
dummy_simulation_workflow['state_draft'].setStatePermissionRoleListDict(manager_has_permission.copy())
dummy_validation_workflow['state_draft'].setStatePermissionRoleListDict(manager_has_permission.copy())
dummy_simulation_workflow.updateRoleMappingsFor(obj)
dummy_validation_workflow.updateRoleMappingsFor(obj)
for permission in permission_list:
self.assertTrue(user.has_permission(permission, obj))
#3 change only dummy_simulation_worlflow
dummy_simulation_worlflow.states.draft.permission_roles = manager_has_no_permission.copy()
dummy_simulation_worlflow.updateRoleMappingsFor(obj)
#3 change only dummy_simulation_workflow
dummy_simulation_workflow['state_draft'].setStatePermissionRoleListDict(manager_has_no_permission.copy())
dummy_simulation_workflow.updateRoleMappingsFor(obj)
for permission in permission_list:
self.assertFalse(user.has_permission(permission, obj))
#4 enable acquisition for dummy_simulation_worlflow
dummy_simulation_worlflow.states.draft.permission_roles = None
dummy_simulation_worlflow.updateRoleMappingsFor(obj)
#4 enable acquisition for dummy_simulation_workflow
dummy_simulation_workflow['state_draft'].setAcquirePermissionList(permission_list)
dummy_simulation_workflow.updateRoleMappingsFor(obj)
for permission in permission_list:
self.assertTrue(user.has_permission(permission, obj))
finally:
# Make sure that the artificial workflow is not referred to any longer.
organisation_type.setTypeWorkflowList(organisation_initial_workflow_list)
pw.manage_delObjects([dummy_simulation_worlflow_id, dummy_validation_worlflow_id])
pw.manage_delObjects([dummy_simulation_workflow_id, dummy_validation_workflow_id])
def test_getViewPermissionOwnerDefault(self):
"""Test getViewPermissionOwner method behaviour"""
......
......@@ -39,7 +39,6 @@ from urllib import pathname2url
from Products.ERP5Type.Globals import PersistentMapping
from Products.ERP5Type.dynamic.lazy_class import ERP5BaseBroken
from Products.ERP5Type.tests.utils import LogInterceptor
from Products.ERP5Type.Workflow import addWorkflowByType
import shutil
import os
import random
......@@ -1315,8 +1314,11 @@ class BusinessTemplateMixin(ERP5TypeTestCase, LogInterceptor):
"""
wf_id = 'geek_workflow'
pw = self.getWorkflowTool()
addWorkflowByType(pw, WORKFLOW_TYPE, wf_id)
workflow = pw._getOb(wf_id, None)
workflow = pw.newContent(
portal_type='Workflow',
reference=wf_id,
)
self.tic()
self.assertTrue(workflow is not None)
sequence.edit(workflow_id=workflow.getId())
......@@ -2886,8 +2888,11 @@ class BusinessTemplateMixin(ERP5TypeTestCase, LogInterceptor):
"""
wf_id = 'custom_geek_workflow'
pw = self.getWorkflowTool()
addWorkflowByType(pw, WORKFLOW_TYPE, wf_id)
workflow = pw._getOb(wf_id, None)
workflow = pw.newContent(
portal_type='Workflow',
reference=wf_id,
)
self.tic()
self.assertTrue(workflow is not None)
sequence.edit(workflow_id=workflow.getId())
......
......@@ -33,7 +33,6 @@ import transaction
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.Base import _aq_reset
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Workflow import addWorkflowByType
class TestInteractionWorkflow(ERP5TypeTestCase):
......@@ -76,20 +75,26 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def _createInteractionWorkflowWithId(self, wf_id):
wf_tool = self.getWorkflowTool()
return addWorkflowByType(wf_tool, "interaction_workflow", wf_id)
return wf_tool.newContent(
portal_type='Interaction Workflow',
reference=wf_id,
)
def createInteractionWorkflow(self):
if getattr(self.getWorkflowTool(), 'test_workflow', None) is None:
self._createInteractionWorkflowWithId('test_workflow')
wf = self.getWorkflowTool()['test_workflow']
self.wf = wf
if getattr(wf.scripts, 'afterEdit', None) is None:
wf.scripts.manage_addProduct['PythonScripts']\
.manage_addPythonScript(id='afterEdit')
self.script = wf.scripts['afterEdit']
if getattr(wf.interactions, 'edit_interaction', None) is None:
wf.interactions.addInteraction(id='edit_interaction')
self.interaction = wf.interactions['edit_interaction']
if getattr(wf, 'script_afterEdit', None) is None:
wf.manage_addProduct['PythonScripts']\
.manage_addPythonScript(id='script_afterEdit')
self.script = wf.script_afterEdit
if getattr(wf, 'interactions_edit_interaction', None) is None:
wf.newContent(
portal_type='Interaction Workflow Interaction',
reference='edit_interaction',
)
self.interaction = wf.interaction_edit_interaction
type_object = self.portal.portal_types.getTypeInfo(self.portal_type)
type_object.setTypeWorkflowList(['test_workflow', 'validation_workflow'])
_aq_reset() # XXX Fails XXX _setLastId not found when doing newContent
......@@ -97,16 +102,24 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def createInteractionWorkflowWithTwoInteractions(self):
wf = self._createInteractionWorkflowWithId('test_workflow')
self.wf = wf
wf.scripts.manage_addProduct['PythonScripts']\
.manage_addPythonScript(id='afterEditA')
self.scriptA = wf.scripts['afterEditA']
wf.interactions.addInteraction(id='editA')
self.interactionA = wf.interactions['editA']
wf.scripts.manage_addProduct['PythonScripts']\
.manage_addPythonScript(id='afterEditB')
self.scriptB = wf.scripts['afterEditB']
wf.interactions.addInteraction(id='editB')
self.interactionB = wf.interactions['editB']
wf.manage_addProduct['PythonScripts']\
.manage_addPythonScript(id='script_afterEditA')
self.scriptA = wf.script_afterEditA
if getattr(wf, 'interactions_editA', None) is None:
wf.newContent(
portal_type='Interaction Workflow Interaction',
reference='editA',
)
self.interactionA = wf.interaction_editA
wf.manage_addProduct['PythonScripts']\
.manage_addPythonScript(id='script_afterEditB')
self.scriptB = wf.script_afterEditB
if getattr(wf, 'interactions_editB', None) is None:
wf.newContent(
portal_type='Interaction Workflow Interaction',
reference='editB',
)
self.interactionB = wf.interaction_editB
type_object = self.portal.portal_types.getTypeInfo(self.portal_type)
type_object.setTypeWorkflowList(['test_workflow', 'validation_workflow'])
_aq_reset() # XXX Fails XXX _setLastId not found when doing newContent
......@@ -118,10 +131,10 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_edit_interaction(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'afterEdit',
method_id='edit',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('edit',),
after_script_value=self.script,
)
params = 'sci,**kw'
body = "context = sci.object\n" +\
"context.setDescription('toto')"
......@@ -133,10 +146,10 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_interaction_on_edit(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'afterEdit',
method_id='edit',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('edit',),
after_script_value=self.script,
)
body = "context = sci.object\n" +\
"context.setDescription('toto')"
params = 'sci,**kw'
......@@ -148,10 +161,10 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_interaction_on_method(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'afterEdit',
method_id='doSomethingStupid',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('doSomethingStupid',),
after_script_value=self.script,
)
body = "context = sci.object\n" +\
"context.setDescription('toto')"
params = 'sci,**kw'
......@@ -163,10 +176,10 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_interaction_on_category_setter(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'afterEdit',
method_id='setSizeList _setSizeList',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('setSizeList', '_setSizeList',),
after_script_value=self.script,
)
body = "context = sci.object\n" +\
"context.setDescription('toto')"
params = 'sci,**kw'
......@@ -178,10 +191,10 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_interaction_executed_once(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'afterEdit',
method_id='edit',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('edit',),
after_script_value=self.script,
)
params = 'sci,**kw'
body = "context = sci.object\n" +\
"description = context.getDescription()\n" +\
......@@ -195,10 +208,10 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_returned_value(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'afterEdit',
method_id='newContent',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('newContent',),
after_script_value=self.script,
)
params = 'sci,**kw'
body = "context = sci.object\n" +\
"return 3\n"
......@@ -212,10 +225,10 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_multiple_methods(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'afterEdit',
method_id='setCorporateName setActivityCode',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('setCorporateName', 'setActivityCode',),
after_script_value=self.script,
)
params = 'sci,**kw'
body = "context = sci.object\n" +\
"description = context.getDescription()\n" +\
......@@ -229,14 +242,14 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_same_method_two_interactions(self):
self.createInteractionWorkflowWithTwoInteractions()
self.interactionA.setProperties(
'afterEditA',
method_id='edit',
after_script_name=('afterEditA',))
self.interactionB.setProperties(
'afterEditB',
method_id='edit',
after_script_name=('afterEditB',))
self.interactionA.edit(
trigger_method_id_list=('edit',),
after_script_value=self.scriptA,
)
self.interactionB.edit(
trigger_method_id_list=('edit',),
after_script_value=self.scriptB,
)
params = 'sci,**kw'
body = "context = sci.object\n" +\
"context.log('InteractionWF.test_09 in script', 'a')\n" +\
......@@ -257,10 +270,10 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_multiple_scripts(self):
self.createInteractionWorkflowWithTwoInteractions()
self.interactionA.setProperties(
'afterEdit',
method_id='edit',
after_script_name=('afterEditA', 'afterEditB'))
self.interactionA.edit(
trigger_method_id_list=('edit',),
after_script_value_list=(self.scriptA, self.scriptB),
)
params = 'sci,**kw'
body = "context = sci.object\n" +\
"context.log('InteractionWF.test_10 in script', 'a')\n" +\
......@@ -281,14 +294,14 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_private_accessor(self):
self.createInteractionWorkflowWithTwoInteractions()
self.interactionA.setProperties(
'afterEditA',
method_id='_setVatCode',
after_script_name=('afterEditA',))
self.interactionB.setProperties(
'afterEditB',
method_id='setVatCode',
after_script_name=('afterEditB',))
self.interactionA.edit(
trigger_method_id_list=('_setVatCode',),
after_script_value=self.scriptA,
)
self.interactionB.edit(
trigger_method_id_list=('setVatCode',),
after_script_value=self.scriptB,
)
params = 'sci,**kw'
body = "context = sci.object\n" +\
"context.log('InteractionWF.test_11 in script', 'a')\n" +\
......@@ -309,14 +322,14 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_private_accessor_on_acquired_property(self):
self.createInteractionWorkflowWithTwoInteractions()
self.interactionA.setProperties(
'afterEditA',
method_id='_setDefaultEmailText',
after_script_name=('afterEditA',))
self.interactionB.setProperties(
'afterEditB',
method_id='setDefaultEmailText',
after_script_name=('afterEditB',))
self.interactionA.edit(
trigger_method_id_list=('_setDefaultEmailText',),
after_script_value=self.scriptA,
)
self.interactionB.edit(
trigger_method_id_list=('setDefaultEmailText',),
after_script_value=self.scriptB,
)
params = 'sci,**kw'
body = "context = sci.object\n" +\
"context.log('InteractionWF.test_12 in script', 'a')\n" +\
......@@ -341,10 +354,10 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
# Check that edit does not detect the property modified in interaction
# script as modified by user
self.createInteractionWorkflow()
self.interaction.setProperties(
'afterEdit',
method_id='_setTitle',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('_setTitle',),
after_script_value=self.script,
)
params = 'sci,**kw'
body = "context = sci.object\n" +\
"vat_code = context.getVatCode()\n" +\
......@@ -383,10 +396,10 @@ class TestInteractionWorkflow(ERP5TypeTestCase):
def test_BeforeScriptParameters(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'afterEdit',
method_id='getProperty',
script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('getProperty',),
before_script_value=self.script,
)
params = 'sci,**kw'
body = """\
context = sci['object']
......@@ -404,10 +417,10 @@ context.setDescription('%s,%s,%s' % (d, args, result))
def test_AfterScriptParameters(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'afterEdit',
method_id='getProperty',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('getProperty',),
after_script_value=self.script,
)
params = 'sci,**kw'
body = """\
context = sci['object']
......@@ -426,10 +439,10 @@ context.setDescription('%s,%s,%s' % (d, args, result))
def test_BeforeCommitParameters(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'beforeCommit',
method_id='getProperty',
before_commit_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('getProperty',),
before_commit_script_value=self.script,
)
params = 'sci, **kw'
body = """\
context = sci['object']
......@@ -459,11 +472,11 @@ context.setDescription('%s,%s,%s' % (d, args, result))
def test_activity_interaction(self):
# Tests for Later Script (in activity)
self.createInteractionWorkflow()
self.interaction.setProperties(
'editObject',
once_per_transaction=1,
method_id='_setGroup.*',
activate_script_name=('afterEdit',))
self.interaction.edit(
trigger_once_per_transaction=1,
trigger_method_id_list=('_setGroup.*',),
activate_script_value=self.script,
)
params = 'sci, **kw'
body = """\
context = sci['object']
......@@ -481,11 +494,11 @@ context.setTitle('Bar')
def test_skip_temp_object(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'editObject',
temporary_document_disallowed=False,
method_id='_setGroup.*',
after_script_name=('afterEdit',))
self.interaction.edit(
temporary_document_disallowed=0,
trigger_method_id_list=('_setGroup.*',),
after_script_value=self.script,
)
params = 'sci, **kw'
body = """\
context = sci['object']
......@@ -500,22 +513,22 @@ context.setTitle('Bar')
self.assertEqual(temp.getTitle(), 'Bar')
# but not if it has been forbidden
temp.setTitle('Foo')
self.interaction.setProperties(
'editObject',
temporary_document_disallowed=True,
method_id='_setGroup.*',
after_script_name=('afterEdit',))
self.interaction.edit(
temporary_document_disallowed=1,
trigger_method_id_list=('_setGroup.*',),
after_script_value=self.script,
)
temp.setGroupValue(None)
self.assertEqual(temp.getTitle(), 'Foo')
def test_temp_object_doesnt_skip_normal(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'editObject',
once_per_transaction=True,
temporary_document_disallowed=True,
method_id='_setGroup.*',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_once_per_transaction=1,
temporary_document_disallowed=1,
trigger_method_id_list=('_setGroup.*',),
after_script_value=self.script,
)
params = 'sci, **kw'
body = """\
context = sci['object']
......@@ -544,12 +557,12 @@ context.setTitle('Bar')
def test_temp_object_does_skip_normal(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'editObject',
once_per_transaction=True,
temporary_document_disallowed=False,
method_id='_setGroup.*',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_once_per_transaction=1,
temporary_document_disallowed=0,
trigger_method_id_list=('_setGroup.*',),
after_script_value=self.script,
)
params = 'sci, **kw'
body = """\
context = sci['object']
......@@ -586,10 +599,10 @@ context.setTitle('Bar')
# test that we can add an interaction by defining methods using regular
# expression
self.createInteractionWorkflow()
self.interaction.setProperties(
'regexp',
method_id='_set.* set.*',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('_set.*', 'set.*'),
after_script_value=self.script,
)
call_list = self.portal.REQUEST['call_list'] = []
self.script.ZPythonScript_edit('sci',
......@@ -614,10 +627,10 @@ context.setTitle('Bar')
# wrapping a method in an interaction workflow adds a default security to
# this method if the method does not exists.
self.createInteractionWorkflow()
self.interaction.setProperties(
'default',
method_id='nonExistantMethod',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('nonExistantMethod',),
after_script_value=self.script,
)
self.script.ZPythonScript_edit('sci', '')
# the default security is "Access contents information"
self.organisation.manage_permission(
......@@ -629,10 +642,10 @@ context.setTitle('Bar')
# wrapping a method in an interaction workflow adds a default security to
# this method, but does not override existing security definition
self.createInteractionWorkflow()
self.interaction.setProperties(
'default',
method_id='setDescription',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('setDescription',),
after_script_value=self.script,
)
self.script.ZPythonScript_edit('sci', '')
# This rely on the fact that 'setDescription' is protected with 'Modify
# portal content'
......@@ -651,20 +664,20 @@ context.setTitle('Bar')
security.apply(Organisation)
self.createInteractionWorkflow()
self.interaction.setProperties(
'default',
method_id='doSomethingStupid',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('doSomethingStupid',),
after_script_value=self.script,
)
self.script.ZPythonScript_edit('sci', '')
self.assertEqual(self.organisation.doSomethingStupid__roles__, ())
def test_wrap_workflow_transition(self):
self.createInteractionWorkflow()
self.interaction.setProperties(
'default',
method_id='validate',
after_script_name=('afterEdit',))
self.interaction.edit(
trigger_method_id_list=('validate',),
after_script_value=self.script,
)
params = 'sci, **kw'
body = "context = sci[\'object\']\n" +\
"context.setDescription('titi')"
......@@ -678,12 +691,12 @@ context.setTitle('Bar')
self.createInteractionWorkflow()
type_object = self.portal.portal_types.getTypeInfo('Bank Account')
type_object.setTypeWorkflowList(['test_workflow', 'validation_workflow'])
self.interaction.setProperties(
'default',
# only for bank accounts
portal_type_filter=['Bank Account'],
method_id='getReference',
after_script_name=('afterEdit',))
self.interaction.edit(
# only for bank accounts
portal_type_filter_list=['Bank Account'],
trigger_method_id_list=('getReference',),
after_script_value=self.script,
)
params = 'sci, **kw'
body = "context = sci[\'object\']\n" +\
"context.setDescription('modified')"
......@@ -703,12 +716,12 @@ context.setTitle('Bar')
self.createInteractionWorkflow()
type_object = self.portal.portal_types.getTypeInfo('Bank Account')
type_object.setTypeWorkflowList(['test_workflow', 'validation_workflow'])
self.interaction.setProperties(
'default',
# only for payment nodes portal type group (ie. bank account)
portal_type_group_filter=['payment_node'],
method_id='getReference',
after_script_name=('afterEdit',))
self.interaction.edit(
# only for bank accounts
portal_type_group_filter_list=['payment_node'],
trigger_method_id_list=('getReference',),
after_script_value=self.script,
)
params = 'sci, **kw'
body = "context = sci[\'object\']\n" +\
"context.setDescription('modified')"
......
from Products.ERP5Type.Core.Workflow import ValidationFailed
from Products.ERP5Type.Message import translateString
container.Event_checkConsistency(sci)
container.script_Event_checkConsistency(sci)
portal = sci.getPortal()
portal_workflow = portal.portal_workflow
......
......@@ -10,9 +10,9 @@ portal = context.getPortalObject()
query_dict = {}
workflow = portal.portal_workflow.ticket_workflow
workflow_state_var = workflow.variables.getStateVar()
workflow_state_var = workflow.getStateVariable()
for worklist in workflow.worklists.objectValues():
for worklist in workflow.getWorklistValueList():
identity_criterion_dict = worklist.getIdentityCriterionDict()
if portal_type \
and 'portal_type' in worklist.getCriterionPropertyList() \
......
......@@ -15,4 +15,4 @@ if test_result.getPortalType() == 'Test Result Node':
edit_kw[key] = key_value
test_result.edit(**edit_kw)
else:
context.TestResult_complete(sci)
container.script_TestResult_complete(sci)
......@@ -3,14 +3,14 @@ portal = context.getPortalObject()
portal_type = ticket.getPortalType()
if portal_type in ("Leave Request", "Leave Request Period"):
context.Alarm_safeTrigger(
container.script_Alarm_safeTrigger(
portal.portal_alarms.create_representative_record_for_leave_request
)
elif portal_type in ("Expense Validation Request"):
context.Alarm_safeTrigger(
container.script_Alarm_safeTrigger(
portal.portal_alarms.create_representative_record_for_expense_validation_request
)
elif portal_type in ("Travel Request"):
context.Alarm_safeTrigger(
container.script_Alarm_safeTrigger(
portal.portal_alarms.create_representative_record_for_travel_request
)
......@@ -3534,7 +3534,11 @@ class Base(
# Use meta transition to jump from one state to another
# without existing transitions.
from Products.ERP5.InteractionWorkflow import InteractionWorkflowDefinition
from Products.ERP5Type import WITH_LEGACY_WORKFLOW
if WITH_LEGACY_WORKFLOW:
from Products.ERP5.InteractionWorkflow import InteractionWorkflowDefinition
else:
InteractionWorkflowDefinition = None.__class__
from Products.ERP5Type.Core.InteractionWorkflow import InteractionWorkflow
portal = self.getPortalObject()
workflow_tool = portal.portal_workflow
......
......@@ -98,7 +98,10 @@ class WorkflowTool(BaseTool, OriginalWorkflowTool):
This is public method to allow passing meta transition (Jump form
any state to another in same workflow)
"""
from Products.ERP5.InteractionWorkflow import InteractionWorkflowDefinition
if WITH_LEGACY_WORKFLOW:
from Products.ERP5.InteractionWorkflow import InteractionWorkflowDefinition
else:
InteractionWorkflowDefinition = None.__class__
from Products.ERP5Type.Core.InteractionWorkflow import InteractionWorkflow
workflow_list = self.getWorkflowValueListFor(ob.getPortalType())
if wf_id is None:
......@@ -124,7 +127,10 @@ class WorkflowTool(BaseTool, OriginalWorkflowTool):
"""Test if given state_id is available for ob
in at least one associated workflow
"""
from Products.ERP5.InteractionWorkflow import InteractionWorkflowDefinition
if WITH_LEGACY_WORKFLOW:
from Products.ERP5.InteractionWorkflow import InteractionWorkflowDefinition
else:
InteractionWorkflowDefinition = None.__class__
from Products.ERP5Type.Core.InteractionWorkflow import InteractionWorkflow
for workflow in (wf_id and (self[wf_id],) or self.getWorkflowValueListFor(ob.getPortalType())):
if not isinstance(workflow, (InteractionWorkflowDefinition,
......@@ -639,10 +645,10 @@ if WITH_LEGACY_WORKFLOW:
deprecated('getWorkflowIds() is deprecated; use objectIds()')\
(lambda self: self.objectIds())
WorkflowTool.security.declarePrivate('getWorkflowIds')
WorkflowTool.getWorkflowById = \
deprecated('getWorkflowById() is deprecated')\
(lambda self, wf_id: self._getOb(wf_id, None))
WorkflowTool.security.declarePrivate('getWorkflowById')
# XXX We still use portal_workflow.getInfoFor, that calls WorkflowTool.getWorkflowById
WorkflowTool.getWorkflowById = lambda self, wf_id: self._getOb(wf_id, None)
WorkflowTool.security.declarePrivate('getWorkflowById')
InitializeClass(WorkflowTool)
......
......@@ -37,8 +37,7 @@ if WITH_LEGACY_WORKFLOW:
from Products.ERP5Type.patches import DCWorkflow
from Products.ERP5Type.patches import Worklists
from Products.ERP5Type.patches import BTreeFolder2
if WITH_LEGACY_WORKFLOW:
from Products.ERP5Type.patches import WorkflowTool
from Products.ERP5Type.patches import WorkflowTool
from Products.ERP5Type.patches import DynamicType
from Products.ERP5Type.patches import Expression
from Products.ERP5Type.patches import sqltest
......@@ -63,8 +62,7 @@ from Products.ERP5Type.patches import DateTimePatch
from Products.ERP5Type.patches import PythonScript
from Products.ERP5Type.patches import MailHost
from Products.ERP5Type.patches import memcache_client
if WITH_LEGACY_WORKFLOW:
from Products.ERP5Type.patches import StateChangeInfoPatch
from Products.ERP5Type.patches import StateChangeInfoPatch
from Products.ERP5Type.patches import transforms
from Products.ERP5Type.patches import OFSPdata
from Products.ERP5Type.patches import DemoStorage
......
......@@ -27,7 +27,6 @@
##############################################################################
from Products.ERP5Type import WITH_LEGACY_WORKFLOW
assert WITH_LEGACY_WORKFLOW
from Products.DCWorkflow.Expression import StateChangeInfo
from Products.PythonScripts.Utility import allow_class
......
......@@ -13,41 +13,7 @@
#
##############################################################################
from Products.ERP5Type import WITH_LEGACY_WORKFLOW
assert WITH_LEGACY_WORKFLOW
from zLOG import LOG, WARNING
from types import StringTypes
# Make sure Interaction Workflows are called even if method not wrapped
from AccessControl import ClassSecurityInfo, Unauthorized
from Products.ERP5Type.Globals import InitializeClass
from Products.CMFCore.WorkflowTool import WorkflowTool
from Products.CMFCore.WorkflowCore import ObjectDeleted
from Products.CMFCore.WorkflowCore import WorkflowException
from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
from Products.DCWorkflow.Transitions import TRIGGER_WORKFLOW_METHOD
from Products.DCWorkflow.utils import Message as _
from Products.DCWorkflow.Transitions import TRIGGER_USER_ACTION
from Products.CMFCore.utils import getToolByName
from Products.ZSQLCatalog.SQLCatalog import SimpleQuery, AutoQuery, ComplexQuery, NegatedQuery
from Products.CMFCore.utils import _getAuthenticatedUser
from Products.ERP5Type import Permissions
from Products.ERP5Type.Cache import CachingMethod
from Products.ERP5Type.Workflow import WorkflowHistoryList as NewWorkflowHistoryList
from sets import ImmutableSet
from Acquisition import aq_base
from Products.ERP5Type.Globals import PersistentMapping
from MySQLdb import ProgrammingError, OperationalError
from DateTime import DateTime
security = ClassSecurityInfo()
WorkflowTool.security = security
WORKLIST_METADATA_KEY = 'metadata'
SECURITY_PARAMETER_ID = 'local_roles'
class WorkflowHistoryList(NewWorkflowHistoryList):
......@@ -105,221 +71,256 @@ class WorkflowHistoryList(NewWorkflowHistoryList):
# BBB: A production instance used a temporary patch to speed up.
WorkflowHistoryBucketList = WorkflowHistoryList
def WorkflowTool_getChainDict(self):
"""Test if the given transition exist from the current state.
"""
chain_dict = {}
for portal_type, wf_id_list in self._chains_by_type.iteritems():
for wf_id in wf_id_list:
chain_dict.setdefault(wf_id, []).append(portal_type)
return chain_dict
security.declareProtected(Permissions.ManagePortal, 'getChainDict')
WorkflowTool.getChainDict = WorkflowTool_getChainDict
# Backward compatibility, as WorkflowMethod has been removed in CMFCore 2.2
from MethodObject import Method
class WorkflowMethod( Method ):
""" Wrap a method to workflow-enable it.
from Products.ERP5Type import WITH_LEGACY_WORKFLOW
if WITH_LEGACY_WORKFLOW:
import six
from zLOG import LOG, WARNING
from types import StringTypes
# Make sure Interaction Workflows are called even if method not wrapped
from AccessControl import ClassSecurityInfo, Unauthorized
from Products.ERP5Type.Globals import InitializeClass
from Products.CMFCore.WorkflowTool import WorkflowTool
from Products.CMFCore.WorkflowCore import ObjectDeleted
from Products.CMFCore.WorkflowCore import WorkflowException
from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
from Products.DCWorkflow.Transitions import TRIGGER_WORKFLOW_METHOD
from Products.DCWorkflow.utils import Message as _
from Products.DCWorkflow.Transitions import TRIGGER_USER_ACTION
from Products.CMFCore.utils import getToolByName
from Products.ZSQLCatalog.SQLCatalog import SimpleQuery, AutoQuery, ComplexQuery, NegatedQuery
from Products.CMFCore.utils import _getAuthenticatedUser
from Products.ERP5Type import Permissions
from Products.ERP5Type.Cache import CachingMethod
from sets import ImmutableSet
from Acquisition import aq_base
from Products.ERP5Type.Globals import PersistentMapping
from MySQLdb import ProgrammingError, OperationalError
from DateTime import DateTime
security = ClassSecurityInfo()
WorkflowTool.security = security
WORKLIST_METADATA_KEY = 'metadata'
SECURITY_PARAMETER_ID = 'local_roles'
def WorkflowTool_getChainDict(self):
"""Test if the given transition exist from the current state.
"""
chain_dict = {}
for portal_type, wf_id_list in six.iteritems(self._chains_by_type):
for wf_id in wf_id_list:
chain_dict.setdefault(wf_id, []).append(portal_type)
return chain_dict
security.declareProtected(Permissions.ManagePortal, 'getChainDict')
WorkflowTool.getChainDict = WorkflowTool_getChainDict
# Backward compatibility, as WorkflowMethod has been removed in CMFCore 2.2
from MethodObject import Method
class WorkflowMethod( Method ):
""" Wrap a method to workflow-enable it.
"""
_need__name__=1
def __init__(self, method, id=None, reindex=1):
self._m = method
if id is None:
id = method.__name__
self._id = id
# reindex ignored since workflows now perform the reindexing.
def __call__(self, instance, *args, **kw):
""" Invoke the wrapped method, and deal with the results.
"""
wf = getToolByName(instance, 'portal_workflow', None)
if wf is None or not hasattr(wf, 'wrapWorkflowMethod'):
# No workflow tool found.
try:
res = self._m(instance, *args, **kw)
except ObjectDeleted as ex:
res = ex.getResult()
else:
if hasattr(aq_base(instance), 'reindexObject'):
instance.reindexObject()
else:
res = wf.wrapWorkflowMethod(instance, self._id, self._m,
(instance,) + args, kw)
from Products.CMFCore import WorkflowCore
# BBB: WorkflowMethod has been removed from CMFCore 2
WorkflowCore.WorkflowAction = WorkflowMethod
# XXX: Kept here instead of ERP5Type.Tool.WorkflowTool because not used in
# erp5.git: is it used in projects?
security.declarePublic('canDoActionFor')
def canDoActionFor(self, ob, action, wf_id=None, guard_kw={}):
""" Check we can perform the given workflow action on 'ob'.
"""
_need__name__=1
def __init__(self, method, id=None, reindex=1):
self._m = method
if id is None:
id = method.__name__
self._id = id
# reindex ignored since workflows now perform the reindexing.
def __call__(self, instance, *args, **kw):
""" Invoke the wrapped method, and deal with the results.
"""
wf = getToolByName(instance, 'portal_workflow', None)
if wf is None or not hasattr(wf, 'wrapWorkflowMethod'):
# No workflow tool found.
try:
res = self._m(instance, *args, **kw)
except ObjectDeleted as ex:
res = ex.getResult()
else:
if hasattr(aq_base(instance), 'reindexObject'):
instance.reindexObject()
else:
res = wf.wrapWorkflowMethod(instance, self._id, self._m,
(instance,) + args, kw)
from Products.CMFCore import WorkflowCore
# BBB: WorkflowMethod has been removed from CMFCore 2
WorkflowCore.WorkflowAction = WorkflowMethod
# XXX: Kept here instead of ERP5Type.Tool.WorkflowTool because not used in
# erp5.git: is it used in projects?
security.declarePublic('canDoActionFor')
def canDoActionFor(self, ob, action, wf_id=None, guard_kw={}):
""" Check we can perform the given workflow action on 'ob'.
"""
if wf_id is None:
workflow_list = self.getWorkflowValueListFor(ob) or ()
else:
workflow = self._getOb(wf_id, None)
if workflow:
workflow_list = (workflow,)
if wf_id is None:
workflow_list = self.getWorkflowValueListFor(ob) or ()
else:
workflow_list = ()
for workflow in workflow_list:
state_definition = workflow._getWorkflowStateOf(ob)
if state_definition is not None:
if action in state_definition.transitions:
transition_definition = workflow.transitions.get(action, None)
if transition_definition is not None and \
transition_definition.trigger_type == TRIGGER_USER_ACTION:
return workflow._checkTransitionGuard(transition_definition, ob, **guard_kw)
raise WorkflowException(_(u"No workflow provides the '${action_id}' action.",
mapping={'action_id': action}))
WorkflowTool.canDoActionFor = canDoActionFor
security.declarePrivate('_listTypeInfo')
def _listTypeInfo(self):
""" List the portal types which are available.
workflow = self._getOb(wf_id, None)
if workflow:
workflow_list = (workflow,)
else:
workflow_list = ()
for workflow in workflow_list:
state_definition = workflow._getWorkflowStateOf(ob)
if state_definition is not None:
if action in state_definition.transitions:
transition_definition = workflow.transitions.get(action, None)
if transition_definition is not None and \
transition_definition.trigger_type == TRIGGER_USER_ACTION:
return workflow._checkTransitionGuard(transition_definition, ob, **guard_kw)
raise WorkflowException(_(u"No workflow provides the '${action_id}' action.",
mapping={'action_id': action}))
WorkflowTool.canDoActionFor = canDoActionFor
security.declarePrivate('_listTypeInfo')
def _listTypeInfo(self):
""" List the portal types which are available.
"""
# <patch>
ttool = getattr(self.getPortalObject(), "portal_types", None)
# </patch>
if ttool is not None:
return ttool.listTypeInfo()
return ()
WorkflowTool._listTypeInfo = _listTypeInfo
## From here on: migration/compatibility code DCWorkflow => ERP5 Workflow
# The following 2 functions are necessary for workflow tool dynamic migration
def WorkflowTool_isBootstrapRequired(self):
# migration is required if the tool is not the new one from ERP5 Workflow
# in case of old workflow tool, it acquires the portal type from ERP5 Site
return self.getPortalType() != "Workflow Tool"
def WorkflowTool_bootstrap(self):
"""
Migrate portal_workflow from CMFCore to ERP5 Workflow. Also migrate
Workflow Chains not defined anymore on portal_workflow but on the Portal
Type.
Like other Tools migrations (ERP5CatalogTool...), this is called at
startup by synchronizeDynamicModules(), thus Workflows are *not* migrated
from DCWorkflow to ERP5 Workflow (ERP5 Workflow portal_workflow can work
with both DCWorkflows and ERP5 Workflows), because this is not needed at
this stage (handled by bt5 upgrade) and avoid making bootstrap more
complicated than it already is.
"""
from Products.ERP5Type.Tool.WorkflowTool import WorkflowTool
if not isinstance(self, WorkflowTool):
LOG('WorkflowTool', 0, 'Migrating portal_workflow')
portal = self.getPortalObject()
# CMFCore portal_workflow -> ERP5 Workflow portal_workflow
from Products.ERP5.ERP5Site import addERP5Tool
addERP5Tool(portal, 'portal_workflow_new', 'Workflow Tool')
new_tool = portal._getOb("portal_workflow_new")
new_tool._chains_by_type = self._chains_by_type
for workflow_id, workflow in self.objectItems():
workflow_copy = workflow._getCopy(new_tool)
workflow_copy._setId(workflow_id)
new_tool._setObject(workflow_id, workflow_copy)
workflow_copy = new_tool._getOb(workflow_id)
workflow_copy._postCopy(new_tool, op=0)
workflow_copy.wl_clearLocks()
portal.portal_workflow = new_tool
portal.portal_workflow.id = 'portal_workflow'
portal._delObject('portal_workflow_new')
# Migrate Workflow Chains to Portal Types
if getattr(new_tool, '_chains_by_type', None) is not None:
new_tool.reassignWorkflowWithoutConversion()
WorkflowTool._isBootstrapRequired = WorkflowTool_isBootstrapRequired
WorkflowTool._bootstrap = WorkflowTool_bootstrap
WorkflowTool.getWorkflowValueListFor = WorkflowTool.getWorkflowsFor
def _deleteChainsByType(self, pt, wf_id):
self._chains_by_type[pt] = tuple(wf for wf in self._chains_by_type[pt] if wf!=wf_id)
def getChainsByType(self):
# XXX(WORKFLOW): compatibility code
# get old workflow tool's chains_by_type
if self._chains_by_type is None:
return {}
return self._chains_by_type.copy()
WorkflowTool.getChainsByType = getChainsByType
def reassignWorkflow(self, workflow_id):
# type-workflow reassignment
type_workflow_dict = self.getChainsByType()
type_tool = self.getPortalObject().portal_types
for portal_type_id in type_workflow_dict:
# getTypeInfo takes care of solver, whereas _getOb on Types Tool don't
portal_type = type_tool.getTypeInfo(portal_type_id)
if portal_type is not None and workflow_id in type_workflow_dict[portal_type_id]:
# 1. clean DC workflow tool "chain by type" assignement:
_deleteChainsByType(self, portal_type_id, workflow_id)
# 2. assign workflow to portal type:
type_workflow_list = portal_type.getTypeWorkflowList()
if workflow_id not in type_workflow_list:
portal_type.setTypeWorkflowList(
type_workflow_list + [workflow_id]
)
WorkflowTool.reassignWorkflow = reassignWorkflow
def reassignWorkflowWithoutConversion(self):
# This function should be called when a new template installed and add
# portal_types assignment with converted and non-converted workflows into
# portal_type's workflow list.
# This function only synchronize assignment information to new added portal
# type's TypeWorkflowList.
# To trigger this function, go to portal_workflow and choose "convert DCWorkflow"
# in the action list. Reassignment happens in the background. Nothing need to
# be done, just go back to whatever you are doing.
type_workflow_dict = self.getChainsByType()
type_tool = self.getPortalObject().portal_types
for portal_type_id in type_workflow_dict:
portal_type = type_tool.getTypeInfo(portal_type_id)
if portal_type is not None:
for workflow_id in type_workflow_dict[portal_type_id]:
workflow = getattr(self, workflow_id, None)
if (workflow is not None and
workflow.getPortalType() in ('Workflow',
'Interaction Workflow',
'DCWorkflowDefinition',
'InteractionWorkflowDefinition')):
# 1. clean DC workflow tool "chain by type" assignement:
_deleteChainsByType(self, portal_type.getId(), workflow.id)
# 2. assign workflow to portal type:
type_workflow_list = portal_type.getTypeWorkflowList()
if workflow_id not in type_workflow_list:
portal_type.setTypeWorkflowList(
type_workflow_list + [workflow_id]
)
WorkflowTool.reassignWorkflowWithoutConversion = reassignWorkflowWithoutConversion
WorkflowTool.security.declareProtected(Permissions.AccessContentsInformation,
'getWorkflowTempObjectList')
def getWorkflowTempObjectList(self, temp_object=1, **kw):
""" Return a list of converted temporary workflows. Only necessary in
Workflow Tool to get temporarilly converted DCWorkflow.
"""
# <patch>
ttool = getattr(self.getPortalObject(), "portal_types", None)
# </patch>
if ttool is not None:
return ttool.listTypeInfo()
return ()
WorkflowTool._listTypeInfo = _listTypeInfo
## From here on: migration/compatibility code DCWorkflow => ERP5 Workflow
# The following 2 functions are necessary for workflow tool dynamic migration
def WorkflowTool_isBootstrapRequired(self):
# migration is required if the tool is not the new one from ERP5 Workflow
# in case of old workflow tool, it acquires the portal type from ERP5 Site
return self.getPortalType() != "Workflow Tool"
def WorkflowTool_bootstrap(self):
"""
Migrate portal_workflow from CMFCore to ERP5 Workflow. Also migrate
Workflow Chains not defined anymore on portal_workflow but on the Portal
Type.
Like other Tools migrations (ERP5CatalogTool...), this is called at
startup by synchronizeDynamicModules(), thus Workflows are *not* migrated
from DCWorkflow to ERP5 Workflow (ERP5 Workflow portal_workflow can work
with both DCWorkflows and ERP5 Workflows), because this is not needed at
this stage (handled by bt5 upgrade) and avoid making bootstrap more
complicated than it already is.
"""
from Products.ERP5Type.Tool.WorkflowTool import WorkflowTool
if not isinstance(self, WorkflowTool):
LOG('WorkflowTool', 0, 'Migrating portal_workflow')
portal = self.getPortalObject()
# CMFCore portal_workflow -> ERP5 Workflow portal_workflow
from Products.ERP5.ERP5Site import addERP5Tool
addERP5Tool(portal, 'portal_workflow_new', 'Workflow Tool')
new_tool = portal._getOb("portal_workflow_new")
new_tool._chains_by_type = self._chains_by_type
for workflow_id, workflow in self.objectItems():
workflow_copy = workflow._getCopy(new_tool)
workflow_copy._setId(workflow_id)
new_tool._setObject(workflow_id, workflow_copy)
workflow_copy = new_tool._getOb(workflow_id)
workflow_copy._postCopy(new_tool, op=0)
workflow_copy.wl_clearLocks()
portal.portal_workflow = new_tool
portal.portal_workflow.id = 'portal_workflow'
portal._delObject('portal_workflow_new')
# Migrate Workflow Chains to Portal Types
if getattr(new_tool, '_chains_by_type', None) is not None:
new_tool.reassignWorkflowWithoutConversion()
WorkflowTool._isBootstrapRequired = WorkflowTool_isBootstrapRequired
WorkflowTool._bootstrap = WorkflowTool_bootstrap
WorkflowTool.getWorkflowValueListFor = WorkflowTool.getWorkflowsFor
def _deleteChainsByType(self, pt, wf_id):
self._chains_by_type[pt] = tuple(wf for wf in self._chains_by_type[pt] if wf!=wf_id)
def getChainsByType(self):
# XXX(WORKFLOW): compatibility code
# get old workflow tool's chains_by_type
if self._chains_by_type is None:
return {}
return self._chains_by_type.copy()
WorkflowTool.getChainsByType = getChainsByType
def reassignWorkflow(self, workflow_id):
# type-workflow reassignment
type_workflow_dict = self.getChainsByType()
type_tool = self.getPortalObject().portal_types
for portal_type_id in type_workflow_dict:
# getTypeInfo takes care of solver, whereas _getOb on Types Tool don't
portal_type = type_tool.getTypeInfo(portal_type_id)
if portal_type is not None and workflow_id in type_workflow_dict[portal_type_id]:
# 1. clean DC workflow tool "chain by type" assignement:
_deleteChainsByType(self, portal_type_id, workflow_id)
# 2. assign workflow to portal type:
type_workflow_list = portal_type.getTypeWorkflowList()
if workflow_id not in type_workflow_list:
portal_type.setTypeWorkflowList(
type_workflow_list + [workflow_id]
)
WorkflowTool.reassignWorkflow = reassignWorkflow
def reassignWorkflowWithoutConversion(self):
# This function should be called when a new template installed and add
# portal_types assignment with converted and non-converted workflows into
# portal_type's workflow list.
# This function only synchronize assignment information to new added portal
# type's TypeWorkflowList.
# To trigger this function, go to portal_workflow and choose "convert DCWorkflow"
# in the action list. Reassignment happens in the background. Nothing need to
# be done, just go back to whatever you are doing.
type_workflow_dict = self.getChainsByType()
type_tool = self.getPortalObject().portal_types
for portal_type_id in type_workflow_dict:
portal_type = type_tool.getTypeInfo(portal_type_id)
if portal_type is not None:
for workflow_id in type_workflow_dict[portal_type_id]:
workflow = getattr(self, workflow_id, None)
if (workflow is not None and
workflow.getPortalType() in ('Workflow',
'Interaction Workflow',
'DCWorkflowDefinition',
'InteractionWorkflowDefinition')):
# 1. clean DC workflow tool "chain by type" assignement:
_deleteChainsByType(self, portal_type.getId(), workflow.id)
# 2. assign workflow to portal type:
type_workflow_list = portal_type.getTypeWorkflowList()
if workflow_id not in type_workflow_list:
portal_type.setTypeWorkflowList(
type_workflow_list + [workflow_id]
)
WorkflowTool.reassignWorkflowWithoutConversion = reassignWorkflowWithoutConversion
WorkflowTool.security.declareProtected(Permissions.AccessContentsInformation,
'getWorkflowTempObjectList')
def getWorkflowTempObjectList(self, temp_object=1, **kw):
""" Return a list of converted temporary workflows. Only necessary in
Workflow Tool to get temporarilly converted DCWorkflow.
"""
temp_workflow_list = []
for dc_workflow in self.objectValues():
workflow_type = dc_workflow.__class__.__name__
if workflow_type in ['Workflow', 'Interaction Workflow']:
continue
temp_workflow = dc_workflow.convertToERP5Workflow(temp_object=temp_object)
temp_workflow_list.append(temp_workflow)
return temp_workflow_list
WorkflowTool.getWorkflowTempObjectList = getWorkflowTempObjectList
InitializeClass(WorkflowTool)
temp_workflow_list = []
for dc_workflow in self.objectValues():
workflow_type = dc_workflow.__class__.__name__
if workflow_type in ['Workflow', 'Interaction Workflow']:
continue
temp_workflow = dc_workflow.convertToERP5Workflow(temp_object=temp_object)
temp_workflow_list.append(temp_workflow)
return temp_workflow_list
WorkflowTool.getWorkflowTempObjectList = getWorkflowTempObjectList
InitializeClass(WorkflowTool)
......@@ -38,7 +38,6 @@ from Products.DCWorkflow.Transitions import TRIGGER_USER_ACTION
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type import Permissions
from Products.ERP5.InteractionWorkflow import InteractionWorkflowDefinition
from Products.ERP5Type.Base import Base
......
......@@ -28,6 +28,7 @@
import unittest
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type import WITH_LEGACY_WORKFLOW
from Testing import ZopeTestCase
class TestNamingConvention(ERP5TypeTestCase):
......@@ -65,7 +66,7 @@ class TestNamingConvention(ERP5TypeTestCase):
'erp5_promise', 'erp5_software_pdm', 'erp5_sso_openam', 'erp5_jquery_plugin_hotkey',
'erp5_jquery_plugin_jgraduate', 'erp5_jquery_plugin_svgicon', 'erp5_jquery_plugin_jquerybbq',
'erp5_jquery_plugin_spinbtn', 'erp5_jquery_plugin_svg_editor', 'erp5_svg_editor', 'erp5_syncml',
'erp5_system_event', 'erp5_tiosafe_core', 'erp5_workflow_test',
'erp5_system_event', 'erp5_tiosafe_core',
'erp5_public_accounting_budget', 'erp5_publication',
'erp5_social_contracts', 'test_core', 'test_accounting', 'test_web', 'test_html_style',
'test_xhtml_style', 'cloudooo_data', 'cloudooo_web', 'erp5_configurator',
......@@ -79,7 +80,7 @@ class TestNamingConvention(ERP5TypeTestCase):
# 'erp5_accounting_l10n_pl',
# 'erp5_accounting_l10n_sn',
# 'erp5_accounting_l10n_in',
)
) + (('erp5_workflow_test',) if WITH_LEGACY_WORKFLOW else ())
def getTitle(self):
return "Naming Convention"
......
......@@ -26,7 +26,9 @@
#
##############################################################################
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type import WITH_LEGACY_WORKFLOW
import StringIO
import unittest
import urllib
import httplib
......@@ -149,3 +151,9 @@ class TestUpgradeInstanceWithOldDataFs(ERP5TypeTestCase):
self.assertEquals(
error_list, [],
msg="The following Portal Type classes could not be loaded (see zLOG.log): %r" % error_list)
def test_suite():
suite = unittest.TestSuite()
if WITH_LEGACY_WORKFLOW:
suite.addTest(unittest.makeSuite(TestUpgradeInstanceWithOldDataFs))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment