Commit c2c76f5b authored by Alain Takoudjou's avatar Alain Takoudjou Committed by Rafael Monnerat

Move not related scipts and tests from slapos_cloud to slapos_pdm

parent 6e0d89c3
321
\ No newline at end of file
322
\ No newline at end of file
......@@ -3,5 +3,4 @@ testSlapOSCloudConstraint
testSlapOSCloudSecurityGroup
testSlapOSCloudShadow
testSlapOSCloudWorkflow
testSlapOSCloudDocument
testSlapOSSoftwareProduct
\ No newline at end of file
testSlapOSCloudDocument
\ No newline at end of file
......@@ -55,17 +55,17 @@ portal = context.getPortalObject()\n
\n
software_instance = hosting_subscription.getPredecessorValue()\n
if not software_instance:\n
return ""\n
return None\n
software_release_list = context.SoftwareProduct_getSortedSoftwareReleaseList(\n
software_release_url=software_instance.getUrlString())\n
\n
if not software_release_list:\n
return ""\n
return None\n
latest_software_release = software_release_list[0]\n
if software_release_list[0].getUrlString() == software_instance.getUrlString():\n
return ""\n
if latest_software_release.getUrlString() == software_instance.getUrlString():\n
return None\n
else:\n
return latest_software_release.getUrlString()\n
return latest_software_release\n
</string> </value>
</item>
<item>
......
......@@ -50,42 +50,46 @@
</item>
<item>
<key> <string>_body</string> </key>
<value> <string>hosting_subscription = context\n
<value> <string>"""\n
Check if this hosting subscription is upgradable to the latest version,\n
and return the software release to upgrade with.\n
"""\n
\n
hosting_subscription = context\n
portal = context.getPortalObject()\n
\n
slap_state = [\'start_requested\', \'stop_requested\']\n
\n
if not hosting_subscription.getSlapState() in slap_state:\n
return False\n
return None\n
\n
source_instance = hosting_subscription.getPredecessorValue()\n
if not source_instance or source_instance.getSlapState() not in slap_state:\n
return False\n
return None\n
\n
latest_software_url = hosting_subscription.HostingSubscription_getNewerSofwareRelease()\n
if latest_software_url == "":\n
return False\n
software_release = hosting_subscription.HostingSubscription_getNewerSofwareRelease()\n
if not software_release:\n
return None\n
\n
computer = source_instance.getAggregateValue().getParentValue()\n
if computer.getValidationState() != \'validated\':\n
return False\n
isUpgradable = True\n
return None\n
\n
#Find Software Installation\n
software_installation_list = portal.portal_catalog(\n
portal_type="Software Installation",\n
validation_state="validated",\n
url_string=latest_software_url,\n
url_string=software_release.getUrlString(),\n
default_aggregate_uid=computer.getUid(),\n
#XXX - don\'t select destroyed Software Installation\n
slap_state=\'start_requested\'\n
)\n
# check again slap_state because it might be ignored in previous request!\n
if not \'start_requested\' in [software_installation.getSlapState() \\\n
if \'start_requested\' in [software_installation.getSlapState() \\\n
for software_installation in software_installation_list]:\n
isUpgradable = False\n
return software_release\n
\n
return isUpgradable\n
return None\n
</string> </value>
</item>
<item>
......@@ -94,7 +98,7 @@ return isUpgradable\n
</item>
<item>
<key> <string>id</string> </key>
<value> <string>HostingSubscription_isUpgradable</string> </value>
<value> <string>HostingSubscription_getUpgradableSoftwareRelease</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -121,6 +121,7 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
for partition in computer.contentValues(portal_type='Computer Partition'):
if partition.getSlapState() == 'free':
software_instance.edit(aggregate=partition.getRelativeUrl())
partition.markBusy()
break;
def _makeSoftwareProduct(self, new_id):
......@@ -157,6 +158,15 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
)
software_release.publish()
return software_release
def _makeCustomSoftwareRelease(self, new_id, software_product_url, software_url):
software_release = self._makeSoftwareRelease(new_id)
software_release.edit(
aggregate_value=software_product_url,
url_string=software_url
)
software_release.publish()
return software_release
def _makeSoftwareInstallation(self, new_id, computer, software_release_url):
software_installation = self.portal\
......@@ -247,11 +257,168 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
upgrade_decision_module.newContent(
portal_type="Upgrade Decision",
title="TESTUPDE-%s" % self.new_id)
def _makeUpgradeDecisionLine(self, upgrade_decision):
return upgrade_decision.newContent(
portal_type="Upgrade Decision Line",
title="TESTUPDE-%s" % self.new_id)
def test_getSortedSoftwareReleaseListFromSoftwareProduct(self):
new_id = self.generateNewId()
software_product = self._makeSoftwareProduct(new_id)
release_list = software_product.SoftwareProduct_getSortedSoftwareReleaseList(
software_product.getReference())
self.assertEqual(release_list, [])
# published software release
software_release1 = self._makeSoftwareRelease(new_id)
software_release1.edit(aggregate_value=software_product.getRelativeUrl(),
url_string='http://example.org/1-%s.cfg' % new_id,
effective_date=(DateTime() + 5)
)
software_release1.publish()
software_release2 = self._makeSoftwareRelease(self.generateNewId())
software_release2.edit(aggregate_value=software_product.getRelativeUrl(),
url_string='http://example.org/2-%s.cfg' % new_id
)
software_release2.publish()
# 1 released software release, should not appear
software_release3 = self._makeSoftwareRelease(new_id)
self.assertTrue(software_release3.getValidationState() == 'released')
software_release3.edit(aggregate_value=software_product.getRelativeUrl(),
url_string='http://example.org/3-%s.cfg' % new_id
)
self.tic()
release_list = software_product.SoftwareProduct_getSortedSoftwareReleaseList(
software_product.getReference())
self.assertEquals([release.getUrlString() for release in release_list],
['http://example.org/2-%s.cfg' % new_id, 'http://example.org/1-%s.cfg' % new_id])
def test_getSortedSoftwareReleaseListFromSoftwareProduct_Changed(self):
new_id = self.generateNewId()
software_product = self._makeSoftwareProduct(new_id)
release_list = software_product.SoftwareProduct_getSortedSoftwareReleaseList(
software_product.getReference())
self.assertEqual(release_list, [])
# 2 published software releases
software_release2 = self._makeSoftwareRelease(self.generateNewId())
software_release2.publish()
software_release2.edit(aggregate_value=software_product.getRelativeUrl(),
url_string='http://example.org/2-%s.cfg' % new_id,
effective_date=(DateTime() - 2)
)
# Newest software release
software_release1 = self._makeSoftwareRelease(new_id)
software_release1.publish()
software_release1.edit(aggregate_value=software_product.getRelativeUrl(),
url_string='http://example.org/1-%s.cfg' % new_id,
effective_date=DateTime()
)
self.tic()
release_list = software_product.SoftwareProduct_getSortedSoftwareReleaseList(
software_product.getReference())
self.assertEquals([release.getUrlString() for release in release_list],
['http://example.org/1-%s.cfg' % new_id, 'http://example.org/2-%s.cfg' % new_id])
release_list = software_product.SoftwareProduct_getSortedSoftwareReleaseList(
software_release_url='http://example.org/1-%s.cfg' % new_id)
self.assertEquals([release.getUrlString() for release in release_list],
['http://example.org/1-%s.cfg' % new_id, 'http://example.org/2-%s.cfg' % new_id])
def test_HostingSubscription_getNewerSofwareRelease(self):
person = self._makePerson(self.new_id)
computer = self._makeComputer(self.new_id)
computer.edit(source_administration_value=person)
software_product = self._makeSoftwareProduct(self.new_id)
oldest_software_url = 'http://example.org/oldest-%s.cfg' % self.new_id
newest_software_url = 'http://example.org/newest-%s.cfg' % self.new_id
self._makeCustomSoftwareRelease(self.new_id,
software_product.getRelativeUrl(),
oldest_software_url)
self._makeCustomSoftwareRelease(self.generateNewId(),
software_product.getRelativeUrl(),
newest_software_url)
self._makeSoftwareInstallation(self.new_id, computer, oldest_software_url)
hosting_subscription = self._makeFullHostingSubscription(self.new_id,
oldest_software_url, person)
self.tic()
self.assertEqual(hosting_subscription.HostingSubscription_getNewerSofwareRelease(),
None)
self._makeFullSoftwareInstance(hosting_subscription, oldest_software_url)
self.tic()
release = hosting_subscription.HostingSubscription_getNewerSofwareRelease()
self.assertEqual(release.getUrlString(), newest_software_url)
def testHostingSubscription_getUpgradableSoftwareRelease_no_installation(self):
person = self._makePerson(self.new_id)
computer = self._makeComputer(self.new_id)
computer.edit(source_administration_value=person)
self._makeComputerPartitions(computer)
software_product = self._makeSoftwareProduct(self.new_id)
oldest_software_url = 'http://example.org/oldest-%s.cfg' % self.new_id
newest_software_url = 'http://example.org/newest-%s.cfg' % self.new_id
self._makeCustomSoftwareRelease(self.new_id,
software_product.getRelativeUrl(),
oldest_software_url)
self._makeSoftwareInstallation(self.new_id, computer, oldest_software_url)
hs = self._makeFullHostingSubscription(self.new_id,
oldest_software_url, person)
self.tic()
self.assertEqual(hs.HostingSubscription_getUpgradableSoftwareRelease(),
None)
self._makeFullSoftwareInstance(hs, oldest_software_url)
self._markComputerPartitionBusy(computer, hs.getPredecessorValue())
self._makeCustomSoftwareRelease(self.generateNewId(),
software_product.getRelativeUrl(),
newest_software_url)
self.tic()
self.assertEqual(hs.HostingSubscription_getUpgradableSoftwareRelease(),
None)
def testHostingSubscription_getUpgradableSoftwareRelease(self):
person = self._makePerson(self.new_id)
computer = self._makeComputer(self.new_id)
computer.edit(source_administration_value=person)
self._makeComputerPartitions(computer)
software_product = self._makeSoftwareProduct(self.new_id)
oldest_software_url = 'http://example.org/oldest-%s.cfg' % self.new_id
newest_software_url = 'http://example.org/newest-%s.cfg' % self.new_id
self._makeCustomSoftwareRelease(self.new_id,
software_product.getRelativeUrl(),
oldest_software_url)
self._makeSoftwareInstallation(self.new_id, computer, oldest_software_url)
hs = self._makeFullHostingSubscription(self.new_id,
oldest_software_url, person)
self._makeFullSoftwareInstance(hs, oldest_software_url)
self._markComputerPartitionBusy(computer, hs.getPredecessorValue())
self._makeCustomSoftwareRelease(self.generateNewId(),
software_product.getRelativeUrl(),
newest_software_url)
self._makeSoftwareInstallation(self.generateNewId(), computer,
newest_software_url)
# software_release should be ignored!
software_release = self._makeSoftwareRelease(self.generateNewId())
self._makeSoftwareInstallation(self.generateNewId(),
computer, software_release.getUrlString())
self.tic()
release = hs.HostingSubscription_getUpgradableSoftwareRelease()
self.assertEqual(release.getUrlString(), newest_software_url)
self.portal.portal_workflow._jumpToStateFor(hs, 'destroy_requested')
self.tic()
self.assertEqual(hs.HostingSubscription_getUpgradableSoftwareRelease(),
None)
def testUpgradeDecision_getComputer(self):
computer = self._makeComputer(self.new_id)
......@@ -811,7 +978,8 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
def testComputer_checkAndCreateUpgradeDecision(self):
person = self._makePerson(self.new_id)
computer = self._makeComputer(self.new_id)
computer.edit(source_administration_value=person)
computer.edit(source_administration_value=person,
allocation_scope="open/public")
software_product = self._makeSoftwareProduct(self.new_id)
software_release = self._requestSoftwareRelease(self.new_id,
software_product.getRelativeUrl())
......@@ -831,7 +999,7 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
upgrade_decision = computer.Computer_checkAndCreateUpgradeDecision()
self.assertEqual(len(upgrade_decision), 1)
self.assertEqual(upgrade_decision[0].getSimulationState(), 'confirmed')
self.assertEqual(upgrade_decision[0].getSimulationState(), 'started')
computer_aggregate = upgrade_decision[0].UpgradeDecision_getComputer()
self.assertEqual(computer_aggregate.getReference(),
......@@ -843,10 +1011,11 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
upgrade_decision2 = computer.Computer_checkAndCreateUpgradeDecision()
self.assertEqual(len(upgrade_decision2), 0)
def testComputer_checkAndCreateUpgradeDecision_with_exist(self):
def testComputer_checkAndCreateUpgradeDecision_personal_with_exist(self):
person = self._makePerson(self.new_id)
computer = self._makeComputer(self.new_id)
computer.edit(source_administration_value=person)
computer.edit(source_administration_value=person,
allocation_scope="open/personal")
software_product = self._makeSoftwareProduct(self.new_id)
software_release = self._requestSoftwareRelease(self.new_id,
software_product.getRelativeUrl())
......@@ -857,7 +1026,7 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
self.tic()
upgrade_decision = computer.Computer_checkAndCreateUpgradeDecision()[0]
self.assertEqual(upgrade_decision.getSimulationState(), 'confirmed')
self.assertEqual(upgrade_decision.getSimulationState(), 'planned')
software_release3 = self._requestSoftwareRelease(self.generateNewId(),
software_product.getRelativeUrl())
......@@ -866,10 +1035,36 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
upgrade_decision2 = computer.Computer_checkAndCreateUpgradeDecision()[0]
self.assertEqual(upgrade_decision.getSimulationState(), 'cancelled')
self.assertEqual(upgrade_decision2.getSimulationState(), 'confirmed')
self.assertEqual(upgrade_decision2.getSimulationState(), 'planned')
release = upgrade_decision2.UpgradeDecision_getSoftwareRelease()
self.assertEqual(release.getUrlString(),
software_release3.getUrlString())
def testComputer_checkAndCreateUpgradeDecision_public_with_exist(self):
person = self._makePerson(self.new_id)
computer = self._makeComputer(self.new_id)
computer.edit(source_administration_value=person,
allocation_scope="open/public")
software_product = self._makeSoftwareProduct(self.new_id)
software_release = self._requestSoftwareRelease(self.new_id,
software_product.getRelativeUrl())
self._makeSoftwareInstallation(self.new_id,
computer, software_release.getUrlString())
self._requestSoftwareRelease(self.generateNewId(),
software_product.getRelativeUrl())
self.tic()
upgrade_decision = computer.Computer_checkAndCreateUpgradeDecision()[0]
self.assertEqual(upgrade_decision.getSimulationState(), 'started')
self._requestSoftwareRelease(self.generateNewId(),
software_product.getRelativeUrl())
self.tic()
upgrade_decision2 = computer.Computer_checkAndCreateUpgradeDecision()
self.assertEqual(len(upgrade_decision2), 0)
self.assertEqual(upgrade_decision.getSimulationState(), 'started')
def testComputer_hostingSubscriptionCreateUpgradeDecision_no_newer(self):
......@@ -933,7 +1128,7 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
self.tic()
up_decision = computer.Computer_hostingSubscriptionCreateUpgradeDecision()[0]
self.assertEqual(up_decision.getSimulationState(), 'confirmed')
self.assertEqual(up_decision.getSimulationState(), 'planned')
self.assertEqual(up_decision.UpgradeDecision_getHostingSubscription().\
getReference(), hosting_subscription.getReference())
......@@ -973,7 +1168,7 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
self.tic()
up_decision = computer.Computer_hostingSubscriptionCreateUpgradeDecision()[0]
self.assertEqual(up_decision.getSimulationState(), 'confirmed')
self.assertEqual(up_decision.getSimulationState(), 'planned')
# Install the another software release
software_release3 = self._requestSoftwareRelease(self.generateNewId(),
......@@ -983,7 +1178,7 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
self.tic()
up_decision2 = computer.Computer_hostingSubscriptionCreateUpgradeDecision()[0]
self.assertEqual(up_decision2.getSimulationState(), 'confirmed')
self.assertEqual(up_decision2.getSimulationState(), 'planned')
self.assertEqual(up_decision.getSimulationState(), 'cancelled')
release = up_decision2.UpgradeDecision_getSoftwareRelease()
self.assertEqual(release.getUrlString(),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment