Commit ae516dea authored by Łukasz Nowak's avatar Łukasz Nowak

Check Software Instance softwareInstanceBang with simulator.

parent c598966f
......@@ -6,12 +6,32 @@ import transaction
from DateTime import DateTime
from App.Common import rfc1123_date
import os
import tempfile
# blurb to make nice XML comparisions
import xml.dom.ext.reader.Sax
import xml.dom.ext
import StringIO
import difflib
class Simulator:
def __init__(self, outfile, method):
self.outfile = outfile
self.method = method
def __call__(self, *args, **kwargs):
"""Simulation Method"""
old = open(self.outfile, 'r').read()
if old:
l = eval(old)
else:
l = []
l.append({'recmethod': self.method,
'recargs': args,
'reckwargs': kwargs})
open(self.outfile, 'w').write(repr(l))
class TestSlapOSSlapToolMixin(testSlapOSMixin):
def generateNewId(self):
return self.portal.portal_ids.generateNewId(
......@@ -108,9 +128,26 @@ class TestSlapOSSlapToolMixin(testSlapOSMixin):
self.computer.partition2.markBusy()
self.computer.partition3.markBusy()
self.hosting_subscription = self.portal.hosting_subscription_module\
.template_hosting_subscription.Base_createCloneDocument(batch_mode=1)
self.start_requested_software_instance = self.portal.software_instance_module\
.template_software_instance.Base_createCloneDocument(batch_mode=1)
self.hosting_subscription.edit(
predecessor=self.start_requested_software_instance.getRelativeUrl()
)
self.hosting_subscription.validate()
setupSoftwareInstance(self.start_requested_software_instance, **dict(
title=self.generateNewSoftwareTitle(),
reference="TESTSI-%s" % self.generateNewId(),
root_software_release_url=\
self.start_requested_software_installation.getUrlString(),
source_reference=self.generateNewSoftwareType(),
text_content=self.generateSafeXml(),
sla_xml=self.generateSafeXml(),
aggregate=self.computer.partition1.getRelativeUrl(),
specialise=self.hosting_subscription.getRelativeUrl()
))
setupSoftwareInstance(self.hosting_subscription, **dict(
title=self.generateNewSoftwareTitle(),
reference="TESTSI-%s" % self.generateNewId(),
root_software_release_url=\
......@@ -120,6 +157,8 @@ class TestSlapOSSlapToolMixin(testSlapOSMixin):
sla_xml=self.generateSafeXml(),
aggregate=self.computer.partition1.getRelativeUrl()
))
self.portal.portal_workflow._jumpToStateFor(
self.hosting_subscription, 'start_requested')
self.portal.portal_workflow._jumpToStateFor(
self.start_requested_software_instance, 'start_requested')
self.start_requested_software_instance.validate()
......@@ -833,7 +872,6 @@ class TestSlapOSSlapToolInstanceAccess(TestSlapOSSlapToolMixin):
self._makeComplexComputer()
partition_id = self.start_requested_software_instance.getAggregateValue(
portal_type='Computer Partition').getReference()
created_at = rfc1123_date(DateTime())
self.login(self.start_requested_software_instance.getReference())
response = self.portal_slap.registerComputerPartition(self.computer_id, partition_id)
self.assertEqual(200, response.status)
......@@ -946,7 +984,6 @@ class TestSlapOSSlapToolInstanceAccess(TestSlapOSSlapToolMixin):
<string>v2</string>
</dictionary>
</marshal>"""
created_at = rfc1123_date(DateTime())
self.login(self.start_requested_software_instance.getReference())
response = self.portal_slap.setComputerPartitionConnectionXml(self.computer_id,
partition_id, connection_xml)
......@@ -954,3 +991,100 @@ class TestSlapOSSlapToolInstanceAccess(TestSlapOSSlapToolMixin):
self.assertEqual({'p2': 'v2', 'p1': 'v1'},
self.start_requested_software_instance.getConnectionXmlAsDict()
)
def test_softwareInstanceError(self):
self._makeComplexComputer()
partition_id = self.start_requested_software_instance.getAggregateValue(
portal_type='Computer Partition').getReference()
self.login(self.start_requested_software_instance.getReference())
error_log = 'The error'
response = self.portal_slap.softwareInstanceError(self.computer_id,
partition_id, error_log)
self.assertEqual('None', response)
created_at = rfc1123_date(DateTime())
response = self.portal_slap.getComputerPartitionStatus(self.computer_id,
partition_id)
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<unicode>created_at</unicode>
<unicode>%(created_at)s</unicode>
<unicode>text</unicode>
<unicode>#error while instanciating</unicode>
<unicode>user</unicode>
<unicode>%(instance_guid)s</unicode>
</dictionary>
</marshal>
""" % dict(
created_at=created_at,
instance_guid=self.start_requested_software_instance.getReference(),
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
def _countBang(self, document):
return len([q for q in document.workflow_history[
'instance_slap_interface_workflow'] if q['action'] == 'bang'])
def assertInstanceBangSimulator(self, args, kwargs):
stored = eval(open(self.instance_bang_simulator).read())
# do the same translation magic as in workflow
kwargs['comment'] = kwargs.pop('comment')
self.assertEqual(stored,
[{'recargs': args, 'reckwargs': kwargs,
'recmethod': 'bang'}])
def test_softwareInstanceBang(self):
self._makeComplexComputer()
self.instance_bang_simulator = tempfile.mkstemp()[1]
try:
partition_id = self.start_requested_software_instance.getAggregateValue(
portal_type='Computer Partition').getReference()
self.login(self.start_requested_software_instance.getReference())
self.start_requested_software_instance.bang = Simulator(
self.instance_bang_simulator, 'bang')
error_log = 'Please bang me'
response = self.portal_slap.softwareInstanceBang(self.computer_id,
partition_id, error_log)
self.assertEqual('None', response)
created_at = rfc1123_date(DateTime())
response = self.portal_slap.getComputerPartitionStatus(self.computer_id,
partition_id)
# check returned XML
xml_fp = StringIO.StringIO()
xml.dom.ext.PrettyPrint(xml.dom.ext.reader.Sax.FromXml(response.body),
stream=xml_fp)
xml_fp.seek(0)
got_xml = xml_fp.read()
expected_xml = """\
<?xml version='1.0' encoding='UTF-8'?>
<marshal>
<dictionary id='i2'>
<unicode>created_at</unicode>
<unicode>%(created_at)s</unicode>
<unicode>text</unicode>
<unicode>#error bang called</unicode>
<unicode>user</unicode>
<unicode>%(instance_guid)s</unicode>
</dictionary>
</marshal>
""" % dict(
created_at=created_at,
instance_guid=self.start_requested_software_instance.getReference(),
)
self.assertEqual(expected_xml, got_xml,
'\n'.join([q for q in difflib.unified_diff(expected_xml.split('\n'), got_xml.split('\n'))]))
self.assertInstanceBangSimulator((), {'comment': error_log, 'bang_tree': True})
finally:
if os.path.exists(self.instance_bang_simulator):
os.unlink(self.instance_bang_simulator)
90
\ No newline at end of file
91
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment