Commit 40ef5503 authored by Thomas Gambier's avatar Thomas Gambier 🚴🏼

Fixes for "slapos service info"

 * xml2dict and dict2xml support the JSON format of parameter dict
 * since xml2dict doesn't support unicode, we convert unicode to str in
getInformation

See merge request !198
parents c5c597de 583ebce5
...@@ -46,7 +46,7 @@ import six ...@@ -46,7 +46,7 @@ import six
from .exception import ResourceNotReady, ServerError, NotFoundError, \ from .exception import ResourceNotReady, ServerError, NotFoundError, \
ConnectionError ConnectionError
from .hateoas import SlapHateoasNavigator, ConnectionHelper from .hateoas import SlapHateoasNavigator, ConnectionHelper
from slapos.util import loads, dumps, bytes2str, xml2dict, dict2xml, calculate_dict_hash from slapos.util import loads, dumps, bytes2str, unicode2str, xml2dict, dict2xml, calculate_dict_hash
from xml.sax import saxutils from xml.sax import saxutils
from zope.interface import implementer from zope.interface import implementer
...@@ -279,7 +279,7 @@ class OpenOrder(SlapRequester): ...@@ -279,7 +279,7 @@ class OpenOrder(SlapRequester):
setattr(software_instance, '_%s' % key, value) setattr(software_instance, '_%s' % key, value)
if raw_information["data"].get("text_content", None) is not None: if raw_information["data"].get("text_content", None) is not None:
setattr(software_instance, '_parameter_dict', xml2dict(raw_information["data"]['text_content'])) setattr(software_instance, '_parameter_dict', xml2dict(unicode2str(raw_information["data"]['text_content'])))
else: else:
setattr(software_instance, '_parameter_dict', {}) setattr(software_instance, '_parameter_dict', {})
......
...@@ -402,7 +402,7 @@ class TestCliNode(CliMixin): ...@@ -402,7 +402,7 @@ class TestCliNode(CliMixin):
class TestCliList(CliMixin): class TestCliList(CliMixin):
def test_list(self): def test_list(self):
""" """
Test "slapos list" command output. Test "slapos service list" command output.
""" """
return_value = { return_value = {
'instance1': slapos.slap.SoftwareInstance(_title='instance1', _software_release_url='SR1'), 'instance1': slapos.slap.SoftwareInstance(_title='instance1', _software_release_url='SR1'),
...@@ -424,7 +424,7 @@ class TestCliList(CliMixin): ...@@ -424,7 +424,7 @@ class TestCliList(CliMixin):
class TestCliInfo(CliMixin): class TestCliInfo(CliMixin):
def test_info(self, _): def test_info(self, _):
""" """
Test "slapos info" command output. Test "slapos service info" command output.
""" """
setattr(self.conf, 'reference', 'instance1') setattr(self.conf, 'reference', 'instance1')
instance = slapos.slap.SoftwareInstance( instance = slapos.slap.SoftwareInstance(
...@@ -444,7 +444,7 @@ class TestCliInfo(CliMixin): ...@@ -444,7 +444,7 @@ class TestCliInfo(CliMixin):
def test_unknownReference(self, _): def test_unknownReference(self, _):
""" """
Test "slapos info" command output in case reference Test "slapos service info" command output in case reference
of service is not known. of service is not known.
""" """
setattr(self.conf, 'reference', 'instance1') setattr(self.conf, 'reference', 'instance1')
...@@ -456,7 +456,7 @@ class TestCliInfo(CliMixin): ...@@ -456,7 +456,7 @@ class TestCliInfo(CliMixin):
class TestCliComputerList(CliMixin): class TestCliComputerList(CliMixin):
def test_computer_list(self): def test_computer_list(self):
""" """
Test "slapos list" command output. Test "slapos computer list" command output.
""" """
return_value = { return_value = {
'computer1': slapos.slap.hateoas.TempDocument(title='computer1', _reference='COMP-1'), 'computer1': slapos.slap.hateoas.TempDocument(title='computer1', _reference='COMP-1'),
...@@ -492,8 +492,8 @@ class TestComputerCliInfo(CliMixin): ...@@ -492,8 +492,8 @@ class TestComputerCliInfo(CliMixin):
def test_computer_unknownReference(self, _): def test_computer_unknownReference(self, _):
""" """
Test "slapos info" command output in case reference Test "slapos computer info" command output in case reference
of service is not known. of computer is not known.
""" """
setattr(self.conf, 'reference', 'COMP-0') setattr(self.conf, 'reference', 'COMP-0')
with patch.object(slapos.slap.Computer, 'getInformation', side_effect=raiseNotFoundError): with patch.object(slapos.slap.Computer, 'getInformation', side_effect=raiseNotFoundError):
......
...@@ -39,7 +39,7 @@ import httmock ...@@ -39,7 +39,7 @@ import httmock
import mock import mock
import slapos.slap import slapos.slap
from slapos.util import dumps, calculate_dict_hash from slapos.util import dumps, calculate_dict_hash, dict2xml
class UndefinedYetException(Exception): class UndefinedYetException(Exception):
...@@ -196,11 +196,10 @@ class TestSlap(SlapMixin): ...@@ -196,11 +196,10 @@ class TestSlap(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' if url.path == '/registerComputerPartition' and qs == {
and qs == {
'computer_reference': [computer_guid], 'computer_reference': [computer_guid],
'computer_partition_reference': [partition_id] 'computer_partition_reference': [partition_id]
}): }:
partition = slapos.slap.ComputerPartition(computer_guid, partition_id) partition = slapos.slap.ComputerPartition(computer_guid, partition_id)
return { return {
'status_code': 200, 'status_code': 200,
...@@ -237,11 +236,10 @@ class TestSlap(SlapMixin): ...@@ -237,11 +236,10 @@ class TestSlap(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' if url.path == '/registerComputerPartition' and qs == {
and qs == { 'computer_reference': [computer_guid],
'computer_reference': [computer_guid], 'computer_partition_reference': [partition_id]
'computer_partition_reference': [partition_id] }:
}):
return {'status_code': 404} return {'status_code': 404}
else: else:
return {'status_code': 0} return {'status_code': 0}
...@@ -329,8 +327,9 @@ class TestSlap(SlapMixin): ...@@ -329,8 +327,9 @@ class TestSlap(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/getSoftwareReleaseListFromSoftwareProduct' if url.path == '/getSoftwareReleaseListFromSoftwareProduct' and qs == {
and qs == {'software_product_reference': [software_product_reference]}): 'software_product_reference': [software_product_reference]
}:
return { return {
'status_code': 200, 'status_code': 200,
'content': dumps(software_release_url_list) 'content': dumps(software_release_url_list)
...@@ -355,8 +354,9 @@ class TestSlap(SlapMixin): ...@@ -355,8 +354,9 @@ class TestSlap(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/getSoftwareReleaseListFromSoftwareProduct' if url.path == '/getSoftwareReleaseListFromSoftwareProduct' and qs == {
and qs == {'software_release_url': [software_release_url]}): 'software_release_url': [software_release_url]
}:
return { return {
'status_code': 200, 'status_code': 200,
'content': dumps(software_release_url_list) 'content': dumps(software_release_url_list)
...@@ -396,7 +396,7 @@ class TestSlap(SlapMixin): ...@@ -396,7 +396,7 @@ class TestSlap(SlapMixin):
hateoas_url = 'foo' hateoas_url = 'foo'
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/getHateoasUrl'): if url.path == '/getHateoasUrl':
return { return {
'status_code': 200, 'status_code': 200,
'content': hateoas_url 'content': hateoas_url
...@@ -417,7 +417,7 @@ class TestSlap(SlapMixin): ...@@ -417,7 +417,7 @@ class TestSlap(SlapMixin):
hateoas_url = 'foo' hateoas_url = 'foo'
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/getHateoasUrl'): if url.path == '/getHateoasUrl':
self.fail('slap should not have contacted master to get Hateoas URL.') self.fail('slap should not have contacted master to get Hateoas URL.')
with httmock.HTTMock(handler): with httmock.HTTMock(handler):
...@@ -435,7 +435,7 @@ class TestSlap(SlapMixin): ...@@ -435,7 +435,7 @@ class TestSlap(SlapMixin):
hateoas_url = 'foo' hateoas_url = 'foo'
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/getHateoasUrl'): if url.path == '/getHateoasUrl':
return { return {
'status_code': 404, 'status_code': 404,
} }
...@@ -461,9 +461,9 @@ class TestComputer(SlapMixin): ...@@ -461,9 +461,9 @@ class TestComputer(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' if url.path == '/registerComputerPartition' and \
and 'computer_reference' in qs 'computer_reference' in qs and \
and 'computer_partition_reference' in qs): 'computer_partition_reference' in qs:
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
qs['computer_reference'][0], qs['computer_reference'][0],
qs['computer_partition_reference'][0]) qs['computer_partition_reference'][0])
...@@ -471,8 +471,7 @@ class TestComputer(SlapMixin): ...@@ -471,8 +471,7 @@ class TestComputer(SlapMixin):
'status_code': 200, 'status_code': 200,
'content': dumps(slap_partition) 'content': dumps(slap_partition)
} }
elif (url.path == '/getFullComputerInformation' elif url.path == '/getFullComputerInformation' and 'computer_id' in qs:
and 'computer_id' in qs):
slap_computer = slapos.slap.Computer(qs['computer_id'][0]) slap_computer = slapos.slap.Computer(qs['computer_id'][0])
slap_computer._software_release_list = [] slap_computer._software_release_list = []
slap_computer._computer_partition_list = [] slap_computer._computer_partition_list = []
...@@ -531,18 +530,16 @@ class TestComputer(SlapMixin): ...@@ -531,18 +530,16 @@ class TestComputer(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' if url.path == '/registerComputerPartition' and qs == {
and qs == {
'computer_reference': [self.computer_guid], 'computer_reference': [self.computer_guid],
'computer_partition_reference': [partition_id] 'computer_partition_reference': [partition_id]
}): }:
partition = slapos.slap.ComputerPartition(self.computer_guid, partition_id) partition = slapos.slap.ComputerPartition(self.computer_guid, partition_id)
return { return {
'status_code': 200, 'status_code': 200,
'content': dumps(partition) 'content': dumps(partition)
} }
elif (url.path == '/getFullComputerInformation' elif url.path == '/getFullComputerInformation' and 'computer_id' in qs:
and 'computer_id' in qs):
slap_computer = slapos.slap.Computer(qs['computer_id'][0]) slap_computer = slapos.slap.Computer(qs['computer_id'][0])
slap_computer._computer_partition_list = [] slap_computer._computer_partition_list = []
return { return {
...@@ -613,9 +610,9 @@ class TestComputerPartition(SlapMixin): ...@@ -613,9 +610,9 @@ class TestComputerPartition(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' if url.path == '/registerComputerPartition' and \
and 'computer_reference' in qs 'computer_reference' in qs and \
and 'computer_partition_reference' in qs): 'computer_partition_reference' in qs:
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
qs['computer_reference'][0], qs['computer_reference'][0],
qs['computer_partition_reference'][0]) qs['computer_partition_reference'][0])
...@@ -623,8 +620,7 @@ class TestComputerPartition(SlapMixin): ...@@ -623,8 +620,7 @@ class TestComputerPartition(SlapMixin):
'status_code': 200, 'status_code': 200,
'content': dumps(slap_partition) 'content': dumps(slap_partition)
} }
elif (url.path == '/getComputerInformation' elif url.path == '/getComputerInformation' and 'computer_id' in qs:
and 'computer_id' in qs):
slap_computer = slapos.slap.Computer(qs['computer_id'][0]) slap_computer = slapos.slap.Computer(qs['computer_id'][0])
slap_computer._software_release_list = [] slap_computer._software_release_list = []
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
...@@ -658,9 +654,9 @@ class TestComputerPartition(SlapMixin): ...@@ -658,9 +654,9 @@ class TestComputerPartition(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' if url.path == '/registerComputerPartition' and \
and 'computer_reference' in qs 'computer_reference' in qs and \
and 'computer_partition_reference' in qs): 'computer_partition_reference' in qs:
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
qs['computer_reference'][0], qs['computer_reference'][0],
qs['computer_partition_reference'][0]) qs['computer_partition_reference'][0])
...@@ -668,8 +664,7 @@ class TestComputerPartition(SlapMixin): ...@@ -668,8 +664,7 @@ class TestComputerPartition(SlapMixin):
'status_code': 200, 'status_code': 200,
'content': dumps(slap_partition) 'content': dumps(slap_partition)
} }
elif (url.path == '/getComputerInformation' elif url.path == '/getComputerInformation' and 'computer_id' in qs:
and 'computer_id' in qs):
slap_computer = slapos.slap.Computer(qs['computer_id'][0]) slap_computer = slapos.slap.Computer(qs['computer_id'][0])
slap_computer._software_release_list = [] slap_computer._software_release_list = []
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
...@@ -702,9 +697,9 @@ class TestComputerPartition(SlapMixin): ...@@ -702,9 +697,9 @@ class TestComputerPartition(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' and if url.path == '/registerComputerPartition' and \
'computer_reference' in qs and 'computer_reference' in qs and \
'computer_partition_reference' in qs): 'computer_partition_reference' in qs:
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
qs['computer_reference'][0], qs['computer_reference'][0],
qs['computer_partition_reference'][0]) qs['computer_partition_reference'][0])
...@@ -712,8 +707,7 @@ class TestComputerPartition(SlapMixin): ...@@ -712,8 +707,7 @@ class TestComputerPartition(SlapMixin):
'status_code': 200, 'status_code': 200,
'content': dumps(slap_partition) 'content': dumps(slap_partition)
} }
elif (url.path == '/getComputerInformation' elif url.path == '/getComputerInformation' and 'computer_id' in qs:
and 'computer_id' in qs):
slap_computer = slapos.slap.Computer(qs['computer_id'][0]) slap_computer = slapos.slap.Computer(qs['computer_id'][0])
slap_computer._software_release_list = [] slap_computer._software_release_list = []
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
...@@ -751,9 +745,9 @@ class TestComputerPartition(SlapMixin): ...@@ -751,9 +745,9 @@ class TestComputerPartition(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' and if url.path == '/registerComputerPartition' and \
'computer_reference' in qs and 'computer_reference' in qs and \
'computer_partition_reference' in qs): 'computer_partition_reference' in qs:
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
qs['computer_reference'][0], qs['computer_reference'][0],
qs['computer_partition_reference'][0]) qs['computer_partition_reference'][0])
...@@ -761,7 +755,7 @@ class TestComputerPartition(SlapMixin): ...@@ -761,7 +755,7 @@ class TestComputerPartition(SlapMixin):
'status_code': 200, 'status_code': 200,
'content': dumps(slap_partition) 'content': dumps(slap_partition)
} }
elif (url.path == '/getComputerInformation' and 'computer_id' in qs): elif url.path == '/getComputerInformation' and 'computer_id' in qs:
slap_computer = slapos.slap.Computer(qs['computer_id'][0]) slap_computer = slapos.slap.Computer(qs['computer_id'][0])
slap_computer._software_release_list = [] slap_computer._software_release_list = []
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
...@@ -812,9 +806,9 @@ class TestComputerPartition(SlapMixin): ...@@ -812,9 +806,9 @@ class TestComputerPartition(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' if url.path == '/registerComputerPartition' and \
and 'computer_reference' in qs 'computer_reference' in qs and \
and 'computer_partition_reference' in qs): 'computer_partition_reference' in qs:
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
qs['computer_reference'][0], qs['computer_reference'][0],
qs['computer_partition_reference'][0]) qs['computer_partition_reference'][0])
...@@ -822,8 +816,7 @@ class TestComputerPartition(SlapMixin): ...@@ -822,8 +816,7 @@ class TestComputerPartition(SlapMixin):
'status_code': 200, 'status_code': 200,
'content': dumps(slap_partition) 'content': dumps(slap_partition)
} }
elif (url.path == '/getComputerInformation' elif url.path == '/getComputerInformation' and 'computer_id' in qs:
and 'computer_id' in qs):
slap_computer = slapos.slap.Computer(qs['computer_id'][0]) slap_computer = slapos.slap.Computer(qs['computer_id'][0])
slap_computer._software_release_list = [] slap_computer._software_release_list = []
slap_partition = slapos.slap.ComputerPartition( slap_partition = slapos.slap.ComputerPartition(
...@@ -890,9 +883,9 @@ class TestComputerPartition(SlapMixin): ...@@ -890,9 +883,9 @@ class TestComputerPartition(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' and if url.path == '/registerComputerPartition' and \
qs['computer_reference'][0] == computer_guid and qs['computer_reference'][0] == computer_guid and \
qs['computer_partition_reference'][0] == partition_id): qs['computer_partition_reference'][0] == partition_id:
partition = slapos.slap.ComputerPartition( partition = slapos.slap.ComputerPartition(
computer_guid, partition_id) computer_guid, partition_id)
return { return {
...@@ -934,9 +927,9 @@ class TestComputerPartition(SlapMixin): ...@@ -934,9 +927,9 @@ class TestComputerPartition(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(url.query) qs = parse.parse_qs(url.query)
if (url.path == '/registerComputerPartition' and if url.path == '/registerComputerPartition' and \
qs['computer_reference'][0] == computer_guid and qs['computer_reference'][0] == computer_guid and \
qs['computer_partition_reference'][0] == partition_id): qs['computer_partition_reference'][0] == partition_id:
partition = slapos.slap.ComputerPartition( partition = slapos.slap.ComputerPartition(
computer_guid, partition_id) computer_guid, partition_id)
return { return {
...@@ -948,8 +941,8 @@ class TestComputerPartition(SlapMixin): ...@@ -948,8 +941,8 @@ class TestComputerPartition(SlapMixin):
# XXX: why do we have computer_id and not computer_reference? # XXX: why do we have computer_id and not computer_reference?
# XXX: why do we have computer_partition_id and not # XXX: why do we have computer_partition_id and not
# computer_partition_reference? # computer_partition_reference?
if (parsed_qs_body['computer_id'][0] == computer_guid and if (parsed_qs_body['computer_id'][0] == computer_guid and \
parsed_qs_body['computer_partition_id'][0] == partition_id and parsed_qs_body['computer_partition_id'][0] == partition_id and \
parsed_qs_body['error_log'][0] == 'some error'): parsed_qs_body['error_log'][0] == 'some error'):
return {'status_code': 200} return {'status_code': 200}
...@@ -1100,10 +1093,10 @@ class TestSoftwareRelease(SlapMixin): ...@@ -1100,10 +1093,10 @@ class TestSoftwareRelease(SlapMixin):
def handler(url, req): def handler(url, req):
qs = parse.parse_qs(req.body) qs = parse.parse_qs(req.body)
if (url.path == '/softwareReleaseError' and if url.path == '/softwareReleaseError' and \
qs['computer_id'][0] == computer_guid and qs['computer_id'][0] == computer_guid and \
qs['url'][0] == software_release_uri and qs['url'][0] == software_release_uri and \
qs['error_log'][0] == 'some error'): qs['error_log'][0] == 'some error':
return { return {
'status_code': 200 'status_code': 200
} }
...@@ -1253,6 +1246,213 @@ class TestOpenOrder(SlapMixin): ...@@ -1253,6 +1246,213 @@ class TestOpenOrder(SlapMixin):
self.assertEqual("URL_CONNECTION_PARAMETER", self.assertEqual("URL_CONNECTION_PARAMETER",
computer_partition.getConnectionParameter('url')) computer_partition.getConnectionParameter('url'))
def test_getInformation(self):
self.slap = slapos.slap.slap()
parameter_dict = {
"_": {
"param1": "value1",
"param2_dict": {
"param2_param1": "",
"param2_param2_dict": {},
"param2_param3_dict": {"param": "value"}
}
}
}
link_keys = {
"type": {
"href": "urn:jio:get:portal_types/Hosting Subscription",
"name": "Hosting Subscription"
},
}
hosting_subscription_info_dict = {
"connection_parameter_list": {
"title": "my_connection_parameter_dict",
"default": [
{
"connection_key": "key1",
"connection_value": "value1"
},
{
"connection_key": "key2",
"connection_value": "value2"
},
],
"key": "field_my_connection_parameter_list",
"type": "StringField"
},
"title": {
"title": "Title",
"default": "myrefe",
"key": "field_my_title",
"type": "StringField"
},
"text_content": {
"title": "Parameter XML",
"default": dict2xml(parameter_dict),
"key": "field_my_text_content",
"type": "TextAreaField"
},
"slap_state": {
"title": "Slap State",
"default": "stop_requested",
"key": "field_my_slap_state",
"type": "StringField"
},
"source_title": {
"title": "Current Organisation",
"default": "",
"key": "field_my_source_title",
"type": "StringField"
},
"url_string": {
"title": "Url String",
"default": "https://lab.nexedi.com/nexedi/slapos/raw/1.0.115/software/kvm/software.cfg",
"key": "field_my_url_string",
"type": "StringField"
}
}
view_dict = {}
for k in hosting_subscription_info_dict:
view_dict["my_%s" % k] = hosting_subscription_info_dict[k]
view_dict["_embedded"] = {
"form_definition": {
"update_action_title": "",
"_debug": "traverse",
"pt": "form_view_editable",
"title": "HostingSubscription_viewAsHateoas",
"_links": {
"portal": {
"href": "urn:jio:get:erp5",
"name": "ERP5"
},
},
"action": "HostingSubscription_editWebMode",
"update_action": ""
}
}
view_dict["form_id"] = {
"title": "form_id",
"default": "HostingSubscription_viewAsHateoas",
"key": "form_id",
"type": "StringField"
}
view_dict["_links"] = {
"traversed_document": {
"href": "urn:jio:get:hosting_subscription_module/my_refe_id",
"name": "hosting_subscription_module/my_refe_id",
"title": "myrefe"
},
"self": {
"href": "https://localhost/hosting_subscription_module/my_refe_id/HostingSubscription_viewAsHateoas"
},
"form_definition": {
"href": "urn:jio:get:portal_skins/slapos_hal_json_style/HostingSubscription_viewAsHateoas",
"name": "HostingSubscription_viewAsHateoas"
}
}
hateoas_url = "/custom_hateoas_url"
def handler(url, req):
qs = parse.parse_qs(url.query)
if url.path == '/getHateoasUrl':
return {
'status_code': 200,
'content': "http://localhost" + hateoas_url
}
elif url.path == hateoas_url:
return {
'status_code': 200,
'content': {
"default_view": "view",
"_links": {
"traverse": {
"href": "https://localhost/SLAPTEST_getHateoas?mode=traverse{&relative_url,view}",
"name": "Traverse",
"templated": True
},
"raw_search": {
"href": "https://localhost/SLAPTEST_getHateoas?mode=search{&query,select_list*,limit*,group_by*,sort_on*,local_roles*,selection_domain*}",
"name": "Raw Search",
"templated": True
},
},
"_debug": "root",
"title": "Hateoas"
}
}
elif url.path == '/SLAPTEST_getHateoas':
if ("mode=search" in url.query):
return {
'status_code': 200,
'content': {
"_sort_on": "",
"_embedded": {
"contents": [
{
"_links": {
"self": {
"href": "urn:jio:get:hosting_subscription_module/my_refe_id"
}
},
"relative_url": "hosting_subscription_module/my_refe_id",
"title": "myrefe"
}
]
},
"_debug": "search",
"_limit": "200",
"_local_roles": "",
"_query": "portal_type:\"Hosting Subscription\" AND validation_state:validated AND title:=\"myrefe\"",
"_select_list": [
"title",
"relative_url"
],
"_links": {
"self": {
"href": "https://localhost/SLAPTEST_getHateoas"
},
"portal": {
"href": "urn:jio:get:erp5",
"name": "ERP5"
},
"site_root": {
"href": "urn:jio:get:web_site_module/hateoas",
"name": "Hateoas"
}
},
"_group_by": "",
"_selection_domain": ""
}
}
elif ("mode=traverse" in url.query):
return {
'status_code': 200,
'content': {
"_embedded": {
"_view": view_dict
},
"_links": link_keys,
"_debug": "traverse",
"title": "myrefe"
}
}
with httmock.HTTMock(handler):
self.slap.initializeConnection('http://%s' % self.id())
open_order = self.slap.registerOpenOrder()
computer_guid = self._getTestComputerId()
requested_partition_id = self.id()
software_instance = open_order.getInformation('myrefe')
self.assertIsInstance(software_instance, slapos.slap.SoftwareInstance)
for key in hosting_subscription_info_dict:
if key not in link_keys:
self.assertEqual(getattr(software_instance, '_' + key), hosting_subscription_info_dict[key]["default"])
self.assertEqual(software_instance._parameter_dict, parameter_dict)
self.assertEqual(software_instance._requested_state, hosting_subscription_info_dict['slap_state']["default"])
self.assertEqual(software_instance._connection_dict, hosting_subscription_info_dict['connection_parameter_list']["default"])
self.assertEqual(software_instance._software_release_url, hosting_subscription_info_dict['url_string']["default"])
class TestSoftwareProductCollection(SlapMixin): class TestSoftwareProductCollection(SlapMixin):
def setUp(self): def setUp(self):
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
############################################################################## ##############################################################################
import os import os
import slapos.util import slapos.util
from slapos.util import string_to_boolean from slapos.util import string_to_boolean, unicode2str
import tempfile import tempfile
import unittest import unittest
import shutil import shutil
...@@ -120,7 +120,7 @@ class TestUtil(unittest.TestCase): ...@@ -120,7 +120,7 @@ class TestUtil(unittest.TestCase):
for value in [True, False, 1, '1', 't', 'tru', 'truelle', 'f', 'fals', 'falsey']: for value in [True, False, 1, '1', 't', 'tru', 'truelle', 'f', 'fals', 'falsey']:
self.assertRaises(ValueError, string_to_boolean, value) self.assertRaises(ValueError, string_to_boolean, value)
xml2dict_xml = slapos.util.bytes2str(b"""<?xml version='1.0' encoding='utf-8'?> xml2dict0_xml = slapos.util.bytes2str(b"""<?xml version='1.0' encoding='utf-8'?>
<instance> <instance>
<parameter id="badstr">\xc5\x81</parameter> <parameter id="badstr">\xc5\x81</parameter>
<parameter id="badu">\xc5\x81</parameter> <parameter id="badu">\xc5\x81</parameter>
...@@ -134,7 +134,7 @@ class TestUtil(unittest.TestCase): ...@@ -134,7 +134,7 @@ class TestUtil(unittest.TestCase):
</instance> </instance>
""") """)
xml2dict_indict = { xml2dict0_indict = {
u'ukey': u'ustr', u'ukey': u'ustr',
'key': 'str', 'key': 'str',
'int': 1, 'int': 1,
...@@ -146,7 +146,7 @@ class TestUtil(unittest.TestCase): ...@@ -146,7 +146,7 @@ class TestUtil(unittest.TestCase):
'badu': u'\u0141' 'badu': u'\u0141'
} }
xml2dict_outdict = { xml2dict0_outdict = {
'badstr': u'\u0141', 'badstr': u'\u0141',
'badu': u'\u0141', 'badu': u'\u0141',
'emptystr': None, 'emptystr': None,
...@@ -157,17 +157,61 @@ class TestUtil(unittest.TestCase): ...@@ -157,17 +157,61 @@ class TestUtil(unittest.TestCase):
'none': 'None', 'none': 'None',
'ukey': 'ustr'} 'ukey': 'ustr'}
def test_xml2dict(self): def test_xml2dict0(self):
self.assertEqual( self.assertEqual(
self.xml2dict_outdict, dict,
slapos.util.xml2dict(self.xml2dict_xml) type(slapos.util.xml2dict(self.xml2dict0_xml))
)
self.assertEqual(
self.xml2dict0_outdict,
slapos.util.xml2dict(self.xml2dict0_xml)
)
def test_dict2xml0(self):
self.maxDiff = None
self.assertEqual(
self.xml2dict0_xml,
slapos.util.dict2xml(self.xml2dict0_indict)
)
xml2dict1_xml = """<?xml version='1.0' encoding='utf-8'?>
<instance>
<parameter id="_">{
"param1": "value1",
"param2_dict": {
"param2_param1": "",
"param2_param2_dict": {},
"param2_param3_dict": {
"param": "value"
}
}
}</parameter>
</instance>
"""
xml2dict1_dict = {
"_": {
"param1": "value1",
"param2_dict": {
"param2_param1": "",
"param2_param2_dict": {},
"param2_param3_dict": {"param": "value"}
}
}
}
def test_xml2dict1(self):
self.assertEqual(
self.xml2dict1_dict,
slapos.util.xml2dict(self.xml2dict1_xml)
) )
def test_dict2xml(self): def test_dict2xml1(self):
self.maxDiff = None self.maxDiff = None
self.assertEqual( self.assertEqual(
self.xml2dict_xml, self.xml2dict1_xml,
slapos.util.dict2xml(self.xml2dict_indict) slapos.util.dict2xml(self.xml2dict1_dict)
) )
......
...@@ -33,6 +33,7 @@ import socket ...@@ -33,6 +33,7 @@ import socket
import struct import struct
import subprocess import subprocess
import sqlite3 import sqlite3
import json
from xml_marshaller.xml_marshaller import dumps, loads from xml_marshaller.xml_marshaller import dumps, loads
from lxml import etree from lxml import etree
import six import six
...@@ -162,14 +163,22 @@ else: ...@@ -162,14 +163,22 @@ else:
def dict2xml(dictionary): def dict2xml(dictionary):
instance = etree.Element('instance') instance = etree.Element('instance')
for k, v in sorted(six.iteritems(dictionary)): if len(dictionary) == 1 and '_' in dictionary:
if isinstance(k, bytes):
k = k.decode('utf-8')
if isinstance(v, bytes):
v = v.decode('utf-8')
elif not isinstance(v, six.text_type):
v = str(v)
etree.SubElement(instance, "parameter", etree.SubElement(instance, "parameter",
attrib={'id': '_'}).text = json.dumps(
dictionary['_'],
separators=(',', ': '),
sort_keys=True,
indent=4)
else:
for k, v in sorted(six.iteritems(dictionary)):
if isinstance(k, bytes):
k = k.decode('utf-8')
if isinstance(v, bytes):
v = v.decode('utf-8')
elif not isinstance(v, six.text_type):
v = str(v)
etree.SubElement(instance, "parameter",
attrib={'id': k}).text = v attrib={'id': k}).text = v
return bytes2str(etree.tostring(instance, return bytes2str(etree.tostring(instance,
pretty_print=True, pretty_print=True,
...@@ -189,6 +198,8 @@ def xml2dict(xml): ...@@ -189,6 +198,8 @@ def xml2dict(xml):
else: else:
value = element.text value = element.text
result_dict[key] = value result_dict[key] = value
if len(result_dict) == 1 and '_' in result_dict:
result_dict['_'] = json.loads(result_dict['_'])
return result_dict return result_dict
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment