Commit ffdb9a87 authored by Rafael Monnerat's avatar Rafael Monnerat

slapos_cloud: Add test for network_slap_interface_workflow

parent d00185f7
......@@ -24,189 +24,6 @@ from time import sleep
from zExceptions import Unauthorized
class TestComputerNetworkcreateMovement(SlapOSTestCaseMixin):
def _makeProject(self):
project = self.portal.project_module.newContent()
project.edit(reference="TESTPROJ-%s" % project.getId())
project.validate()
self.tic()
return project
def _makeComputerNetwork(self):
network = self.portal.computer_network_module.newContent()
network.edit(reference="TESTNET-%s" % network.getId())
network.validate()
self.tic()
return network
def _makeOrganisation(self):
organisation = self.portal.organisation_module.newContent()
organisation.edit(reference="TESTSITE-%s" % organisation.getId())
organisation.validate()
self.tic()
return organisation
def test_project(self):
network = self._makeComputerNetwork()
source_administrator = self.makePerson(user=1)
network.setSourceAdministrationValue(source_administrator)
project = self._makeProject()
other_project = self._makeProject()
self.tic()
self.login(source_administrator.getUserId())
self.assertEqual(network.Item_getCurrentProjectValue(), None)
self.assertEqual(network.Item_getCurrentOwnerValue(), None)
# Place in a project
self.assertEqual(network.ComputerNetwork_createMovement(
destination_project=project.getRelativeUrl()), None)
self.tic()
self.assertEqual(network.Item_getCurrentProjectValue(), project)
self.assertEqual(network.Item_getCurrentOwnerValue(), source_administrator)
self.assertEqual(1,
len(network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
self.login(source_administrator.getUserId())
# We don't remove from Project if destination project is not provided
self.assertEqual(network.ComputerNetwork_createMovement(), None)
self.tic()
self.assertEqual(network.Item_getCurrentProjectValue(), project)
self.assertEqual(network.Item_getCurrentOwnerValue(), source_administrator)
self.assertEqual(2,
len(network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
# Ensure that we don't have 2 new Internal Packing lists in the same second
sleep(1)
# Place in another project
self.assertEqual(network.ComputerNetwork_createMovement(
destination_project=other_project.getRelativeUrl()), None)
self.tic()
self.assertEqual(network.Item_getCurrentProjectValue(), other_project)
self.assertEqual(network.Item_getCurrentOwnerValue(), source_administrator)
self.assertEqual(3,
len(network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
self.login(source_administrator.getUserId())
# Ensure that we don't have 2 new Internal Packing lists in the same second
sleep(1)
# We don't remove from Project if destination project is not provided
self.assertEqual(network.ComputerNetwork_createMovement(), None)
self.tic()
self.assertEqual(network.Item_getCurrentProjectValue(), other_project)
self.assertEqual(network.Item_getCurrentOwnerValue(), source_administrator)
self.assertEqual(4,
len(network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
def test_owner(self):
network = self._makeComputerNetwork()
source_administrator = self.makePerson(user=1)
network.setSourceAdministrationValue(source_administrator)
organisation = self._makeOrganisation()
other_organisation = self._makeOrganisation()
self.tic()
self.login(source_administrator.getUserId())
self.assertEqual(network.Item_getCurrentProjectValue(), None)
self.assertEqual(network.Item_getCurrentOwnerValue(), None)
self.assertEqual(network.ComputerNetwork_createMovement(
destination_section=organisation.getRelativeUrl()), None)
self.tic()
self.assertEqual(network.Item_getCurrentProjectValue(), None)
self.assertEqual(network.Item_getCurrentOwnerValue(), organisation)
self.assertEqual(1,
len(network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
self.login(source_administrator.getUserId())
# Ensure that we don't have 2 new Internal Packing lists in the same second
sleep(1)
# We don't remove from Project if destination project is not provided
self.assertEqual(network.ComputerNetwork_createMovement(), None)
self.tic()
self.assertEqual(network.Item_getCurrentProjectValue(), None)
self.assertEqual(network.Item_getCurrentOwnerValue(), organisation)
# Ensure that we don't have 2 new Internal Packing lists in the same second
sleep(1)
# Place in another project
self.assertEqual(network.ComputerNetwork_createMovement(
destination_section=other_organisation.getRelativeUrl()), None)
self.tic()
self.assertEqual(3,
len(network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
self.assertEqual(network.Item_getCurrentProjectValue(), None)
self.assertEqual(network.Item_getCurrentOwnerValue(), other_organisation)
self.assertEqual(3,
len(network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
self.login(source_administrator.getUserId())
# Ensure that we don't have 2 new Internal Packing lists in the same second
sleep(1)
# We don't remove from Project if destination project is not provided
self.assertEqual(network.ComputerNetwork_createMovement(), None)
self.tic()
self.assertEqual(network.Item_getCurrentProjectValue(), None)
self.assertEqual(network.Item_getCurrentOwnerValue(), other_organisation)
self.assertEqual(4,
len(network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
def testUnauthorized(self):
network = self._makeComputerNetwork()
self.assertRaises(Unauthorized, network.ComputerNetwork_createMovement)
source_administrator = self.makePerson(user=1)
self.assertEqual(1 , len(source_administrator.objectValues( portal_type="ERP5 Login")))
self.login(source_administrator.getUserId())
self.assertRaises(Unauthorized, network.ComputerNetwork_createMovement)
self.login()
other_user = self.makePerson(user=1)
self.assertEqual(1 , len(other_user.objectValues(portal_type="ERP5 Login")))
network.setSourceAdministrationValue(source_administrator)
self.tic()
self.assertRaises(Unauthorized, network.ComputerNetwork_createMovement)
self.login(other_user.getUserId())
self.assertRaises(Unauthorized, network.ComputerNetwork_createMovement)
self.login(source_administrator.getUserId())
self.assertEqual(network.ComputerNetwork_createMovement(), None)
class TestComputeNodecreateMovement(SlapOSTestCaseMixin):
def _makeComputeNode(self, owner=None, allocation_scope='open/public'):
......
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2002-2012 Nexedi SA and Contributors. All Rights Reserved.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from erp5.component.test.SlapOSTestCaseMixin import SlapOSTestCaseMixin
from time import sleep
from zExceptions import Unauthorized
import transaction
class TestSlapOSCoreNetworkSlapInterfaceWorkflow(SlapOSTestCaseMixin):
def afterSetUp(self):
SlapOSTestCaseMixin.afterSetUp(self)
portal = self.getPortalObject()
person_user = self.makePerson()
self.tic()
# Login as new user
self.login(person_user.getUserId())
new_person = self.portal.portal_membership.getAuthenticatedMember().getUserValue()
self.assertEqual(person_user.getRelativeUrl(), new_person.getRelativeUrl())
self.network = portal.computer_network_module.newContent(
portal_type="Computer Network"
)
self.tic()
self.assertEqual(
self.network.getReference(), None)
def beforeTearDown(self):
transaction.abort()
def test_network_approveRegistration_with_reference(self):
reference = "TEST-%s" % self.generateNewId()
self.network.setReference(reference)
self.network.approveRegistration()
self.assertEqual(self.network.getReference(), reference)
def test_organisation_approveRegistration_already_validated(self):
# Login as admin since user cannot re-approve a validated organisation
self.login()
self.network.setReference(None)
self.network.validate()
# Don't raise if network is validated
self.assertEqual(self.network.approveRegistration(), None)
def _makeProject(self):
project = self.portal.project_module.newContent()
project.edit(reference="TESTPROJ-%s" % project.getId())
project.validate()
self.tic()
return project
def _makeOrganisation(self):
organisation = self.portal.organisation_module.newContent()
organisation.edit(reference="TESTSITE-%s" % organisation.getId())
organisation.validate()
self.tic()
return organisation
def test_ComputerNetwork_requestTransfer_project(self):
source_administrator = self.portal.portal_membership.getAuthenticatedMember().getUserValue()
self.network.setSourceAdministrationValue(source_administrator)
self.login()
self.network.approveRegistration()
project = self._makeProject()
other_project = self._makeProject()
self.tic()
self.login(source_administrator.getUserId())
self.assertEqual(self.network.Item_getCurrentProjectValue(), None)
self.assertEqual(self.network.Item_getCurrentOwnerValue(), None)
# Place in a project
self.network.requestTransfer(
destination_section=None,
destination_project=project.getRelativeUrl())
self.tic()
self.assertEqual(self.network.Item_getCurrentProjectValue(), project)
self.assertEqual(self.network.Item_getCurrentOwnerValue(), source_administrator)
self.assertEqual(1,
len(self.network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
self.login(source_administrator.getUserId())
# We don't remove from Project if destination project is not provided
self.network.requestTransfer(
destination_project=None,
destination_section=None
)
self.tic()
self.assertEqual(self.network.Item_getCurrentProjectValue(), project)
self.assertEqual(self.network.Item_getCurrentOwnerValue(), source_administrator)
self.assertEqual(2,
len(self.network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
# Ensure that we don't have 2 new Internal Packing lists in the same second
sleep(1)
# Place in another project
self.network.requestTransfer(
destination_section=None,
destination_project=other_project.getRelativeUrl())
self.tic()
self.assertEqual(self.network.Item_getCurrentProjectValue(), other_project)
self.assertEqual(self.network.Item_getCurrentOwnerValue(), source_administrator)
self.assertEqual(3,
len(self.network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
self.login(source_administrator.getUserId())
# Ensure that we don't have 2 new Internal Packing lists in the same second
sleep(1)
# We don't remove from Project if destination project is not provided
self.network.requestTransfer(
destination_project=None,
destination_section=None
)
self.tic()
self.assertEqual(self.network.Item_getCurrentProjectValue(), other_project)
self.assertEqual(self.network.Item_getCurrentOwnerValue(), source_administrator)
self.assertEqual(4,
len(self.network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
def test_ComputerNetwork_requestTransfer_owner(self):
source_administrator = self.portal.portal_membership.getAuthenticatedMember().getUserValue()
self.network.setSourceAdministrationValue(source_administrator)
self.login()
self.network.approveRegistration()
organisation = self._makeOrganisation()
other_organisation = self._makeOrganisation()
self.tic()
self.login(source_administrator.getUserId())
self.assertEqual(self.network.Item_getCurrentProjectValue(), None)
self.assertEqual(self.network.Item_getCurrentOwnerValue(), None)
self.network.requestTransfer(
destination_project=None,
destination_section=organisation.getRelativeUrl())
self.tic()
self.assertEqual(self.network.Item_getCurrentProjectValue(), None)
self.assertEqual(self.network.Item_getCurrentOwnerValue(), organisation)
self.assertEqual(1,
len(self.network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
self.login(source_administrator.getUserId())
# Ensure that we don't have 2 new Internal Packing lists in the same second
sleep(1)
# We don't remove from Project if destination project is not provided
self.network.requestTransfer(
destination_project=None,
destination_section=None)
self.tic()
self.assertEqual(self.network.Item_getCurrentProjectValue(), None)
self.assertEqual(self.network.Item_getCurrentOwnerValue(), organisation)
# Ensure that we don't have 2 new Internal Packing lists in the same second
sleep(1)
# Place in another project
self.network.requestTransfer(
destination_project=None,
destination_section=other_organisation.getRelativeUrl())
self.tic()
self.assertEqual(3,
len(self.network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
self.assertEqual(self.network.Item_getCurrentProjectValue(), None)
self.assertEqual(self.network.Item_getCurrentOwnerValue(), other_organisation)
self.assertEqual(3,
len(self.network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
self.login(source_administrator.getUserId())
# Ensure that we don't have 2 new Internal Packing lists in the same second
sleep(1)
# We don't remove from Project if destination project is not provided
self.network.requestTransfer(
destination_project=None,
destination_section=None
)
self.tic()
self.assertEqual(self.network.Item_getCurrentProjectValue(), None)
self.assertEqual(self.network.Item_getCurrentOwnerValue(), other_organisation)
self.assertEqual(4,
len(self.network.getAggregateRelatedList(portal_type="Internal Packing List Line"))
)
def test_ComputerNetwork_requestTransfer_Unauthorized(self):
self.network.approveRegistration()
self.login()
self.assertRaises(Unauthorized, self.network.requestTransfer)
source_administrator = self.makePerson(user=1)
self.assertEqual(1 , len(source_administrator.objectValues( portal_type="ERP5 Login")))
self.login(source_administrator.getUserId())
self.assertRaises(Unauthorized, self.network.requestTransfer)
self.login()
other_user = self.makePerson(user=1)
self.assertEqual(1 , len(other_user.objectValues(portal_type="ERP5 Login")))
self.network.setSourceAdministrationValue(source_administrator)
self.tic()
self.assertRaises(Unauthorized, self.network.requestTransfer)
self.login(other_user.getUserId())
self.assertRaises(Unauthorized, self.network.requestTransfer)
self.login(source_administrator.getUserId())
self.network.requestTransfer(
destination_project=None,
destination_section=None
)
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Test Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testSlapOSCloudNetworkSlapInterfaceWorkflow</string> </value>
</item>
<item>
<key> <string>default_source_reference</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testSlapOSCloudNetworkSlapInterfaceWorkflow</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Test Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.Workflow"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_log</string> </key>
<value>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>error_message</string> </key>
<value>
<list>
<persistent> <string encoding="base64">AAAAAAAAAAU=</string> </persistent>
</list>
</value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="5" aka="AAAAAAAAAAU=">
<pickle>
<global name="Message" module="Products.ERP5Type.Message"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>default</string> </key>
<value> <string>ID is invalid, should be \'${id_prefix}.VERSION.REFERENCE\'</string> </value>
</item>
<item>
<key> <string>domain</string> </key>
<value> <string>erp5_ui</string> </value>
</item>
<item>
<key> <string>mapping</string> </key>
<value>
<dictionary>
<item>
<key> <string>id_prefix</string> </key>
<value> <string>test</string> </value>
</item>
</dictionary>
</value>
</item>
<item>
<key> <string>message</string> </key>
<value> <string>ID is invalid, should be \'${id_prefix}.VERSION.REFERENCE\'</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -10,4 +10,7 @@ if computer_network.getReference() in [None, ""]:
computer_network.setReference(reference)
if computer_network.getValidationState() != "draft":
return
computer_network.validate()
......@@ -7,6 +7,10 @@ portal = computer_network.getPortalObject()
# Get required arguments
kwargs = state_change.kwargs
user = portal.portal_membership.getAuthenticatedMember().getUserValue()
if user is None or user.getRelativeUrl() != computer_network.getSourceAdministration():
raise Unauthorized("Only the Computer Network owner can transfer it from one location to another.")
# Required args
# Raise TypeError if all parameters are not provided
try:
......@@ -16,10 +20,6 @@ try:
except KeyError:
raise TypeError("ComputerNetwork_requestTransfer takes exactly 2 arguments")
user = portal.portal_membership.getAuthenticatedMember().getUserValue()
if user is None or user.getRelativeUrl() != computer_network.getSourceAdministration():
raise Unauthorized("Only the Computer Network owner can transfer it from one location to another.")
tag = "%s_%s_%s_inProgress" % (computer_network.getUid(), destination_section, destination_project)
if (portal.portal_activities.countMessageWithTag(tag) > 0):
# The software instance is already under creation but can not be fetched from catalog
......
......@@ -3,6 +3,7 @@ test.erp5.testSlapOSCloudAllocationAlarm
test.erp5.testSlapOSCloudComputePartitionSlapInterfaceWorkflow
test.erp5.testSlapOSCloudInteractionWorkflow
test.erp5.testSlapOSCloudPersonSlapInterfaceWorkflow
test.erp5.testSlapOSCloudNetworkSlapInterfaceWorkflow
test.erp5.testSlapOSCloudComputeNodeSlapInterfaceWorkflow
test.erp5.testSlapOSCloudInstanceSlapInterfaceWorkflow
test.erp5.testSlapOSCloudProjectSlapInterfaceWorkflow
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment