############################################################################## # # Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved. # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsibility of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # guarantees and support are strongly adviced to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 3 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
#
##############################################################################

import os
import subprocess
import json
import glob
import ConfigParser

import utils
from slapos.recipe.librecipe import generateHashFromFiles

# for development: debugging logs and install Ctrl+C handler
if os.environ.get('SLAPOS_TEST_DEBUG'):
    import logging
    logging.basicConfig(level=logging.DEBUG)
    import unittest
    unittest.installHandler()


def subprocess_status_output(*args, **kwargs):
    """Run a command and return its exit status and combined output.

    All arguments are forwarded to ``subprocess.Popen``. stdout is captured
    and stderr is redirected into it, so callers must not pass ``stdout`` or
    ``stderr`` themselves.

    Returns a ``(returncode, output)`` tuple.
    """
    prc = subprocess.Popen(
        *args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        **kwargs)
    # stderr is merged into stdout, so the second item is always None.
    out, _ = prc.communicate()
    return prc.returncode, out


class InstanceTestCase(utils.SlapOSInstanceTestCase):
    """Common base of the turnserver tests: software URL and lookup helpers."""

    @classmethod
    def getSoftwareURLList(cls):
        """Return the path of the ``software.cfg`` next to this test folder."""
        return (
            os.path.abspath(os.path.join(
                os.path.dirname(__file__), '..', 'software.cfg')),
        )

    def _getTurnserverPartitionPath(self):
        """Return the first partition containing ``etc/turnserver.conf``.

        Fails the test with an explicit message when no partition matches,
        instead of letting a later ``os.path.join(None, ...)`` raise.
        """
        for partition_path in glob.glob(os.path.join(self.instance_path, '*')):
            if os.path.exists(
                    os.path.join(partition_path, 'etc/turnserver.conf')):
                return partition_path
        self.fail(
            'No partition with etc/turnserver.conf found in %s'
            % self.instance_path)

    def _readTurnSecret(self, instance_folder):
        """Return the ``secret`` option stored in ``etc/.turnsecret``."""
        secret_file = os.path.join(instance_folder, 'etc/.turnsecret')
        self.assertTrue(os.path.exists(secret_file))
        config = ConfigParser.ConfigParser()
        # Close the file deterministically rather than leaking the handle.
        with open(secret_file) as f:
            config.readfp(f)
        return config.get('turnserver', 'secret')

    def _readTurnserverConfig(self, instance_folder):
        """Return the stripped content of ``etc/turnserver.conf``."""
        with open(os.path.join(instance_folder, 'etc/turnserver.conf')) as f:
            return f.read().strip()


