############################################################################## # # Copyright (c) 2013 Vifib SARL and Contributors. All Rights Reserved. # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsibility of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # guarantees and support are strongly adviced to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 3 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ############################################################################## import functools import json import logging import os import shutil import tempfile import textwrap import unittest import warnings from pwd import getpwnam from six.moves import SimpleHTTPServer import six import slapos.util from slapos.testing.utils import ManagedHTTPServer from slapos.util import ( SoftwareReleaseSchema, SoftwareReleaseSerialisation, SoftwareReleaseSchemaValidationError, string_to_boolean, ) class TestUtil(unittest.TestCase): """ Tests methods available in the slapos.util module. """ def test_mkdir_p_new_directory(self): """ Test that mkdir_p recursively creates a directory. """ root_directory = tempfile.mkdtemp() wanted_directory = os.path.join(root_directory, 'foo', 'bar') slapos.util.mkdir_p(wanted_directory) self.assertTrue(os.path.isdir(wanted_directory)) shutil.rmtree(root_directory) def test_mkdir_already_existing(self): """ Check that mkdir_p doesn't raise if directory already exist. """ root_directory = tempfile.mkdtemp() slapos.util.mkdir_p(root_directory) self.assertTrue(os.path.isdir(root_directory)) shutil.rmtree(root_directory) def test_chown_directory(self): """ Test that slapos.util.chownDirectory correctly changes owner. Note: requires root privileges. """ root_slaptest = tempfile.mkdtemp() wanted_directory0 = os.path.join(root_slaptest, 'slap-write0') wanted_directory1 = os.path.join(root_slaptest, 'slap-write0', 'write-slap1') wanted_directory2 = os.path.join(root_slaptest, 'slap-write0', 'write-slap1', 'write-teste2') wanted_directory_mkdir0 = os.makedirs(wanted_directory0, mode=0o777) wanted_directory_mkdir1 = os.makedirs(wanted_directory1, mode=0o777) wanted_directory_mkdir2 = os.makedirs(wanted_directory2, mode=0o777) create_file_txt = tempfile.mkstemp(suffix='.txt', prefix='tmp', dir=wanted_directory2, text=True) user = 'nobody' try: uid = getpwnam(user)[2] gid = getpwnam(user)[3] except KeyError: raise unittest.SkipTest("user %s doesn't exist." % user) if os.getuid() != 0: raise unittest.SkipTest("No root privileges, impossible to chown.") slapos.util.chownDirectory(root_slaptest, uid, gid) uid_check_root_slaptest = os.stat(root_slaptest)[4] gid_check_root_slaptest = os.stat(root_slaptest)[5] self.assertEqual(uid, uid_check_root_slaptest) self.assertEqual(gid, gid_check_root_slaptest) uid_check_wanted_directory0 = os.stat(wanted_directory0)[4] gid_check_wanted_directory0 = os.stat(wanted_directory0)[5] self.assertEqual(uid, uid_check_wanted_directory0) self.assertEqual(gid, gid_check_wanted_directory0) uid_check_wanted_directory1 = os.stat(wanted_directory1)[4] gid_check_wanted_directory1 = os.stat(wanted_directory1)[5] self.assertEqual(uid, uid_check_wanted_directory1) self.assertEqual(gid, gid_check_wanted_directory1) uid_check_wanted_directory2 = os.stat(wanted_directory2)[4] gid_check_wanted_directory2 = os.stat(wanted_directory2)[5] self.assertEqual(uid, uid_check_wanted_directory2) self.assertEqual(gid, gid_check_wanted_directory2) uid_check_file_txt = os.stat(create_file_txt[1])[4] gid_check_file_txt = os.stat(create_file_txt[1])[5] self.assertEqual(uid, uid_check_file_txt) self.assertEqual(gid, gid_check_file_txt) shutil.rmtree(root_slaptest) def test_string_to_boolean_with_true_values(self): for value in ['true', 'True', 'TRUE']: self.assertTrue(string_to_boolean(value)) def test_string_to_boolean_with_false_values(self): for value in ['false', 'False', 'False']: self.assertFalse(string_to_boolean(value)) def test_string_to_boolean_with_incorrect_values(self): for value in [True, False, 1, '1', 't', 'tru', 'truelle', 'f', 'fals', 'falsey']: self.assertRaises(ValueError, string_to_boolean, value) xml2dict0_xml = slapos.util.bytes2str(b"""<?xml version='1.0' encoding='utf-8'?> <instance> <parameter id="badstr">\xc5\x81</parameter> <parameter id="badu">\xc5\x81</parameter> <parameter id="emptystr"></parameter> <parameter id="int">1</parameter> <parameter id="intstr">1</parameter> <parameter id="key">str</parameter> <parameter id="list">['one', 2]</parameter> <parameter id="none">None</parameter> <parameter id="ukey">ustr</parameter> </instance> """) xml2dict0_indict = { u'ukey': u'ustr', 'key': 'str', 'int': 1, 'intstr': '1', 'emptystr': '', 'none': None, 'list': ['one', 2], 'badstr': u'\u0141'.encode('utf-8'), 'badu': u'\u0141' } xml2dict0_outdict = { 'badstr': u'\u0141', 'badu': u'\u0141', 'emptystr': None, 'int': '1', 'intstr': '1', 'key': 'str', 'list': "['one', 2]", 'none': 'None', 'ukey': 'ustr'} def test_xml2dict0(self): self.assertEqual( dict, type(slapos.util.xml2dict(self.xml2dict0_xml)) ) self.assertEqual( self.xml2dict0_outdict, slapos.util.xml2dict(self.xml2dict0_xml) ) def test_dict2xml0(self): self.maxDiff = None self.assertEqual( self.xml2dict0_xml, slapos.util.dict2xml(self.xml2dict0_indict) ) xml2dict1_xml = """<?xml version='1.0' encoding='utf-8'?> <instance> <parameter id="_">{ "param1": "value1", "param2_dict": { "param2_param1": "", "param2_param2_dict": {}, "param2_param3_dict": { "param": "value" } } }</parameter> </instance> """ xml2dict1_dict = { "_": '''{ "param1": "value1", "param2_dict": { "param2_param1": "", "param2_param2_dict": {}, "param2_param3_dict": { "param": "value" } } }''' } def test_xml2dict1(self): self.maxDiff = None self.assertEqual( self.xml2dict1_dict, slapos.util.xml2dict(self.xml2dict1_xml) ) def test_dict2xml1(self): self.maxDiff = None self.assertEqual( self.xml2dict1_xml, slapos.util.dict2xml(self.xml2dict1_dict) ) def test_dumps_loads(self): simple_object = {"ok": [True]} self.assertEqual(simple_object, slapos.util.loads(slapos.util.dumps(simple_object))) self.assertRaises( Exception, slapos.util.loads, b'<marshal><object id="i2" module="nasty" class="klass">' b'<tuple></tuple><dictionary id="i3"/></object></marshal>') class Nasty(object): pass self.assertRaises(Exception, slapos.util.dumps, Nasty()) class SoftwareReleaseSchemaTestXmlSerialisationMixin: serialisation = SoftwareReleaseSerialisation.Xml serialisation_alt = SoftwareReleaseSerialisation.JsonInXml class SoftwareReleaseSchemaTestJsonInXmlSerialisationMixin: serialisation = SoftwareReleaseSerialisation.JsonInXml serialisation_alt = SoftwareReleaseSerialisation.Xml class SoftwareReleaseSchemaTestMixin(object): """Mixin with test methods """ software_url = None # type: str serialisation = None # type: SoftwareReleaseSerialisation serialisation_alt = None # type: SoftwareReleaseSerialisation def test_software_schema(self): schema = SoftwareReleaseSchema(self.software_url, None) software_schema = schema.getSoftwareSchema() self.assertEqual(software_schema['name'], 'Test Software') self.assertEqual(len(software_schema['software-type']), 2) def test_serialisation(self): schema = SoftwareReleaseSchema(self.software_url, None) self.assertEqual(schema.getSerialisation(strict=True), self.serialisation) def test_serialisation_alternate_software_type(self): schema = SoftwareReleaseSchema(self.software_url, 'alternate') self.assertEqual(schema.getSerialisation(strict=True), self.serialisation_alt) def test_instance_request_parameter_schema_default_software_type(self): schema = SoftwareReleaseSchema(self.software_url, None) self.assertTrue(schema.getInstanceRequestParameterSchemaURL()) instance_parameter_schema = schema.getInstanceRequestParameterSchema() self.assertEqual(instance_parameter_schema['description'], "Simple instance parameters schema for tests") def test_connection_parameter_schema(self): schema = SoftwareReleaseSchema(self.software_url, None) self.assertTrue(schema.getInstanceConnectionParameterSchemaURL()) instance_parameter_schema = schema.getInstanceConnectionParameterSchema() self.assertEqual(instance_parameter_schema['description'], "Simple connection parameters schema for tests") def test_instance_request_parameter_validate_default_software_type(self): schema = SoftwareReleaseSchema(self.software_url, None) self.assertTrue(schema.getInstanceRequestParameterSchemaURL()) instance_ok = {'key': 'value', 'type': 'default'} schema.validateInstanceParameterDict(instance_ok) if self.serialisation == SoftwareReleaseSerialisation.JsonInXml: # already serialized values are also tolerated schema.validateInstanceParameterDict({'_': json.dumps(instance_ok)}) with self.assertRaises(SoftwareReleaseSchemaValidationError): schema.validateInstanceParameterDict({"wrong": True}) instance_ok['key'] = False # wrong type with self.assertRaises(SoftwareReleaseSchemaValidationError): schema.validateInstanceParameterDict(instance_ok) with self.assertRaises(SoftwareReleaseSchemaValidationError): schema.validateInstanceParameterDict({'_': json.dumps(instance_ok)}) def test_instance_request_parameter_validate_alternate_software_type(self): schema = SoftwareReleaseSchema(self.software_url, 'alternate') self.assertTrue(schema.getInstanceRequestParameterSchemaURL()) instance_ok = {'key': 'value', 'type': 'alternate'} schema.validateInstanceParameterDict(instance_ok) if self.serialisation_alt == SoftwareReleaseSerialisation.JsonInXml: # already serialized values are also tolerated schema.validateInstanceParameterDict({'_': json.dumps(instance_ok)}) with self.assertRaises(SoftwareReleaseSchemaValidationError): schema.validateInstanceParameterDict({"wrong": True}) instance_ok['type'] = 'wrong' with self.assertRaises(SoftwareReleaseSchemaValidationError): schema.validateInstanceParameterDict(instance_ok) with self.assertRaises(SoftwareReleaseSchemaValidationError): schema.validateInstanceParameterDict({'_': json.dumps(instance_ok)}) def test_instance_request_parameter_schema_alternate_software_type(self): schema = SoftwareReleaseSchema(self.software_url, 'alternate') self.assertTrue(schema.getInstanceRequestParameterSchemaURL()) instance_parameter_schema = schema.getInstanceRequestParameterSchema() self.assertEqual(instance_parameter_schema['description'], "Simple instance parameters schema for tests") class SoftwareReleaseSchemaTestFileSoftwareReleaseMixin(SoftwareReleaseSchemaTestMixin): """Mixin with tests and software release profiles and schema in a temporary directory. """ def setUp(self): self.tmpdir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.tmpdir) tmpfile = functools.partial(os.path.join, self.tmpdir) with open(tmpfile('software.cfg'), 'w') as f: f.write( textwrap.dedent("""\ [buildout] """)) with open(tmpfile('software.cfg.json'), 'w') as f: json.dump( { "name": "Test Software", "description": "Dummy software for Test", "serialisation": self.serialisation, "software-type": { 'default': { "title": "Default", "description": "Default type", "request": "instance-default-input-schema.json", "response": "instance-default-output-schema.json", "index": 0 }, "alternate": { "title": "Alternate", "description": "Alternate type", "serialisation": self.serialisation_alt, "request": "instance-alternate-input-schema.json", "response": "instance-alternate-output-schema.json", "index": 0 }, } }, f) for software_type in ('default', 'alternate'): with open(tmpfile('instance-%s-input-schema.json' % software_type), 'w') as f: json.dump( { "$schema": "http://json-schema.org/draft-07/schema", "description": "Simple instance parameters schema for tests", "required": ["key", "type"], "properties": { "key": { "$ref": "./schemas-definitions.json#/key" }, "type": { "type": "string", "const": software_type } }, "type": "object" }, f) with open(tmpfile('instance-%s-output-schema.json' % software_type), 'w') as f: json.dump( { "$schema": "http://json-schema.org/draft-07/schema", "description": "Simple connection parameters schema for tests", }, f) with open(tmpfile('schemas-definitions.json'), 'w') as f: json.dump({"key": {"type": "string"}}, f) self.software_url = tmpfile('software.cfg') class SoftwareReleaseSchemaTestHTTPSoftwareReleaseMixin(SoftwareReleaseSchemaTestFileSoftwareReleaseMixin): """Mixin serving software release files over http. """ def setUp(self): super(SoftwareReleaseSchemaTestHTTPSoftwareReleaseMixin, self).setUp() class ProfileHTTPServer(ManagedHTTPServer): hostname = os.environ['SLAPOS_TEST_IPV4'] working_directory = self.tmpdir RequestHandler = SimpleHTTPServer.SimpleHTTPRequestHandler self.logger = logging.getLogger(self.id()) self.logger.propagate = False server = ProfileHTTPServer(self, 'server') server.open() self.addCleanup(server.close) self.software_url = server.url + '/software.cfg' class TestSoftwareReleaseSchemaFileSoftwareReleaseXmlSerialisation( SoftwareReleaseSchemaTestXmlSerialisationMixin, SoftwareReleaseSchemaTestFileSoftwareReleaseMixin, unittest.TestCase): pass class TestSoftwareReleaseSchemaFileSoftwareReleaseJsonInXmlSerialisation( SoftwareReleaseSchemaTestJsonInXmlSerialisationMixin, SoftwareReleaseSchemaTestFileSoftwareReleaseMixin, unittest.TestCase): pass class TestSoftwareReleaseSchemaHTTPSoftwareReleaseXmlSerialisation( SoftwareReleaseSchemaTestXmlSerialisationMixin, SoftwareReleaseSchemaTestHTTPSoftwareReleaseMixin, unittest.TestCase): pass class TestSoftwareReleaseSchemaHTTPSoftwareReleaseJsonInXmlSerialisation( SoftwareReleaseSchemaTestJsonInXmlSerialisationMixin, SoftwareReleaseSchemaTestHTTPSoftwareReleaseMixin, unittest.TestCase): pass class TestSoftwareReleaseSchemaEdgeCases(unittest.TestCase): def test_software_schema_file_not_exist(self): schema = SoftwareReleaseSchema('/file/not/exist', None) with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") self.assertIsNone(schema.getSoftwareSchema()) self.assertEqual(len(w), 1) self.assertIn("Unable to load JSON", str(w[0].message)) def test_software_schema_wrong_URL(self): schema = SoftwareReleaseSchema('http://slapos.invalid/wrong-url/software.cfg', None) with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") self.assertIsNone(schema.getSoftwareSchema()) self.assertEqual(len(w), 1) self.assertIn("Unable to load JSON", str(w[0].message)) def test_software_schema_download_does_no_log(self): schema = SoftwareReleaseSchema('http://slapos.invalid/no-log/software.cfg', None) debug_level_log_stream = six.StringIO() debug_level_handler = logging.StreamHandler(debug_level_log_stream) debug_level_handler.setLevel(logging.DEBUG) default_level_log_stream = six.StringIO() default_level_handler = logging.StreamHandler(default_level_log_stream) default_level_handler.setLevel(logging.INFO) logger = logging.getLogger() self.addCleanup(functools.partial(logger.setLevel, logger.getEffectiveLevel())) logger.setLevel(logging.DEBUG) logger.addHandler(debug_level_handler) self.addCleanup(functools.partial(logger.removeHandler, debug_level_handler)) logger.addHandler(default_level_handler) self.addCleanup(functools.partial(logger.removeHandler, default_level_handler)) self.assertIsNone(schema.getSoftwareSchema()) self.assertEqual(default_level_log_stream.getvalue(), "") self.assertEqual(debug_level_log_stream.getvalue(), "Downloading http://slapos.invalid/no-log/software.cfg.json\n") if __name__ == '__main__': unittest.main()