Commit 27cabb78 authored by Alain Takoudjou's avatar Alain Takoudjou Committed by Rafael Monnerat

Alarm for check and change wrong allocation scope on public and friend computer

parent 141b611d
# Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
from Products.SlapOS.tests.testSlapOSMixin import testSlapOSMixin
from AccessControl import getSecurityManager
from DateTime import DateTime
import transaction
class TestAllocationScope(testSlapOSMixin):
def afterSetUp(self):
super(TestAllocationScope, self).afterSetUp()
self.user_id = getSecurityManager().getUser().getId()
def beforeTearDown(self):
transaction.abort()
def generateNewId(self):
return self.getPortalObject().portal_ids.generateNewId(
id_group=('slapos_core_test'))
def test_GroupCompany(self):
computer = self.portal.computer_module.newContent(portal_type='Computer')
computer.updateLocalRolesOnSecurityGroups()
self.assertSecurityGroup(computer,
['G-COMPANY', self.user_id], False)
self.assertRoles(computer, 'G-COMPANY', ['Assignor'])
self.assertRoles(computer, self.user_id, ['Owner'])
def test_ComputerAgent(self):
reference = 'TESTPERSON-%s' % self.generateNewId()
person = self.portal.person_module.newContent(portal_type='Person',
reference=reference)
computer = self.portal.computer_module.newContent(portal_type='Computer',
source_administration=person.getRelativeUrl())
computer.updateLocalRolesOnSecurityGroups()
self.assertSecurityGroup(computer,
[self.user_id, 'G-COMPANY', reference], False)
self.assertRoles(computer, reference, ['Assignee'])
self.assertRoles(computer, self.user_id, ['Owner'])
def test_NotAllowedAllocationScope(self):
reference = 'TESTPERSON-%s' % self.generateNewId()
computer_ref = 'TESTCOMP-%s' % self.generateNewId()
person = self.portal.person_module.newContent(portal_type='Person',
reference=reference)
assignment = person.newContent(portal_type='Assignment')
assignment.setRole('role/member')
assignment.setStartDate(DateTime())
assignment.open()
assignment.updateLocalRolesOnSecurityGroups()
computer = self.portal.computer_module.newContent(portal_type='Computer',
source_administration=person.getRelativeUrl(),
reference=computer_ref)
# open/public
computer.edit(allocation_scope='open/public')
computer.updateLocalRolesOnSecurityGroups()
computer.Computer_checkAndUpdateAllocationScope()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
# open/personal
computer.edit(allocation_scope='open/personal',
source_administration=person.getRelativeUrl()
)
computer.updateLocalRolesOnSecurityGroups()
computer.Computer_checkAndUpdateAllocationScope()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
# open/friend
friend_reference = 'TESTPERSON-%s' % self.generateNewId()
friend_person = self.portal.person_module.newContent(portal_type='Person',
reference=friend_reference)
computer.edit(allocation_scope='open/friend',
destination_section=friend_person.getRelativeUrl()
)
computer.updateLocalRolesOnSecurityGroups()
computer.Computer_checkAndUpdateAllocationScope()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
def test_AllowedAllocationScope(self):
# Test computer allocation scope with 'service_provider' user assignment
reference = 'TESTPERSON-%s' % self.generateNewId()
person = self.portal.person_module.newContent(portal_type='Person',
reference=reference)
assignment = person.newContent(portal_type='Assignment')
assignment.setRole('role/service_provider')
assignment.setStartDate(DateTime())
assignment.open()
assignment.updateLocalRolesOnSecurityGroups()
computer = self.portal.computer_module.newContent(portal_type='Computer',
source_administration=person.getRelativeUrl())
# open/public
computer.edit(allocation_scope='open/public')
computer.updateLocalRolesOnSecurityGroups()
computer.Computer_checkAndUpdateAllocationScope()
self.assertEquals(computer.getAllocationScope(), 'open/public')
# open/personal
computer.edit(allocation_scope='open/personal',
source_administration=person.getRelativeUrl()
)
computer.updateLocalRolesOnSecurityGroups()
computer.Computer_checkAndUpdateAllocationScope()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
# open/friend
friend_reference = 'TESTPERSON-%s' % self.generateNewId()
friend_person = self.portal.person_module.newContent(portal_type='Person',
reference=friend_reference)
computer.edit(allocation_scope='open/friend',
destination_section=friend_person.getRelativeUrl()
)
computer.updateLocalRolesOnSecurityGroups()
computer.Computer_checkAndUpdateAllocationScope()
self.assertEquals(computer.getAllocationScope(), 'open/friend')
\ No newline at end of file
...@@ -53,11 +53,6 @@ ...@@ -53,11 +53,6 @@
<value> <string>from DateTime import DateTime\n <value> <string>from DateTime import DateTime\n
\n \n
portal = context.getPortalObject()\n portal = context.getPortalObject()\n
\n
if portal.ERP5Site_isSupportRequestCreationClosed():\n
# Stop ticket creation\n
return\n
\n
source_project_value = portal.restrictedTraverse(source_relative_url)\n source_project_value = portal.restrictedTraverse(source_relative_url)\n
\n \n
if source_project_value.getPortalType() == "Computer":\n if source_project_value.getPortalType() == "Computer":\n
...@@ -67,6 +62,10 @@ elif source_project_value.getPortalType() == "Software Instance":\n ...@@ -67,6 +62,10 @@ elif source_project_value.getPortalType() == "Software Instance":\n
else:\n else:\n
destination_decision = None\n destination_decision = None\n
\n \n
if portal.ERP5Site_isSupportRequestCreationClosed(destination_decision):\n
# Stop ticket creation\n
return\n
\n
support_request_in_progress = portal.portal_catalog.getResultValue(\n support_request_in_progress = portal.portal_catalog.getResultValue(\n
portal_type = \'Support Request\',\n portal_type = \'Support Request\',\n
title = title,\n title = title,\n
......
...@@ -57,6 +57,7 @@ from DateTime import DateTime\n ...@@ -57,6 +57,7 @@ from DateTime import DateTime\n
computer = context\n computer = context\n
portal = context.getPortalObject()\n portal = context.getPortalObject()\n
allocation_scope = computer.getAllocationScope()\n allocation_scope = computer.getAllocationScope()\n
computer_reference = computer.getReference()\n
\n \n
if allocation_scope != \'open/public\' and \\\n if allocation_scope != \'open/public\' and \\\n
allocation_scope != \'open/friend\':\n allocation_scope != \'open/friend\':\n
...@@ -66,7 +67,6 @@ is_service_provider = False\n ...@@ -66,7 +67,6 @@ is_service_provider = False\n
person = computer.getSourceAdministrationValue(portal_type="Person")\n person = computer.getSourceAdministrationValue(portal_type="Person")\n
\n \n
for assignment in person.contentValues(portal_type="Assignment"):\n for assignment in person.contentValues(portal_type="Assignment"):\n
assignment = assignment.getObject()\n
if assignment.getRole() == \'service_provider\':\n if assignment.getRole() == \'service_provider\':\n
is_service_provider = True\n is_service_provider = True\n
break\n break\n
...@@ -75,67 +75,64 @@ if not is_service_provider:\n ...@@ -75,67 +75,64 @@ if not is_service_provider:\n
#Turn this computer allocation scope to \'open/personal\'\n #Turn this computer allocation scope to \'open/personal\'\n
edit_kw = {\n edit_kw = {\n
\'allocation_scope\': \'open/personal\',\n \'allocation_scope\': \'open/personal\',\n
\'subject_list\': [\'\'],\n }\n
\'destination_section_value\': person,\n
} \n
computer.edit(**edit_kw)\n computer.edit(**edit_kw)\n
\n \n
# Create a ticket (or re-open it) for this issue!\n # Create a ticket (or re-open it) for this issue!\n
ticket_title = \'Allocation scope has been changed for %s\' % computer.\\\n request_title = \'Allocation scope has been changed for %s\' % computer_reference\n
getReference()\n request_description = \'Allocation scope has been changed back to \' \\\n
message = """Dear user,\n \'open/personal for %s\' % computer_reference\n
\n message_title = \'We have changed allocation scope for %s\' % computer_reference\n
Your computer %s with ID %s was configured with allocation scope \'%s\', but you are\n notification_reference = \'slapos-crm-computer_allocation_scope.notification\'\n
not allowed to turn computer to \'public\' or \'friend\' state because you are not a service provider.\n notification_message = portal.portal_notifications.getDocumentValue(\n
The computer allocation scope has been changed to \'open/personal\'. Please contact us for more informations.\n reference=notification_reference)\n
\n
Regards,\n
\n
SlapOS Team\n
""" % (computer.getTitle(), computer.getReference(),\n
allocation_scope)\n
ressource = portal.service_module.slapos_crm_information.getRelativeUrl()\n ressource = portal.service_module.slapos_crm_information.getRelativeUrl()\n
mapping_dict = {\'computer_title\':computer.getTitle(),\n
\'computer_id\':computer_reference,\n
\'allocation_scope\':allocation_scope}\n
message = notification_message.asText(\n
substitution_method_parameter_dict={\'mapping_dict\':mapping_dict})\n
\n \n
support_request_in_progress = portal.portal_catalog.getResultValue(\n support_request_url = context.Base_generateSupportRequestForSlapOS(\n
portal_type = \'Support Request\',\n request_title,\n
title = ticket_title,\n request_description,\n
simulation_state = ["validated","submitted","suspended"],\n computer.getRelativeUrl()\n
source_project_uid = computer.getUid()\n
)\n )\n
if support_request_in_progress is None:\n if support_request_url:\n
support_request = portal.support_request_module.\\\n support_request = portal.restrictedTraverse(support_request_url)\n
slapos_crm_support_request_template.\\\n
Base_createCloneDocument(batch_mode=1)\n
support_request.edit(\n
title = ticket_title,\n
resource = ressource,\n
description = ticket_title,\n
start_date = DateTime(),\n
destination_decision=computer.getSourceAdministration(),\n
source_project_value = computer.getRelativeUrl()\n
)\n
support_request.validate()\n
support_request.suspend()\n support_request.suspend()\n
else:\n else:\n
support_request = support_request_in_progress.getObject()\n # XXX - Base_generateSupportRequestForSlapOS return None if the \n
# support_request already exist, but we want to use it again so...\n
support_request = portal.portal_catalog.getResultValue(\n
portal_type = \'Support Request\',\n
title = request_title,\n
simulation_state = \'suspended\',\n
source_project_uid = computer.getUid()\n
)\n
if support_request is None:\n
# Existing ticket not found, can not create event for the moment\n
return\n
support_request_url = support_request.getRelativeUrl()\n
\n \n
# create Web message if needed for this ticket\n # create Web message if needed for this ticket\n
event_list = context.portal_catalog(follow_up_uid=support_request.getUid(), \n last_event = context.portal_catalog.getResultValue(\n
follow_up_uid=support_request.getUid(), \n
sort_on=[(\'delivery.start_date\', \'DESC\')],\n sort_on=[(\'delivery.start_date\', \'DESC\')],\n
)\n )\n
if len(event_list) > 0 and \\\n if last_event and \\\n
(event_list[0].getStartDate() - DateTime() < 1):\n (DateTime() - last_event.getStartDate() < 1):\n
# User has already been notified this day.\n # User has already been notified this last 24h.\n
return\n return\n
event = portal.event_module.slapos_crm_web_message_template.\\\n event = portal.event_module.slapos_crm_web_message_template.\\\n
Base_createCloneDocument(batch_mode=1)\n Base_createCloneDocument(batch_mode=1)\n
event.edit(\n event.edit(\n
title=\'We have changed allocation scope for %s\' % computer.getReference(),\n title=message_title,\n
text_content=message,\n text_content=message,\n
start_date = DateTime(),\n start_date = DateTime(),\n
resource = ressource,\n resource = ressource,\n
source=person.getRelativeUrl(),\n source=person.getRelativeUrl(),\n
follow_up=support_request.getRelativeUrl(),\n follow_up=support_request_url,\n
)\n )\n
event.stop()\n event.stop()\n
event.deliver()\n event.deliver()\n
......
...@@ -51,20 +51,23 @@ ...@@ -51,20 +51,23 @@
<item> <item>
<key> <string>_body</string> </key> <key> <string>_body</string> </key>
<value> <string># HARDCODED LIMIT TO BE MOVED TO GLOBAL PREFERENCES\n <value> <string># HARDCODED LIMIT TO BE MOVED TO GLOBAL PREFERENCES\n
limit = 3\n limit = 5\n
\n \n
support_request_list = context.portal_catalog(\n kw[\'limit\'] = limit\n
portal_type = \'Support Request\',\n kw[\'portal_type\'] = \'Support Request\'\n
simulation_state = ["validated","submitted", "suspended"],\n kw[\'simulation_state\'] = ["validated","submitted", "suspended"]\n
limit=limit\n if destination_decision:\n
)\n kw[\'default_destination_decision_uid\'] = context.restrictedTraverse(\n
destination_decision).getUid()\n
\n
support_request_list = context.portal_catalog(**kw)\n
\n \n
return len(support_request_list) == limit\n return len(support_request_list) == limit\n
</string> </value> </string> </value>
</item> </item>
<item> <item>
<key> <string>_params</string> </key> <key> <string>_params</string> </key>
<value> <string></string> </value> <value> <string>destination_decision=None, **kw</string> </value>
</item> </item>
<item> <item>
<key> <string>id</string> </key> <key> <string>id</string> </key>
......
...@@ -46,8 +46,30 @@ class TestSlapOSCloudSupportRequestGeneration(testSlapOSMixin): ...@@ -46,8 +46,30 @@ class TestSlapOSCloudSupportRequestGeneration(testSlapOSMixin):
title="Test Support Request %", title="Test Support Request %",
simulation_state="validated"): simulation_state="validated"):
support_request.invalidate() support_request.invalidate()
for support_request in self.portal.portal_catalog(
portal_type="Support Request",
title="Allocation scope has been changed for TESTCOMPT%",
simulation_state="suspended"):
support_request.invalidate()
self.tic() self.tic()
def _updatePersonAssignment(self, person, role='role/member'):
for assignment in person.contentValues(portal_type="Assignment"):
assignment.cancel()
assignment = person.newContent(portal_type='Assignment')
assignment.setRole(role)
assignment.setStartDate(DateTime())
assignment.open()
return assignment
def _makePerson(self, new_id):
# Clone computer document
person = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
person.edit(reference='TESTPERSON-%s' % (new_id, ))
person.immediateReindexObject()
return person
def _makeComputer(self,new_id): def _makeComputer(self,new_id):
# Clone computer document # Clone computer document
person = self.portal.person_module.template_member\ person = self.portal.person_module.template_member\
...@@ -184,21 +206,35 @@ class TestSlapOSCloudSupportRequestGeneration(testSlapOSMixin): ...@@ -184,21 +206,35 @@ class TestSlapOSCloudSupportRequestGeneration(testSlapOSMixin):
def test_ERP5Site_isSupportRequestCreationClosed(self): def test_ERP5Site_isSupportRequestCreationClosed(self):
person = self._makePerson(self.new_id)
other_person = self._makePerson('other-%s' % self.new_id)
url = person.getRelativeUrl()
self.assertFalse(self.portal.ERP5Site_isSupportRequestCreationClosed(url))
self.assertFalse(self.portal.ERP5Site_isSupportRequestCreationClosed()) self.assertFalse(self.portal.ERP5Site_isSupportRequestCreationClosed())
def newSupportRequest(): def newSupportRequest():
sr = self.portal.support_request_module.newContent(\ sr = self.portal.support_request_module.newContent(\
title="Test Support Request POIUY") title="Test Support Request POIUY",
destination_decision=url)
sr.validate() sr.validate()
sr.immediateReindexObject() sr.immediateReindexObject()
newSupportRequest() newSupportRequest()
self.assertFalse(self.portal.ERP5Site_isSupportRequestCreationClosed()) self.assertFalse(self.portal.ERP5Site_isSupportRequestCreationClosed(url))
newSupportRequest() newSupportRequest()
self.assertFalse(self.portal.ERP5Site_isSupportRequestCreationClosed()) self.assertFalse(self.portal.ERP5Site_isSupportRequestCreationClosed(url))
newSupportRequest()
self.assertFalse(self.portal.ERP5Site_isSupportRequestCreationClosed(url))
newSupportRequest()
self.assertFalse(self.portal.ERP5Site_isSupportRequestCreationClosed(url))
newSupportRequest() newSupportRequest()
self.assertTrue(self.portal.ERP5Site_isSupportRequestCreationClosed(url))
self.assertTrue(self.portal.ERP5Site_isSupportRequestCreationClosed()) self.assertTrue(self.portal.ERP5Site_isSupportRequestCreationClosed())
self.assertFalse(self.portal.ERP5Site_isSupportRequestCreationClosed(
other_person.getRelativeUrl()))
def test_Base_generateSupportRequestForSlapOS_recreate_if_closed(self): def test_Base_generateSupportRequestForSlapOS_recreate_if_closed(self):
title = "Test Support Request %s" % self.new_id title = "Test Support Request %s" % self.new_id
computer = self._makeComputer(self.new_id) computer = self._makeComputer(self.new_id)
...@@ -656,3 +692,48 @@ portal_workflow.doActionFor(context, action='edit_action', comment='Visited by S ...@@ -656,3 +692,48 @@ portal_workflow.doActionFor(context, action='edit_action', comment='Visited by S
self.assertEqual('Visited by SoftwareRelease_testForAllocation', self.assertEqual('Visited by SoftwareRelease_testForAllocation',
software_release.workflow_history['edit_workflow'][-1]['comment']) software_release.workflow_history['edit_workflow'][-1]['comment'])
def test_Alarm_notAllowedAllocationScope(self):
computer = self._makeComputer(self.new_id)
person = computer.getSourceAdministrationValue()
self._updatePersonAssignment(person, 'role/member')
computer.edit(allocation_scope='open/public')
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
computer.edit(allocation_scope='open/personal')
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
friend_person = self._makePerson(self.new_id)
computer.edit(allocation_scope='open/friend',
destination_section=friend_person.getRelativeUrl())
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
def test_Alarm_allowedAllocationScope(self):
computer = self._makeComputer(self.new_id)
person = computer.getSourceAdministrationValue()
self._updatePersonAssignment(person, 'role/service_provider')
computer.edit(allocation_scope='open/public')
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/public')
computer.edit(allocation_scope='open/personal')
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/personal')
friend_person = self._makePerson(self.new_id)
computer.edit(allocation_scope='open/friend',
destination_section=friend_person.getRelativeUrl())
self.portal.portal_alarms.slapos_check_update_allocation_scope.activeSense()
self.tic()
self.assertEquals(computer.getAllocationScope(), 'open/friend')
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment