Commit 8d31c98b authored by Alain Takoudjou's avatar Alain Takoudjou Committed by Rafael Monnerat

Test script for Alarm checkAndUpdateAllocationScope and H_S partial allocation instances

parent c2f92080
......@@ -1782,3 +1782,177 @@ class TestSlapOSRegularisationRequest_deleteHostingSubscriptionList(
self.assertFalse(result)
self.tic()
class TestSlapOSComputer_notifyWrongAllocationScope(testSlapOSMixin):
def beforeTearDown(self):
transaction.abort()
def afterSetUp(self):
super(TestSlapOSComputer_notifyWrongAllocationScope, self).afterSetUp()
self.new_id = self.generateNewId()
self._cancelTestSupportRequestList()
def _cancelTestSupportRequestList(self):
for support_request in self.portal.portal_catalog(
portal_type="Support Request",
title="Allocation scope has been changed for TESTCOMPT%",
simulation_state="suspended"):
support_request.invalidate()
self.tic()
def _makeComputer(self,new_id):
# Clone computer document
person = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
computer = self.portal.computer_module\
.template_computer.Base_createCloneDocument(batch_mode=1)
computer.edit(
title="computer ticket %s" % (new_id, ),
reference="TESTCOMPT-%s" % (new_id, ),
source_administration_value=person
)
computer.validate()
return computer
def _makePerson(self, new_id):
# Clone computer document
person = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
person.edit(reference='TESTPERSON-%s' % (new_id, ))
person.immediateReindexObject()
return person
def _updatePersonAssignment(self, person, role='role/member'):
for assignment in person.contentValues(portal_type="Assignment"):
assignment.cancel()
assignment = person.newContent(portal_type='Assignment')
assignment.setRole(role)
assignment.setStartDate(DateTime())
assignment.open()
return assignment
def _getGeneratedSupportRequest(self, computer):
request_title = 'Allocation scope has been changed for %s' % computer.getReference()
support_request = self.portal.portal_catalog.getResultValue(
portal_type = 'Support Request',
title = request_title,
simulation_state = 'suspended',
source_project_uid = computer.getUid()
)
return support_request
def _makeNotificationMessage(self, reference):
notification_message = self.portal.notification_message_module.newContent(
portal_type="Notification Message",
title='We have changed allocation scope for %s' % reference,
text_content='Test NM content<br/>%s<br/>' % reference,
content_type='text/html',
)
return notification_message.getRelativeUrl()
@simulate('NotificationTool_getDocumentValue',
'reference=None',
'assert reference == "slapos-crm-computer_allocation_scope.notification"\n' \
'return context.restrictedTraverse(' \
'context.REQUEST["test_computerNotAllowedAllocationScope_OpenPublic"])')
@simulate('SupportRequest_trySendNotificationMessage',
'message_title, message, source_relative_url',
'context.portal_workflow.doActionFor(' \
'context, action="edit_action", ' \
'comment="Visited by SupportRequest_trySendNotificationMessage ' \
'%s %s %s" % (message_title, message, source_relative_url))')
def test_computerNotAllowedAllocationScope_OpenPublic(self):
new_id = self.generateNewId()
computer = self._makeComputer(self.new_id)
person = computer.getSourceAdministrationValue()
self._updatePersonAssignment(person, 'role/member')
self.portal.REQUEST['test_computerNotAllowedAllocationScope_OpenPublic'] = \
self._makeNotificationMessage(computer.getReference())
computer.edit(allocation_scope='open/public')
computer.Computer_checkAndUpdateAllocationScope()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
ticket = self._getGeneratedSupportRequest(computer)
self.assertEquals(ticket.getSimulationState(), 'suspended')
self.assertEqual('Visited by SupportRequest_trySendNotificationMessage ' \
'%s %s %s' % \
('We have changed allocation scope for %s' % computer.getReference(),
'Test NM content\n%s\n' % computer.getReference(), person.getRelativeUrl()),
ticket.workflow_history['edit_workflow'][-1]['comment'])
@simulate('NotificationTool_getDocumentValue',
'reference=None',
'assert reference == "slapos-crm-computer_allocation_scope.notification"\n' \
'return context.restrictedTraverse(' \
'context.REQUEST["test_computerNotAllowedAllocationScope_OpenFriend"])')
@simulate('SupportRequest_trySendNotificationMessage',
'message_title, message, source_relative_url',
'context.portal_workflow.doActionFor(' \
'context, action="edit_action", ' \
'comment="Visited by SupportRequest_trySendNotificationMessage ' \
'%s %s %s" % (message_title, message, source_relative_url))')
def test_computerNotAllowedAllocationScope_OpenFriend(self):
new_id = self.generateNewId()
computer = self._makeComputer(self.new_id)
person = computer.getSourceAdministrationValue()
self._updatePersonAssignment(person, 'role/member')
self.portal.REQUEST['test_computerNotAllowedAllocationScope_OpenFriend'] = \
self._makeNotificationMessage(computer.getReference())
friend_person = self._makePerson(self.generateNewId())
computer.edit(allocation_scope='open/friend',
destination_section=friend_person.getRelativeUrl())
computer.Computer_checkAndUpdateAllocationScope()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
ticket = self._getGeneratedSupportRequest(computer)
self.assertEquals(ticket.getSimulationState(), 'suspended')
self.assertEqual('Visited by SupportRequest_trySendNotificationMessage ' \
'%s %s %s' % \
('We have changed allocation scope for %s' % computer.getReference(),
'Test NM content\n%s\n' % computer.getReference(), person.getRelativeUrl()),
ticket.workflow_history['edit_workflow'][-1]['comment'])
def test_computerNormalAllocationScope_OpenPersonal(self):
computer = self._makeComputer(self.new_id)
person = computer.getSourceAdministrationValue()
self._updatePersonAssignment(person, 'role/member')
computer.edit(allocation_scope='open/personal')
computer.Computer_checkAndUpdateAllocationScope()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
def test_computerAllowedAllocationScope_OpenPublic(self):
computer = self._makeComputer(self.new_id)
person = computer.getSourceAdministrationValue()
self._updatePersonAssignment(person, 'role/service_provider')
computer.edit(allocation_scope='open/public')
computer.Computer_checkAndUpdateAllocationScope()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/public')
def test_computerAllowedAllocationScope_OpenFriend(self):
computer = self._makeComputer(self.new_id)
person = computer.getSourceAdministrationValue()
self._updatePersonAssignment(person, 'role/service_provider')
friend_person = self._makePerson(self.generateNewId())
computer.edit(allocation_scope='open/friend',
destination_section=friend_person.getRelativeUrl())
computer.Computer_checkAndUpdateAllocationScope()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/friend')
\ No newline at end of file
......@@ -693,47 +693,101 @@ portal_workflow.doActionFor(context, action='edit_action', comment='Visited by S
self.assertEqual('Visited by SoftwareRelease_testForAllocation',
software_release.workflow_history['edit_workflow'][-1]['comment'])
def test_Alarm_notAllowedAllocationScope(self):
def _simulateComputer_checkAndUpdateAllocationScope(self):
script_name = 'Computer_checkAndUpdateAllocationScope'
if script_name in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_name)
createZODBPythonScript(self.portal.portal_skins.custom,
script_name,
'*args, **kw',
'# Script body\n'
"""portal_workflow = context.portal_workflow
portal_workflow.doActionFor(context, action='edit_action', comment='Visited by Computer_checkAndUpdateAllocationScope') """ )
transaction.commit()
def _dropComputer_checkAndUpdateAllocationScope(self):
script_name = 'Computer_checkAndUpdateAllocationScope'
if script_name in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
def test_Alarm_notAllowedAllocationScope_OpenPublic(self):
computer = self._makeComputer(self.new_id)
person = computer.getSourceAdministrationValue()
self._updatePersonAssignment(person, 'role/member')
computer.edit(allocation_scope = 'open/public')
computer.edit(allocation_scope='open/public')
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
self._simulateComputer_checkAndUpdateAllocationScope()
computer.edit(allocation_scope='open/personal')
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
try:
self.portal.portal_alarms.slapos_crm_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
finally:
self._dropComputer_checkAndUpdateAllocationScope()
friend_person = self._makePerson(self.new_id)
computer.edit(allocation_scope='open/friend',
destination_section=friend_person.getRelativeUrl())
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
self.assertEqual('Visited by Computer_checkAndUpdateAllocationScope',
computer.workflow_history['edit_workflow'][-1]['comment'])
def test_Alarm_allowedAllocationScope(self):
def test_Alarm_notAllowedAllocationScope_OpenFriend(self):
computer = self._makeComputer(self.new_id)
person = computer.getSourceAdministrationValue()
self._updatePersonAssignment(person, 'role/service_provider')
computer.edit(allocation_scope = 'open/friend')
self._simulateComputer_checkAndUpdateAllocationScope()
computer.edit(allocation_scope='open/public')
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
try:
self.portal.portal_alarms.slapos_crm_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/public')
finally:
self._dropComputer_checkAndUpdateAllocationScope()
self.assertEqual('Visited by Computer_checkAndUpdateAllocationScope',
computer.workflow_history['edit_workflow'][-1]['comment'])
def test_Alarm_notAllowedAllocationScope_OpenPersonal(self):
computer = self._makeComputer(self.new_id)
computer.edit(allocation_scope = 'open/personal')
computer.edit(allocation_scope='open/personal')
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
self._simulateComputer_checkAndUpdateAllocationScope()
try:
self.portal.portal_alarms.slapos_crm_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
finally:
self._dropComputer_checkAndUpdateAllocationScope()
self.assertNotEqual('Visited by Computer_checkAndUpdateAllocationScope',
computer.workflow_history['edit_workflow'][-1]['comment'])
def _simulateHostingSubscription_checkSofwareInstanceState(self):
script_name = 'HostingSubscription_checkSofwareInstanceState'
if script_name in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_name)
createZODBPythonScript(self.portal.portal_skins.custom,
script_name,
'*args, **kw',
'# Script body\n'
"""portal_workflow = context.portal_workflow
portal_workflow.doActionFor(context, action='edit_action', comment='Visited by HostingSubscription_checkSofwareInstanceState') """ )
transaction.commit()
friend_person = self._makePerson(self.new_id)
computer.edit(allocation_scope='open/friend',
destination_section=friend_person.getRelativeUrl())
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
def _dropHostingSubscription_checkSofwareInstanceState(self):
script_name = 'HostingSubscription_checkSofwareInstanceState'
if script_name in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
def test_Alarm_findAndNofitiyUnallocatedSoftwareInstance(self):
host_sub = self._makeHostingSubscription(self.new_id)
self._makeSoftwareInstance(host_sub, self.generateNewSoftwareReleaseUrl())
instance = host_sub.getPredecessorValue()
self.assertEqual(instance.getAggregate(""), "")
self._simulateHostingSubscription_checkSofwareInstanceState()
try:
self.portal.portal_alarms.slapos_crm_check_partially_allocated_instance.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/friend')
finally:
self._dropHostingSubscription_checkSofwareInstanceState()
self.assertEqual('Visited by HostingSubscription_checkSofwareInstanceState',
host_sub.workflow_history['edit_workflow'][-1]['comment'])
\ No newline at end of file
35
\ No newline at end of file
36
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment