############################################################################## # # Copyright (c) 2019 Nexedi SA and Contributors. All Rights Reserved. # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsibility of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # guarantees and support are strongly adviced to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 3 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ############################################################################## from __future__ import unicode_literals import configparser import json import logging import os import re import subprocess import sqlite3 import time import unittest import netaddr import pexpect import psutil import requests import six from six.moves.urllib.parse import urlparse, urljoin from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass, SlapOSNodeCommandError from slapos.grid.svcbackend import getSupervisorRPC, _getSupervisordSocketPath from slapos.proxy.db_version import DB_VERSION from slapos.slap.standalone import SlapOSConfigWriter theia_software_release_url = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'software.cfg')) setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(theia_software_release_url) class TheiaTestCase(SlapOSInstanceTestCase): __partition_reference__ = 'T' # for supervisord sockets in included slapos @classmethod def getPath(cls, *components): return os.path.join(cls.computer_partition_root_path, *components) @classmethod def _getSlapos(cls): try: return cls._theia_slapos except AttributeError: cls._theia_slapos = slapos = cls.getPath('srv', 'runner', 'bin', 'slapos') return slapos @classmethod def callSlapos(cls, *command, **kwargs): return subprocess.call((cls._getSlapos(),) + command, **kwargs) @classmethod def checkSlapos(cls, *command, **kwargs): return subprocess.check_call((cls._getSlapos(),) + command, **kwargs) @classmethod def captureSlapos(cls, *command, **kwargs): kwargs.setdefault('universal_newlines', kwargs.pop('text', None)) return subprocess.check_output((cls._getSlapos(),) + command, **kwargs) @classmethod def requestInstance(cls, parameter_dict=None, state='started'): cls.slap.request( software_release=cls.getSoftwareURL(), software_type=cls.getInstanceSoftwareType(), partition_reference=cls.default_partition_reference, partition_parameter_kw=parameter_dict, state=state ) @classmethod def restartService(cls, service): with cls.slap.instance_supervisor_rpc as supervisor: for process_info in supervisor.getAllProcessInfo(): service_name = process_info['name'] if service in service_name: service_id = '%s:%s' % (process_info['group'], service_name) supervisor.stopProcess(service_id) supervisor.startProcess(service_id) break else: raise Exception("Service %s not found" % service) class TestTheia(TheiaTestCase): def setUp(self): self.connection_parameters = self.computer_partition.getConnectionParameterDict() def get(self, url, expect_code=requests.codes.ok): resp = requests.get(url, verify=False) self.assertEqual( expect_code, resp.status_code, '%s returned %d instead of %d' % (url, resp.status_code, expect_code), ) return resp def test_backend_http_get(self): backend_url = self.connection_parameters['backend-url'] self.get(backend_url, requests.codes.unauthorized) # with login/password, this is allowed parsed_url = urlparse(backend_url) authenticated_url = parsed_url._replace( netloc='{}:{}@[{}]:{}'.format( self.connection_parameters['username'], self.connection_parameters['password'], parsed_url.hostname, parsed_url.port, )).geturl() self.get(authenticated_url) def test_http_get(self): url = self.connection_parameters['url'] self.get(url, requests.codes.unauthorized) # with login/password, this is allowed parsed_url = urlparse(self.connection_parameters['url']) authenticated_url = parsed_url._replace( netloc='{}:{}@[{}]:{}'.format( self.connection_parameters['username'], self.connection_parameters['password'], parsed_url.hostname, parsed_url.port, )).geturl() self.get(authenticated_url) # there's a public folder to serve file (no need for authentication) with open(self.getPath() + '/srv/frontend-static/public/test_file', 'w') as f: f.write("hello") def get(path_info): resp = self.get(urljoin(url, path_info)) self.assertIn('Content-Security-Policy', resp.headers) return resp.text self.assertIn('test_file', get('/public/')) self.assertEqual('hello', get('/public/test_file')) # favicon is not empty self.get(urljoin(url, '/favicon.ico'), requests.codes.unauthorized) resp = self.get(urljoin(authenticated_url, '/favicon.ico')) resp.raise_for_status() self.assertTrue(resp.raw) self.get(urljoin(url, '/theia-serviceworker.js'), requests.codes.unauthorized) resp = self.get(urljoin(authenticated_url, '/theia-serviceworker.js')) resp.raise_for_status() self.assertTrue(resp.raw) self.get(urljoin(url, '/theia.webmanifest'), requests.codes.unauthorized) resp = self.get(urljoin(authenticated_url, '/theia.webmanifest')) resp.raise_for_status() self.assertIn('Theia SlapOS', resp.text) # there is a CSS referencing fonts css_text = self.get(urljoin(authenticated_url, '/css/slapos.css')).text css_urls = re.findall(r'url\([\'"]+([^\)]+)[\'"]+\)', css_text) self.assertTrue(css_urls) # and fonts are served for url in css_urls: resp = self.get(urljoin(authenticated_url, url)) self.assertTrue(resp.raw) def test_ipv6_parameter_published(self): self.assertIn('ipv6', self.connection_parameters) def test_theia_slapos(self): home = self.getPath() # Make sure we can use the shell and the integrated slapos command process = pexpect.spawnu(home + '/bin/theia-shell', env={'HOME': home}) # use a large enough terminal so that slapos proxy show table fit in the screen process.setwinsize(5000, 5000) # log process output for debugging logger = logging.getLogger('theia-shell') class DebugLogFile: def write(self, msg): logger.info("output from theia-shell: %s", msg) def flush(self): pass process.logfile = DebugLogFile() process.expect_exact('Standalone SlapOS for computer `slaprunner` activated') # try to supply and install a software to check that this slapos is usable process.sendline( 'slapos supply https://lab.nexedi.com/nexedi/slapos/raw/1.0.144/software/helloworld/software.cfg slaprunner' ) process.expect( 'Requesting software installation of https://lab.nexedi.com/nexedi/slapos/raw/1.0.144/software/helloworld/software.cfg...' ) # we pipe through cat to disable pager and prevent warnings like # WARNING: terminal is not fully functional process.sendline('slapos proxy show | cat') process.expect( 'https://lab.nexedi.com/nexedi/slapos/raw/1.0.144/software/helloworld/software.cfg' ) process.sendline('slapos node software') process.expect( 'Installing software release https://lab.nexedi.com/nexedi/slapos/raw/1.0.144/software/helloworld/software.cfg' ) # interrupt this, we don't want to actually wait for software installation process.sendcontrol('c') process.terminate() process.wait() def test_theia_shell_execute_tasks(self): home = self.getPath() # shell needs to understand -c "command" arguments for theia tasks feature test_file = home + '/test file' subprocess.check_call([ home + '/bin/theia-shell', '-c', 'touch "{}"'.format(test_file) ]) self.assertTrue(os.path.exists(test_file)) def test_theia_request_script(self): script_path = os.path.join( self.getPath(), 'srv', 'project', 'request-script-example.sh', ) self.assertTrue(os.path.exists(script_path)) def test_slapos_cli(self): self.assertIn(b'slaprunner', self.captureSlapos('proxy', 'show')) self.assertIn(b'slaprunner', self.captureSlapos('computer', 'list')) def test_ipv6_range(self): proxy_path = self.getPath('srv', 'runner', 'var', 'proxy.db') query = "SELECT partition_reference, address FROM partition_network%s" % DB_VERSION ipv6, *prefixlen = self._ipv6_address.split('/') if not prefixlen: raise unittest.SkipTest('No IPv6 range') elif int(prefixlen[0]) >= 123: raise unittest.SkipTest('IPv6 range too small: %s' % self._ipv6_address) with sqlite3.connect(proxy_path) as db: rows = db.execute(query).fetchall() partitions = set(p for p, _ in rows) ipv6 = set(addr for _, addr in rows if netaddr.valid_ipv6(addr)) # Check that each partition has a different IPv6 self.assertEqual(len(partitions), len(ipv6)) # Check that no partition has the same IPv6 as theia self.assertNotIn(self.connection_parameters['ipv6'], ipv6) class TestTheiaWithNonAsciiInstanceName(TestTheia): default_partition_reference = '💥' class TestTheiaEmbeddedSlapOSShutdown(TheiaTestCase): def test_stopping_instance_stops_embedded_slapos(self): embedded_slapos_supervisord_socket = _getSupervisordSocketPath( os.path.join( self.getPath(), 'srv', 'runner', 'instance', ), self.logger) # Wait a bit for this supervisor to be started. for _ in range(20): if os.path.exists(embedded_slapos_supervisord_socket): break time.sleep(1) # get the pid of the supervisor used to manage instances with getSupervisorRPC(embedded_slapos_supervisord_socket) as embedded_slapos_supervisor: embedded_slapos_process = psutil.Process(embedded_slapos_supervisor.getPID()) # Stop theia's services with self.slap.instance_supervisor_rpc as instance_supervisor: process_info, = [ p for p in instance_supervisor.getAllProcessInfo() if p['name'].startswith('slapos-standalone-instance-') ] instance_supervisor.stopProcessGroup(process_info['group']) # the supervisor controlling instances is also stopped self.assertFalse(embedded_slapos_process.is_running()) class TestTheiaWithEmbeddedInstance(TheiaTestCase): sr_url = os.path.abspath('dummy/software.cfg') sr_type = 'dummy-type' sr_config = {"dummy": "yes"} regexpr = re.compile(r"([\w/\-\.]+)\s+slaprunner\s+available") @classmethod def getInstanceParameterDict(cls, sr_url=None, sr_type=None, sr_config=None): return { 'initial-embedded-instance': json.dumps({ 'software-url': sr_url or cls.sr_url, 'software-type': sr_type or cls.sr_type, 'instance-parameters': sr_config or cls.sr_config, }), } def expandUrl(self, url): if url.startswith('~/'): url = os.path.join(self.getPath(), url[2:]) return url def assertSupplied(self, sr_url, info=None): info = info or self.captureSlapos('proxy', 'show', text=True) self.assertIn(sr_url, info) self.assertIn(sr_url, self.regexpr.findall(info)) def assertNotSupplied(self, sr_url, info=None): info = info or self.captureSlapos('proxy', 'show', text=True) self.assertNotIn(sr_url, info) def assertEmbedded(self, sr_url, sr_type, config): proxy_info = self.captureSlapos('proxy', 'show', text=True) self.assertSupplied(sr_url, info=proxy_info) name = 'embedded_instance' self.assertIn(name, self.captureSlapos('service', 'list', text=True)) info = json.loads(self.captureSlapos('service', 'info', name, text=True)) self.assertEqual(info['software-url'], sr_url) # XXX: slapos service info does not show the software type, so we check in # slapos proxy show output self.assertIn(sr_type, proxy_info) self.assertEqual(info['instance-parameters'], config) def assertNotEmbedded(self, sr_url, sr_type, config): sr_url = self.expandUrl(sr_url) proxy_info = self.captureSlapos('proxy', 'show', text=True) self.assertNotSupplied(sr_url, info=proxy_info) self.assertNotIn(sr_type, proxy_info) def test(self): # Check that embedded instance is supplied and requested initial_sr_url = self.expandUrl(self.sr_url) self.assertEmbedded(initial_sr_url, self.sr_type, self.sr_config) # Change parameters for embedded instance sr_url = os.path.abspath('bogus/software.cfg') sr_type = 'bogus-type' sr_config = {"bogus": "true"} self.requestInstance( self.getInstanceParameterDict(sr_url, sr_type, sr_config)) self.waitForInstance() # Check that parameters have not been taken into account self.assertNotEmbedded(sr_url, sr_type, sr_config) # Check that previous instance has not been changed self.assertEmbedded(initial_sr_url, self.sr_type, self.sr_config) class TestTheiaFrontend(TheiaTestCase): @classmethod def getInstanceParameterDict(cls): return { 'additional-frontend-guid': 'SOMETHING' } def setUp(self): self.connection_parameters = self.computer_partition.getConnectionParameterDict() def test_http_get(self): for key in ('url', 'additional-url'): resp = requests.get(self.connection_parameters[key], verify=False) self.assertEqual(requests.codes.unauthorized, resp.status_code) class TestTheiaForwardFrontendRequestsEnabled(TheiaTestCase): @classmethod def getInstanceParameterDict(cls): return {"autorun": "user-controlled"} # we interact with slapos in this test def _getRequestedInstanceList(self, query): with sqlite3.connect(os.path.join( self.computer_partition_root_path, 'srv/runner/var/proxy.db', )) as db: return [row[0] for row in db.execute(query).fetchall()] def getRequestedInstanceList(self): return self._getRequestedInstanceList( f"SELECT partition_reference FROM partition{DB_VERSION} where slap_state='busy'" ) def getForwardedInstanceList(self): return self._getRequestedInstanceList( f"SELECT partition_reference FROM forwarded_partition_request{DB_VERSION}" ) def requestEmbeddedFrontendInstance(self, state='available'): self.checkSlapos( 'request', '--state', state, 'frontend', 'http://git.erp5.org/gitweb/slapos.git/blob_plain/HEAD:/software/apache-frontend/software.cfg', ) def test(self): self.requestEmbeddedFrontendInstance() # partition requested directly by user are forwarded with user_ prefix self.assertEqual(self.getForwardedInstanceList(), ['user_frontend']) self.assertEqual(self.getRequestedInstanceList(), []) self.requestEmbeddedFrontendInstance(state='destroyed') self.requestInstance({'forward-slapos-frontend-requests': 'disabled'}) self.waitForInstance() self.requestEmbeddedFrontendInstance() self.assertEqual(self.getForwardedInstanceList(), []) self.assertEqual(self.getRequestedInstanceList(), ['frontend']) self.requestEmbeddedFrontendInstance(state='destroyed') self.checkSlapos('node', 'report') self.requestInstance({'forward-slapos-frontend-requests': 'enabled'}) self.waitForInstance() self.requestEmbeddedFrontendInstance() self.assertEqual(self.getForwardedInstanceList(), ['user_frontend']) self.assertEqual(self.getRequestedInstanceList(), []) class TestTheiaEnv(TheiaTestCase): dummy_software_path = os.path.abspath('dummy/software.cfg') @classmethod def getInstanceParameterDict(cls): return { 'initial-embedded-instance': json.dumps({ 'software-url': cls.dummy_software_path, }), 'autorun': 'stopped', } def test_theia_env(self): """Make sure environment variables are the same whether we use shell or supervisor services. """ # The path of the env.json file expected to be generated by building the dummy software release env_json_path = self.getPath('srv', 'runner', 'software', 'env.json') # Get the pid of the theia process from the test node's instance-supervisord with self.slap.instance_supervisor_rpc as supervisor: all_process_info = supervisor.getAllProcessInfo() for p in all_process_info: if p['name'].startswith('theia-instance'): theia_process = p break else: self.fail("Could not find theia process") theia_pid = theia_process['pid'] # Get the environment of the theia process theia_env = psutil.Process(theia_pid).environ() # Start a theia shell that inherits the environment of the theia process # This simulates the environment of a shell launched from the browser application theia_shell_process = pexpect.spawnu('{}/bin/theia-shell'.format(self.getPath()), env=theia_env) self.addCleanup(theia_shell_process.wait) self.addCleanup(theia_shell_process.terminate) theia_shell_process.expect_exact('Standalone SlapOS for computer `slaprunner` activated') # Launch slapos node software from theia shell theia_shell_process.sendline('slapos node software') theia_shell_process.expect('Installing software release %s' % self.dummy_software_path) theia_shell_process.expect('Finished software releases.') # Get the theia shell environment with open(env_json_path) as f: theia_shell_env = json.load(f) # Remove the env.json file to later be sure that a new one has been generated os.remove(env_json_path) # Launch slapos node software service from the embedded supervisord. # Note that we have two services, slapos-node-software and slapos-node-software-all # The later uses --all which is what we want to use here, because the software # is already installed and we want to install it again, this time from supervisor embedded_run_path = self.getPath('srv', 'runner', 'var', 'run') embedded_supervisord_socket_path = _getSupervisordSocketPath(embedded_run_path, self.logger) with getSupervisorRPC(embedded_supervisord_socket_path) as embedded_supervisor: previous_stop_time = embedded_supervisor.getProcessInfo('slapos-node-software-all')['stop'] embedded_supervisor.startProcess('slapos-node-software-all') for _retries in range(20): time.sleep(1) if embedded_supervisor.getProcessInfo('slapos-node-software-all')['stop'] != previous_stop_time: break else: self.fail("the supervisord service 'slapos-node-software-all' takes too long to finish") # Get the supervisord environment with open(env_json_path) as f: supervisord_env = json.load(f) # Compare relevant variables from both environments self.maxDiff = None self.assertEqual(theia_shell_env['PATH'].split(':'), supervisord_env['PATH'].split(':')) self.assertEqual(theia_shell_env['SLAPOS_CONFIGURATION'], supervisord_env['SLAPOS_CONFIGURATION']) self.assertEqual(theia_shell_env['SLAPOS_CLIENT_CONFIGURATION'], supervisord_env['SLAPOS_CLIENT_CONFIGURATION']) self.assertEqual(theia_shell_env['HOME'], supervisord_env['HOME']) class TestTheiaSharedPath(TheiaTestCase): bogus_path = 'bogus' @classmethod def setUpClass(cls): super(TestTheiaSharedPath, cls).setUpClass() # Change shared part list to include bogus paths cls.slap._shared_part_list.append(cls.bogus_path) SlapOSConfigWriter(cls.slap).writeConfig(cls.slap._slapos_config) # Re-instanciate cls.slap._force_slapos_node_instance_all = True try: cls.waitForInstance() finally: cls.slap._force_slapos_node_instance_all = False def test(self): theia_cfg_path = self.getPath('srv', 'runner', 'etc', 'slapos.cfg') cfg = configparser.ConfigParser() cfg.read(theia_cfg_path) self.assertTrue(cfg.has_option('slapos', 'shared_part_list')) shared_parts_string = cfg.get('slapos', 'shared_part_list') shared_parts_list = [s.strip() for s in shared_parts_string.splitlines()] self.assertIn(self.bogus_path, shared_parts_list) class ResilientTheiaMixin(object): @classmethod def setUpClass(cls): super(ResilientTheiaMixin, cls).setUpClass() # Patch the computer root path to that of the export theia instance cls.computer_partition_root_path = cls.getPartitionPath('export') # Add resiliency files to snapshot patterns cls._save_instance_file_pattern_list += ( '*/srv/export-exitcode-file', '*/srv/export-errormessage-file', '*/srv/import-exitcode-file', '*/srv/import-errormessage-file', ) @classmethod def getPartitionId(cls, instance_type): software_url = cls.getSoftwareURL() for computer_partition in cls.slap.computer.getComputerPartitionList(): partition_url = computer_partition.getSoftwareRelease()._software_release partition_type = computer_partition.getType() if partition_url == software_url and partition_type == instance_type: return computer_partition.getId() raise Exception("Theia %s partition not found" % instance_type) @classmethod def getPartitionPath(cls, instance_type='export', *paths): return os.path.join(cls.slap._instance_root, cls.getPartitionId(instance_type), *paths) @classmethod def getPath(cls, *components): # patch getPath return cls.getPartitionPath('export', *components) @classmethod def _getSlapos(cls, instance_type='export'): return cls.getPartitionPath(instance_type, 'srv', 'runner', 'bin', 'slapos') @classmethod def callSlapos(cls, *command, **kwargs): instance_type = kwargs.pop('instance_type', 'export') return subprocess.call((cls._getSlapos(instance_type),) + command, **kwargs) @classmethod def checkSlapos(cls, *command, **kwargs): instance_type = kwargs.pop('instance_type', 'export') return subprocess.check_call((cls._getSlapos(instance_type),) + command, **kwargs) @classmethod def captureSlapos(cls, *command, **kwargs): kwargs.setdefault('universal_newlines', kwargs.pop('text', None)) instance_type = kwargs.pop('instance_type', 'export') return subprocess.check_output((cls._getSlapos(instance_type),) + command, **kwargs) @classmethod def getInstanceSoftwareType(cls): return 'resilient' @classmethod def waitForInstance(cls): # process twice to propagate to all instances for _ in range(2): super(ResilientTheiaMixin, cls).waitForInstance() class TestTheiaResilientInterface(ResilientTheiaMixin, TestTheia): pass class TestTheiaResilientWithEmbeddedInstance(ResilientTheiaMixin, TestTheiaWithEmbeddedInstance): pass