class ServicesTestCase(InstanceTestCase):
    """Checks of an instance requested without any parameter."""

    def test_process_list(self):
        # Every expected service must be registered in supervisor, with the
        # hash of the listed files substituted into its name where needed.
        hash_list = [
            'software_release/buildout.cfg',
        ]
        expected_process_names = [
            'bootstrap-monitor',
            'turnserver-{hash}-on-watch',
            'certificate_authority-{hash}-on-watch',
            'crond-{hash}-on-watch',
            'monitor-httpd-{hash}-on-watch',
            'monitor-httpd-graceful',
        ]

        supervisor = self.getSupervisorRPCServer().supervisor
        process_name_list = [
            process['name'] for process in supervisor.getAllProcessInfo()]

        hash_file_list = [
            os.path.join(self.computer_partition_root_path, path)
            for path in hash_list]
        # The hash does not depend on the process name: compute it once.
        h = generateHashFromFiles(hash_file_list)

        for name in expected_process_names:
            self.assertIn(name.format(hash=h), process_name_list)

    def test_default_deployment(self):
        # The generated turnserver.conf must match the default parameters.
        instance_folder = self._getTurnserverPartitionPath()
        secret = self._readTurnSecret(instance_folder)

        # NOTE(review): the literal below is laid out one option per line with
        # no blank lines; confirm against the rendered turnserver.conf.
        expected_config = """listening-port=3478
tls-listening-port=5349
fingerprint
lt-cred-mech
use-auth-secret
static-auth-secret=%(secret)s
listening-ip=%(ipv4)s
server-name=turn.example.com
realm=turn.example.com
total-quota=100
bps-capacity=0
stale-nonce=600
cert=%(instance_path)s/etc/ssl/cert.pem
pkey=%(instance_path)s/etc/ssl/key.pem
dh-file=%(instance_path)s/etc/ssl/dhparam.pem
cipher-list="ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+3DES:DH+3DES:RSA+AES:RSA+3DES:!ADH:!AECDH:!MD5"
no-loopback-peers
no-multicast-peers
mobility
no-tlsv1
no-tlsv1_1
no-stdout-log
log-file=%(instance_path)s/var/log/turnserver.log
userdb=%(instance_path)s/srv/turndb
pidfile=%(instance_path)s/var/run/turnserver.pid
verbose""" % {
            'instance_path': instance_folder,
            'secret': secret,
            'ipv4': self.config['ipv4_address'],
        }

        current_config = self._readTurnserverConfig(instance_folder)
        self.assertEqual(current_config, expected_config)

    def test_turnserver_promises(self):
        # The partition must ship exactly the expected promise plugins and
        # each of them (except the ignored ones) must pass when run.
        instance_folder = self._getTurnserverPartitionPath()

        promise_path_list = glob.glob(
            os.path.join(instance_folder, 'etc/plugin/*.py'))
        promise_name_list = [
            x for x in os.listdir(os.path.join(instance_folder, 'etc/plugin'))
            if not x.endswith('.pyc')]

        partition_name = os.path.basename(instance_folder.rstrip('/'))
        self.assertEqual(sorted(promise_name_list), sorted([
            "__init__.py",
            "check-free-disk-space.py",
            "monitor-http-frontend.py",
            "buildout-%s-status.py" % partition_name,
            "monitor-bootstrap-status.py",
            "monitor-httpd-listening-on-tcp.py",
            "turnserver-port-listening.py",
            "turnserver-tls-port-listening.py",
        ]))

        ignored_plugin_list = [
            '__init__.py',
            'monitor-http-frontend.py',
        ]
        runpromise_bin = os.path.join(
            self.software_path, 'bin', 'monitor.runpromise')
        monitor_conf = os.path.join(instance_folder, 'etc', 'monitor.conf')

        msg = []
        status = 0
        for plugin_path in promise_path_list:
            plugin_name = os.path.basename(plugin_path)
            if plugin_name in ignored_plugin_list:
                continue
            plugin_status, plugin_result = subprocess_status_output([
                runpromise_bin,
                '-c', monitor_conf,
                '--run-only', plugin_name,
                '--force',
                '--check-anomaly',
            ])
            # sanity check: the runner must report that it ran this promise.
            promise_was_run = (
                'Checking promise %s' % plugin_name in plugin_result)
            # Count the plugin as failed on any non-zero exit code, and also
            # when the sanity check fails. The flag used to be set only after
            # the status had been accumulated, so it had no effect.
            if plugin_status != 0 or not promise_was_run:
                status += 1
                msg.append(plugin_result)

        msg = ''.join(msg).strip()
        self.assertEqual(status, 0, msg)


class ParametersTestCase(InstanceTestCase):
    """Checks of an instance requested with custom parameters."""

    @classmethod
    def getInstanceParameterDict(cls):
        """Return the parameters the instance is requested with."""
        return {
            'server-name': "turn.site.com",
            'port': 3488,
            'tls-port': 5369,
            'external-ip': '127.0.0.1',
            'listening-ip': '127.0.0.1'
        }

    def test_turnserver_with_parameters(self):
        # The generated turnserver.conf must reflect the requested parameters.
        instance_folder = self._getTurnserverPartitionPath()
        secret = self._readTurnSecret(instance_folder)
        # Take the expected values from the request itself so that both
        # cannot drift apart.
        parameter_dict = self.getInstanceParameterDict()

        # NOTE(review): the literal below is laid out one option per line with
        # no blank lines; confirm against the rendered turnserver.conf.
        expected_config = """listening-port=%(port)s
tls-listening-port=%(tls_port)s
fingerprint
lt-cred-mech
use-auth-secret
static-auth-secret=%(secret)s
listening-ip=%(ipv4)s
external-ip=%(external_ip)s
server-name=%(name)s
realm=%(name)s
total-quota=100
bps-capacity=0
stale-nonce=600
cert=%(instance_path)s/etc/ssl/cert.pem
pkey=%(instance_path)s/etc/ssl/key.pem
dh-file=%(instance_path)s/etc/ssl/dhparam.pem
cipher-list="ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+3DES:DH+3DES:RSA+AES:RSA+3DES:!ADH:!AECDH:!MD5"
no-loopback-peers
no-multicast-peers
mobility
no-tlsv1
no-tlsv1_1
no-stdout-log
log-file=%(instance_path)s/var/log/turnserver.log
userdb=%(instance_path)s/srv/turndb
pidfile=%(instance_path)s/var/run/turnserver.pid
verbose""" % {
            'instance_path': instance_folder,
            'secret': secret,
            'ipv4': parameter_dict['listening-ip'],
            'name': parameter_dict['server-name'],
            'external_ip': parameter_dict['external-ip'],
            'port': parameter_dict['port'],
            'tls_port': parameter_dict['tls-port'],
        }

        current_config = self._readTurnserverConfig(instance_folder)
        self.assertEqual(current_config, expected_config)