Commit db98a521 authored by Xavier Thompson

SlapObject: Give each process group its own file

Give each supervisor process group its own separate configuration file.
parent 2a49396e
......@@ -39,6 +39,9 @@ import subprocess
import tarfile
import tempfile
import time
from collections import defaultdict
from six.moves import xmlrpc_client as xmlrpclib, range
from six.moves.configparser import ConfigParser
......@@ -64,8 +67,13 @@ REQUIRED_COMPUTER_PARTITION_PERMISSION = 0o750
CP_STORAGE_FOLDER_NAME = 'DATA'
# XXX not very clean. this is changed when testing
PROGRAM_PARTITION_TEMPLATE = bytes2str(pkg_resources.resource_string(__name__,
'templates/program_partition_supervisord.conf.in'))
PROGRAM_PARTITION_TEMPLATE = bytes2str(
pkg_resources.resource_string(
__name__, 'templates/program_partition_supervisord.conf.in'))
GROUP_PARTITION_TEMPLATE = bytes2str(
pkg_resources.resource_string(
__name__, 'templates/group_partition_supervisord.conf.in'))
def free_space(path, fn):
......@@ -409,7 +417,7 @@ class Partition(object):
software_path,
instance_path,
shared_part_list,
supervisord_partition_configuration_path,
supervisord_partition_configuration_dir,
supervisord_socket,
computer_partition,
computer_id,
......@@ -436,8 +444,8 @@ class Partition(object):
self.run_path = os.path.join(self.instance_path, 'etc', 'run')
self.service_path = os.path.join(self.instance_path, 'etc', 'service')
self.prerm_path = os.path.join(self.instance_path, 'etc', 'prerm')
self.supervisord_partition_configuration_path = \
supervisord_partition_configuration_path
self.supervisord_partition_configuration_dir = \
supervisord_partition_configuration_dir
self.supervisord_socket = supervisord_socket
self.computer_partition = computer_partition
self.computer_id = computer_id
......@@ -524,53 +532,24 @@ class Partition(object):
gid = stat_info.st_gid
return (uid, gid)
def addProgramToGroup(self, partition_id, program_id, name, command,
as_user=True):
if as_user:
uid, gid = self.getUserGroupId()
else:
uid, gid = 0, 0
self.partition_supervisor_configuration += '\n' + \
PROGRAM_PARTITION_TEMPLATE % {
'program_id': '{}_{}'.format(partition_id, program_id),
'program_directory': self.instance_path,
'program_command': command,
'program_name': name,
'instance_path': self.instance_path,
'user_id': uid,
'group_id': gid,
# As supervisord has no environment to inherit, setup a minimalistic one
'HOME': pwd.getpwuid(uid).pw_dir,
'USER': pwd.getpwuid(uid).pw_name,
}
def addCustomGroup(self, group_suffix, partition_id, program_list):
group_partition_template = bytes2str(pkg_resources.resource_string(__name__,
'templates/group_partition_supervisord.conf.in'))
group_id = '{}-{}'.format(partition_id, group_suffix)
def getGroupIdFromSuffix(self, suffix=None):
partition_id = self.partition_id
return '%s-%s' % (partition_id, suffix) if suffix else partition_id
self.supervisor_configuration_group += group_partition_template % {
'instance_id': group_id,
'program_list': ','.join(['{}_{}'.format(group_id, program_id)
for program_id in program_list]),
}
def addProgramToGroup(self, suffix, program_id, name, command, as_user=True):
group = self.getGroupIdFromSuffix(suffix)
self.supervisor_conf[group][program_id] = (name, command, as_user)
return group_id
def addServicesToGroup(self, runner_list, path, extension=''):
self.addServicesToCustomGroup(None, runner_list, path, extension)
def addServiceToGroup(self, partition_id, runner_list, path, extension=''):
def addServicesToCustomGroup(self, suffix, runner_list, path, extension=''):
"""Add new services to supervisord that belong to specific group"""
for runner in runner_list:
program_id = runner
program_name = runner + extension
program_command = os.path.join(path, runner)
self.addProgramToGroup(partition_id, program_id, program_name,
program_command)
def addServiceToCustomGroup(self, group_suffix, partition_id, runner_list,
path, extension=''):
"""Add new services to supervisord that belong to specific group"""
group_id = self.addCustomGroup(group_suffix, partition_id,
runner_list)
return self.addServiceToGroup(group_id, runner_list, path, extension)
self.addProgramToGroup(suffix, program_id, program_name, program_command)
def updateSymlink(self, sr_symlink, software_path):
if os.path.lexists(sr_symlink):
......@@ -586,7 +565,7 @@ class Partition(object):
installs the software partition with the help of buildout
"""
self.logger.info("Installing Computer Partition %s..."
% self.computer_partition.getId())
% self.partition_id)
self.check_free_space()
......@@ -723,8 +702,7 @@ class Partition(object):
"""
runner_list = []
service_list = []
self.partition_supervisor_configuration = ""
self.supervisor_configuration_group = ""
self.supervisor_conf = defaultdict(dict)
if os.path.exists(self.run_path):
if os.path.isdir(self.run_path):
runner_list = sorted(os.listdir(self.run_path))
......@@ -735,70 +713,92 @@ class Partition(object):
self.logger.warning('No runners nor services found for partition %r' %
self.partition_id)
else:
partition_id = self.computer_partition.getId()
group_partition_template = bytes2str(pkg_resources.resource_string(__name__,
'templates/group_partition_supervisord.conf.in'))
self.supervisor_configuration_group = group_partition_template % {
'instance_id': partition_id,
'program_list': ','.join(['_'.join([partition_id, runner])
for runner in runner_list + service_list])
}
# Same method to add to service and run
self.addServiceToGroup(partition_id, runner_list, self.run_path)
self.addServiceToGroup(partition_id, service_list, self.service_path,
extension=WATCHDOG_MARK)
self.addServicesToGroup(runner_list, self.run_path)
self.addServicesToGroup(
service_list, self.service_path, extension=WATCHDOG_MARK)
def writeSupervisorConfigurationFile(self):
def writeSupervisorConfigurationFiles(self):
"""
Write supervisord configuration file and update supervisord
Write supervisord configuration files and update supervisord
"""
if self.supervisor_configuration_group and \
self.partition_supervisor_configuration:
updateFile(self.supervisord_partition_configuration_path,
self.supervisor_configuration_group +
self.partition_supervisor_configuration)
remaining = set(
f for f in os.listdir(self.supervisord_partition_configuration_dir)
if f.startswith(self.partition_id)
)
for group, programs in self.supervisor_conf.items():
filename = '%s.conf' % group
filepath = os.path.join(
self.supervisord_partition_configuration_dir, filename)
supervisor_conf = GROUP_PARTITION_TEMPLATE % {
'instance_id': group,
'program_list': ','.join('%s_%s' % (group, p) for p in programs.keys()),
}
for program_id, (name, command, as_user) in programs.items():
uid, gid = self.getUserGroupId() if as_user else (0, 0)
supervisor_conf += '\n' + \
PROGRAM_PARTITION_TEMPLATE % {
'program_id': '%s_%s' % (group, program_id),
'program_directory': self.instance_path,
'program_command': command,
'program_name': name,
'instance_path': self.instance_path,
'user_id': uid,
'group_id': gid,
# As supervisord has no environment to inherit, setup a minimalistic one
'HOME': pwd.getpwuid(uid).pw_dir,
'USER': pwd.getpwuid(uid).pw_name,
}
remaining.discard(filename)
updateFile(filepath, supervisor_conf)
for filename in remaining:
filepath = os.path.join(
self.supervisord_partition_configuration_dir, filename)
os.unlink(filepath)
if self.supervisor_conf or remaining:
self.updateSupervisor()
else:
self.removeSupervisorConfigurationFile()
def removeSupervisorConfigurationFile(self):
def removeSupervisorConfigurationFiles(self):
"""
Remove supervisord configuration file if it exists and update supervisord
Remove supervisord configuration files if any exist and update supervisord
"""
try:
os.unlink(self.supervisord_partition_configuration_path)
except OSError as e:
if e.errno != errno.ENOENT:
raise
self.updateSupervisor()
filenames = [
f for f in os.listdir(self.supervisord_partition_configuration_dir)
if f.startswith(self.partition_id)
]
for filename in filenames:
filepath = os.path.join(
self.supervisord_partition_configuration_dir, filename)
os.unlink(filepath)
if filenames:
self.updateSupervisor()
def updateSupervisorConfiguration(self):
"""
update supervisord with new processes
"""
self.generateSupervisorConfiguration()
self.writeSupervisorConfigurationFile()
self.writeSupervisorConfigurationFiles()
def start(self):
"""Asks supervisord to start the instance. If this instance is not
installed, we install it.
"""
partition_id = self.computer_partition.getId()
partition_id = self.partition_id
try:
with self.getSupervisorRPC() as supervisor:
supervisor.startProcessGroup(partition_id, False)
except xmlrpclib.Fault as exc:
if exc.faultString.startswith('BAD_NAME:'):
self.logger.info("Nothing to start on %s..." %
self.computer_partition.getId())
self.logger.info("Nothing to start on %s..." % partition_id)
else:
raise
else:
self.logger.info("Requested start of %s..." % self.computer_partition.getId())
self.logger.info("Requested start of %s..." % partition_id)
def stop(self):
"""Asks supervisord to stop the instance."""
partition_id = self.computer_partition.getId()
partition_id = self.partition_id
try:
with self.getSupervisorRPC() as supervisor:
supervisor.stopProcessGroup(partition_id, False)
......@@ -808,13 +808,13 @@ class Partition(object):
else:
raise
else:
self.logger.info("Requested stop of %s..." % self.computer_partition.getId())
self.logger.info("Requested stop of %s..." % partition_id)
def destroy(self):
"""Destroys the partition and makes it available for subsequent use."
"""
self.logger.info("Destroying Computer Partition %s..."
% self.computer_partition.getId())
% self.partition_id)
self.createRetentionLockDate()
if not self.checkRetentionIsAuthorized():
......@@ -867,7 +867,7 @@ class Partition(object):
# Cleanup all Data storage location of this partition
self.removeSupervisorConfigurationFile()
self.removeSupervisorConfigurationFiles()
except IOError as exc:
raise IOError("I/O error while freeing partition (%s): %s" % (self.instance_path, exc))
......
......@@ -1077,9 +1077,8 @@ stderr_logfile_backups=1
software_path=software_path,
instance_path=instance_path,
shared_part_list=self.shared_part_list,
supervisord_partition_configuration_path=os.path.join(
_getSupervisordConfigurationDirectory(self.instance_root),
computer_partition_id + '.conf'),
supervisord_partition_configuration_dir=(
_getSupervisordConfigurationDirectory(self.instance_root)),
supervisord_socket=self.supervisord_socket,
computer_partition=computer_partition,
computer_id=self.computer_id,
......@@ -1174,9 +1173,8 @@ stderr_logfile_backups=1
software_path=software_path,
instance_path=instance_path,
shared_part_list=self.shared_part_list,
supervisord_partition_configuration_path=os.path.join(
_getSupervisordConfigurationDirectory(self.instance_root), '%s.conf' %
computer_partition_id),
supervisord_partition_configuration_dir=(
_getSupervisordConfigurationDirectory(self.instance_root)),
supervisord_socket=self.supervisord_socket,
computer_partition=computer_partition,
computer_id=self.computer_id,
......@@ -1836,9 +1834,8 @@ stderr_logfile_backups=1
instance_path=os.path.join(self.instance_root,
computer_partition.getId()),
shared_part_list=self.shared_part_list,
supervisord_partition_configuration_path=os.path.join(
_getSupervisordConfigurationDirectory(self.instance_root), '%s.conf' %
computer_partition_id),
supervisord_partition_configuration_dir=(
_getSupervisordConfigurationDirectory(self.instance_root)),
supervisord_socket=self.supervisord_socket,
computer_partition=computer_partition,
computer_id=self.computer_id,
......
......@@ -156,17 +156,15 @@ class Manager(object):
# Generate supervisord configuration with socat processes added
partition.generateSupervisorConfiguration()
group_id = partition.addCustomGroup('socat', partition.partition_id,
[program['name']
for program in socat_programs])
for program in socat_programs:
partition.addProgramToGroup(group_id, program['name'], program['name'],
partition.addProgramToGroup('socat', program['name'], program['name'],
program['command'],
as_user=program['as_user'])
partition.writeSupervisorConfigurationFile()
partition.writeSupervisorConfigurationFiles()
# Start processes
group_id = partition.getGroupIdFromSuffix('socat')
with partition.getSupervisorRPC() as supervisor:
for program in socat_programs:
process_name = '{}:{}'.format(group_id, program['name'])
......
......@@ -73,11 +73,9 @@ class Manager(object):
group_suffix = "prerm"
logger.info("Adding pre-delete scripts to supervisord...")
partition.generateSupervisorConfiguration()
partition.addServiceToCustomGroup(group_suffix,
partition_id,
wrapper_list,
partition.prerm_path)
partition.writeSupervisorConfigurationFile()
partition.addServicesToCustomGroup(
group_suffix, wrapper_list, partition.prerm_path)
partition.writeSupervisorConfigurationFiles()
# check the state of all processes; if a process is not started yet, start it
with partition.getSupervisorRPC() as supervisor:
......
......@@ -170,8 +170,7 @@ class MasterMixin(BasicMixin, unittest.TestCase):
software_path=software_path,
instance_path=instance_path,
shared_part_list=shared_part_list,
supervisord_partition_configuration_path=os.path.join(
supervisor_configuration_path, partition_id),
supervisord_partition_configuration_dir=supervisor_configuration_path,
supervisord_socket=os.path.join(
supervisor_configuration_path, 'supervisor.sock'),
computer_partition=slap_computer_partition,
......@@ -507,39 +506,41 @@ class TestPartitionSupervisorConfig(MasterMixin, unittest.TestCase):
utils.launchBuildout = FakeCallAndNoop()
def test_grouped_program(self):
self.assertEqual(self.partition.supervisor_configuration_group, '')
self.assertEqual(self.partition.partition_supervisor_configuration, '')
self.partition.addProgramToGroup('test', 'sample-1', 'sample-1', '/bin/ls')
self.partition.writeSupervisorConfigurationFiles()
partition_id = self.partition.partition_id
group_id = self.partition.getGroupIdFromSuffix('test')
group_id = self.partition.addCustomGroup('test', partition_id,
['sample-1'])
self.assertIn('group:{}-test'.format(partition_id),
self.partition.supervisor_configuration_group)
filepath = os.path.join(
self.partition.supervisord_partition_configuration_dir,
group_id + '.conf'
)
self.partition.addProgramToGroup(group_id, 'sample-1', 'sample-1',
'/bin/ls')
with open(filepath) as f:
supervisor_conf = f.read()
self.assertIn('program:{}-test_sample-1'.format(partition_id),
self.partition.partition_supervisor_configuration)
self.assertIn('group:' + group_id, supervisor_conf)
self.assertIn('program:%s_sample-1' % group_id, supervisor_conf)
def test_simple_service(self):
self.assertEqual(self.partition.supervisor_configuration_group, '')
self.assertEqual(self.partition.partition_supervisor_configuration, '')
runners = ['runner-' + str(i) for i in range(3)]
path = os.path.join(self.partition.instance_path, 'etc/run')
self.partition.addServicesToGroup(runners, path)
self.partition.writeSupervisorConfigurationFiles()
partition_id = self.partition.partition_id
group_id = self.partition.getGroupIdFromSuffix()
runners = ['runner-{}'.format(i) for i in range(3)]
path = os.path.join(self.partition.instance_path, 'etc/run')
self.partition.addServiceToGroup(partition_id, runners, path)
filepath = os.path.join(
self.partition.supervisord_partition_configuration_dir,
group_id + '.conf'
)
with open(filepath) as f:
supervisor_conf = f.read()
for i in range(3):
self.assertIn('program:{}_runner-{}'.format(partition_id, i),
self.partition.partition_supervisor_configuration)
self.assertIn('program:%s_runner-%s' % (group_id, i), supervisor_conf)
runner_path = os.path.join(self.partition.instance_path, 'etc/run',
'runner-{}'.format(i))
class TestPartitionDestructionLock(MasterMixin, unittest.TestCase):
def setUp(self):
......
......@@ -3030,13 +3030,17 @@ exit 0
stat_info = os.stat(partition.partition_path)
uid = stat_info.st_uid
gid = stat_info.st_gid
supervisor_conf_file = os.path.join(self.instance_root,
partition_supervisor_conf_file = os.path.join(self.instance_root,
'etc/supervisord.conf.d',
'%s.conf' % partition.name)
self.assertTrue(os.path.exists(supervisor_conf_file))
prerm_supervisor_conf_file = os.path.join(self.instance_root,
'etc/supervisord.conf.d',
'%s-prerm.conf' % partition.name)
self.assertFalse(os.path.exists(partition_supervisor_conf_file))
self.assertTrue(os.path.exists(prerm_supervisor_conf_file))
regex_user = r"user=(\d+)"
regex_group = r"group=(\d+)"
with open(supervisor_conf_file) as f:
with open(prerm_supervisor_conf_file) as f:
config = f.read()
# search user uid in conf file
result = re.search(regex_user, config, re.DOTALL)
......@@ -3113,8 +3117,8 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
self.computer = self.getTestComputerClass()(self.software_root, self.instance_root)
self.partition = self.computer.instance_list[0]
self.instance_supervisord_config_path = os.path.join(
self.instance_root, 'etc/supervisord.conf.d/0.conf')
self.socat_supervisord_config_path = os.path.join(
self.instance_root, 'etc/supervisord.conf.d/0-socat.conf')
self.port_redirect_path = os.path.join(self.partition.partition_path,
slapmanager.portredir.Manager.port_redirect_filename)
......@@ -3122,8 +3126,8 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
def _mock_requests(self):
return httmock.HTTMock(self.computer.request_handler)
def _read_instance_supervisord_config(self):
with open(self.instance_supervisord_config_path) as f:
def _read_socat_supervisord_config(self):
with open(self.socat_supervisord_config_path) as f:
return f.read()
def _setup_instance(self, config):
......@@ -3151,9 +3155,9 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertIn('socat-tcp-{}'.format(1234), partition_supervisord_config)
self.assertIn('socat TCP4-LISTEN:1234,fork TCP4:127.0.0.1:4321', partition_supervisord_config)
socat_supervisord_config = self._read_socat_supervisord_config()
self.assertIn('socat-tcp-{}'.format(1234), socat_supervisord_config)
self.assertIn('socat TCP4-LISTEN:1234,fork TCP4:127.0.0.1:4321', socat_supervisord_config)
def test_ipv6_port_redirection(self):
with self._mock_requests():
......@@ -3166,9 +3170,9 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertIn('socat-tcp-{}'.format(1234), partition_supervisord_config)
self.assertIn('socat TCP4-LISTEN:1234,fork TCP6:[::1]:4321', partition_supervisord_config)
socat_supervisord_config = self._read_socat_supervisord_config()
self.assertIn('socat-tcp-{}'.format(1234), socat_supervisord_config)
self.assertIn('socat TCP4-LISTEN:1234,fork TCP6:[::1]:4321', socat_supervisord_config)
def test_udp_port_redirection(self):
with self._mock_requests():
......@@ -3182,9 +3186,9 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertIn('socat-udp-{}'.format(1234), partition_supervisord_config)
self.assertIn('socat UDP4-LISTEN:1234,fork UDP4:127.0.0.1:4321', partition_supervisord_config)
socat_supervisord_config = self._read_socat_supervisord_config()
self.assertIn('socat-udp-{}'.format(1234), socat_supervisord_config)
self.assertIn('socat UDP4-LISTEN:1234,fork UDP4:127.0.0.1:4321', socat_supervisord_config)
def test_portredir_config_change(self):
# We want the partition to just get updated, not recreated
......@@ -3200,9 +3204,9 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertIn('socat-tcp-{}'.format(1234), partition_supervisord_config)
self.assertIn('socat TCP4-LISTEN:1234,fork TCP4:127.0.0.1:4321', partition_supervisord_config)
socat_supervisord_config = self._read_socat_supervisord_config()
self.assertIn('socat-tcp-{}'.format(1234), socat_supervisord_config)
self.assertIn('socat TCP4-LISTEN:1234,fork TCP4:127.0.0.1:4321', socat_supervisord_config)
# Remove the port binding from config
with open(self.port_redirect_path, 'w+') as f:
......@@ -3219,9 +3223,7 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
self.assertEqual(self.partition.state, 'started')
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-tcp-{}'.format(1234), partition_supervisord_config)
self.assertNotIn('socat TCP4-LISTEN:1234,fork TCP4:127.0.0.1:4321', partition_supervisord_config)
self.assertFalse(os.path.exists(self.socat_supervisord_config_path))
def test_port_redirection_config_bad_source_port(self):
with self._mock_requests():
......@@ -3234,9 +3236,7 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-tcp-bad', partition_supervisord_config)
self.assertNotIn('socat TCP4-LISTEN:bad,fork TCP4:127.0.0.1:4321', partition_supervisord_config)
self.assertFalse(os.path.exists(self.socat_supervisord_config_path))
def test_port_redirection_config_bad_dest_port(self):
with self._mock_requests():
......@@ -3249,9 +3249,7 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-tcp-1234', partition_supervisord_config)
self.assertNotIn('socat TCP4-LISTEN:1234,fork TCP4:127.0.0.1:wolf', partition_supervisord_config)
self.assertFalse(os.path.exists(self.socat_supervisord_config_path))
def test_port_redirection_config_bad_source_address(self):
with self._mock_requests():
......@@ -3265,9 +3263,7 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-tcp-1234', partition_supervisord_config)
self.assertNotIn('socat TCP4-LISTEN:1234,bind=bad,fork TCP4:127.0.0.1:4321', partition_supervisord_config)
self.assertFalse(os.path.exists(self.socat_supervisord_config_path))
def test_port_redirection_config_bad_dest_address(self):
with self._mock_requests():
......@@ -3280,9 +3276,7 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-tcp-1234', partition_supervisord_config)
self.assertNotIn('socat TCP4-LISTEN:1234,fork TCP4:wolf:4321', partition_supervisord_config)
self.assertFalse(os.path.exists(self.socat_supervisord_config_path))
def test_port_redirection_config_bad_redir_type(self):
with self._mock_requests():
......@@ -3296,9 +3290,7 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-htcpcp-1234', partition_supervisord_config)
self.assertNotIn('socat HTCPCP4-LISTEN:1234,fork HTCPCP4:127.0.0.1:4321', partition_supervisord_config)
self.assertFalse(os.path.exists(self.socat_supervisord_config_path))
class TestSlapgridWithDevPermLsblk(MasterMixin, unittest.TestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment