Commit 3bb36903 authored by Xavier Thompson's avatar Xavier Thompson

slapgrid: Process promises with SR python

See merge request nexedi/slapos.core!329
parents b1dd96b1 45320b33
......@@ -45,7 +45,8 @@ from six.moves.configparser import ConfigParser
from supervisor import xmlrpc
from slapos.grid.utils import (md5digest, getCleanEnvironment,
SlapPopen, dropPrivileges, updateFile)
SlapPopen, dropPrivileges, updateFile,
getPythonExecutableFromSoftwarePath)
from slapos.grid import utils # for methods that could be mocked, access them through the module
from slapos.slap.slap import NotFoundError
from slapos.grid.svcbackend import getSupervisorRPC
......@@ -472,6 +473,8 @@ class Partition(object):
self.instance_min_free_space = instance_min_free_space
self.instance_python = getPythonExecutableFromSoftwarePath(self.software_path)
def check_free_space(self):
required = self.instance_min_free_space or 0
......@@ -705,6 +708,7 @@ class Partition(object):
debug=self.buildout_debug)
self.generateSupervisorConfigurationFile()
self.createRetentionLockDelay()
self.instance_python = getPythonExecutableFromSoftwarePath(self.software_path)
def generateSupervisorConfiguration(self):
"""
......
......@@ -44,7 +44,7 @@ import hashlib
from datetime import datetime
from multiprocessing import Process, Queue as MQueue
from six.moves import queue, reload_module
from slapos.util import str2bytes, mkdir_p, chownDirectory
from slapos.util import str2bytes, mkdir_p, chownDirectory, listifdir
from slapos.grid.utils import dropPrivileges, killProcessTree
from slapos.grid.promise import interface
from slapos.grid.promise.generic import (GenericPromise, PromiseQueueResult,
......@@ -731,65 +731,63 @@ class PromiseLauncher(object):
error = 0
success = 0
promise_name_list = []
if os.path.exists(self.promise_folder) and os.path.isdir(self.promise_folder):
for promise_name in os.listdir(self.promise_folder):
for suffix in ['.pyc', '.pyo']:
if promise_name.endswith(suffix):
promise_path = os.path.join(self.promise_folder, promise_name)
if not os.path.exists(promise_path[:-1]):
try:
os.unlink(promise_path)
except Exception as e:
self.logger.warning('Failed to remove %r because of %s', promise_path, e)
else:
self.logger.debug('Removed stale %r', promise_path)
if promise_name.startswith('__init__') or \
not promise_name.endswith('.py'):
continue
promise_name_list.append(promise_name)
if self.run_only_promise_list is not None and not \
promise_name in self.run_only_promise_list:
continue
for promise_name in listifdir(self.promise_folder):
if promise_name.endswith(('.pyc', '.pyo')):
promise_path = os.path.join(self.promise_folder, promise_name)
config = {
'path': promise_path,
'name': promise_name
}
config.update(base_config)
promise_result = self._launchPromise(promise_name, promise_path, config)
if promise_result:
change_date = promise_result.date.strftime('%Y-%m-%dT%H:%M:%S+0000')
if promise_result.hasFailed():
promise_status = 'FAILED'
if not os.path.exists(promise_path[:-1]):
try:
os.unlink(promise_path)
except Exception as e:
self.logger.warning('Failed to remove %r because of %s', promise_path, e)
else:
self.logger.debug('Removed stale %r', promise_path)
if promise_name.startswith('__init__') or \
not promise_name.endswith('.py'):
continue
promise_name_list.append(promise_name)
if self.run_only_promise_list is not None and not \
promise_name in self.run_only_promise_list:
continue
promise_path = os.path.join(self.promise_folder, promise_name)
config = {
'path': promise_path,
'name': promise_name
}
config.update(base_config)
promise_result = self._launchPromise(promise_name, promise_path, config)
if promise_result:
change_date = promise_result.date.strftime('%Y-%m-%dT%H:%M:%S+0000')
if promise_result.hasFailed():
promise_status = 'FAILED'
error += 1
else:
promise_status = "OK"
success += 1
if promise_name in previous_state_dict:
status, previous_change_date, _ = previous_state_dict[promise_name]
if promise_status == status:
change_date = previous_change_date
message = promise_result.message if promise_result.message else ""
new_state_dict[promise_name] = [
promise_status,
change_date,
hashlib.md5(str2bytes(message)).hexdigest()]
if promise_result.hasFailed() and not failed_promise_name:
failed_promise_name = promise_name
failed_promise_output = promise_result.message
else:
# The promise was skip, so for statistic point of view we preserve
# the previous result
if promise_name in new_state_dict:
if new_state_dict[promise_name][0] == "FAILED":
error += 1
else:
promise_status = "OK"
success += 1
if promise_name in previous_state_dict:
status, previous_change_date, _ = previous_state_dict[promise_name]
if promise_status == status:
change_date = previous_change_date
message = promise_result.message if promise_result.message else ""
new_state_dict[promise_name] = [
promise_status,
change_date,
hashlib.md5(str2bytes(message)).hexdigest()]
if promise_result.hasFailed() and not failed_promise_name:
failed_promise_name = promise_name
failed_promise_output = promise_result.message
else:
# The promise was skip, so for statistic point of view we preserve
# the previous result
if promise_name in new_state_dict:
if new_state_dict[promise_name][0] == "FAILED":
error += 1
else:
success += 1
if not self.run_only_promise_list and len(promise_name_list) > 0:
# cleanup stale json files
......@@ -816,10 +814,9 @@ class PromiseLauncher(object):
if key not in promise_name_list:
new_state_dict.pop(key, None)
if not self.run_only_promise_list and os.path.exists(self.legacy_promise_folder) \
and os.path.isdir(self.legacy_promise_folder):
if not self.run_only_promise_list:
# run legacy promise styles
for promise_name in os.listdir(self.legacy_promise_folder):
for promise_name in listifdir(self.legacy_promise_folder):
promise_path = os.path.join(self.legacy_promise_folder, promise_name)
if not os.path.isfile(promise_path) or \
not os.access(promise_path, os.X_OK):
......
from __future__ import print_function
import argparse
import ast
import os
import sys
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('--promise-folder', required=True)
parser.add_argument('--legacy-promise-folder', default=None)
parser.add_argument('--promise-timeout', type=int, default=20)
parser.add_argument('--partition-folder', default=None)
parser.add_argument('--log-folder', default=None)
parser.add_argument('--force', action='store_true')
parser.add_argument('--check-anomaly', action='store_true')
parser.add_argument('--debug', action='store_true')
parser.add_argument('--master-url', default=None)
parser.add_argument('--partition-cert', default=None)
parser.add_argument('--partition-key', default=None)
parser.add_argument('--partition-id', default=None)
parser.add_argument('--computer-id', default=None)
args = parser.parse_args()
# Extract slapos.core path and all dependencies from first promise found
# to import slapos.core
promise_folder = args.promise_folder
promise_file = next(
p for p in os.listdir(promise_folder)
if p.endswith('.py') and not p.startswith('__init__')
)
with open(os.path.join(promise_folder, promise_file)) as f:
promise_content = f.read()
tree = ast.parse(promise_content, mode='exec')
sys.path[0:0] = eval(compile(ast.Expression(tree.body[1].value), '', 'eval'))
from slapos.grid.promise import PromiseLauncher, PromiseError
from slapos.cli.entry import SlapOSApp
# Configure promise launcher
# with the same logger as standard slapos command
app = SlapOSApp()
app.options, _ = app.parser.parse_known_args([])
app.configure_logging()
config = {k.replace('_', '-') : v for k, v in vars(args).items()}
promise_checker = PromiseLauncher(config=config, logger=app.log)
# Run promises
# Redirect stdout to stderr (logger only uses stderr already)
# to reserve stdout for error reporting
out = os.dup(1)
os.dup2(2, 1)
try:
promise_checker.run()
except Exception as e:
os.write(out, str(e))
sys.exit(2 if isinstance(e, PromiseError) else 1)
......@@ -33,7 +33,6 @@ import pkg_resources
import random
import socket
from io import BytesIO
import subprocess
import sys
import tempfile
import time
......@@ -45,6 +44,11 @@ import shutil
import six
import errno
if six.PY3:
import subprocess
else:
import subprocess32 as subprocess
if sys.version_info < (2, 6):
warnings.warn('Used python version (%s) is old and has problems with'
' IPv6 connections' % sys.version.split('\n')[0])
......@@ -55,14 +59,18 @@ from slapos import manager as slapmanager
from slapos.slap.slap import NotFoundError
from slapos.slap.slap import ServerError
from slapos.slap.slap import COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME
from slapos.util import mkdir_p, chownDirectory, string_to_boolean
from slapos.util import mkdir_p, chownDirectory, string_to_boolean, listifdir
from slapos.grid.exception import BuildoutFailedError
from slapos.grid.SlapObject import Software, Partition
from slapos.grid.svcbackend import (launchSupervisord,
createSupervisordConfiguration,
_getSupervisordConfigurationDirectory,
_getSupervisordSocketPath)
from slapos.grid.utils import (md5digest, dropPrivileges, SlapPopen, updateFile)
from slapos.grid.utils import (md5digest,
dropPrivileges,
killProcessTree,
SlapPopen,
updateFile)
from slapos.grid.promise import PromiseLauncher, PromiseError
from slapos.grid.promise.generic import PROMISE_LOG_FOLDER_NAME
from slapos.human import human2bytes
......@@ -302,10 +310,12 @@ def check_required_only_partitions(existing, required):
"""
Verify the existence of partitions specified by the --only parameter
"""
missing = set(required) - set(existing)
required = set(required)
missing = required.difference(existing)
if missing:
plural = ['s', ''][len(missing) == 1]
raise ValueError('Unknown partition%s: %s' % (plural, ', '.join(sorted(missing))))
return required
class Slapgrid(object):
......@@ -549,6 +559,18 @@ stderr_logfile_backups=1
self.logger.fatal(exc)
raise
def getRequiredComputerPartitionList(self):
"""Return the computer partitions that should be processed.
"""
cp_list = self.getComputerPartitionList()
cp_id_list = [cp.getId() for cp in cp_list]
required_cp_id_set = check_required_only_partitions(
cp_id_list, self.computer_partition_filter_list)
busy_cp_list = self.FilterComputerPartitionList(cp_list)
if required_cp_id_set:
return [cp for cp in busy_cp_list if cp.getId() in required_cp_id_set]
return busy_cp_list
def processSoftwareReleaseList(self):
"""Will process each Software Release.
"""
......@@ -655,18 +677,18 @@ stderr_logfile_backups=1
return SLAPGRID_SUCCESS
def _checkPromiseList(self, partition, force=True, check_anomaly=False):
instance_path = os.path.join(self.instance_root, partition.partition_id)
partition_id = partition.partition_id
self.logger.info("Checking %s promises...", partition_id)
instance_path = os.path.join(self.instance_root, partition_id)
promise_log_path = os.path.join(instance_path, PROMISE_LOG_FOLDER_NAME)
promise_dir = os.path.join(instance_path, 'etc', 'plugin')
legacy_promise_dir = os.path.join(instance_path, 'etc', 'promise')
self.logger.info("Checking %s promises..." % partition.partition_id)
uid, gid = None, None
stat_info = os.stat(instance_path)
#stat sys call to get statistics informations
uid = stat_info.st_uid
gid = stat_info.st_gid
promise_dir = os.path.join(instance_path, 'etc', 'plugin')
legacy_promise_dir = os.path.join(instance_path, 'etc', 'promise')
promise_config = {
'promise-folder': promise_dir,
'legacy-promise-folder': legacy_promise_dir,
......@@ -680,12 +702,56 @@ stderr_logfile_backups=1
'master-url': partition.server_url,
'partition-cert': partition.cert_file,
'partition-key': partition.key_file,
'partition-id': partition.partition_id,
'partition-id': partition_id,
'computer-id': self.computer_id,
}
promise_checker = PromiseLauncher(config=promise_config, logger=self.logger)
return promise_checker.run()
plugins = sum(
1 for p in listifdir(promise_dir)
if p.endswith('.py') and not p.startswith('__init__')
)
instance_python = partition.instance_python
if instance_python is not None and plugins:
self.logger.info(
"Switching to %s's python at %s", partition_id, instance_python)
runpromise_script = os.path.join(
os.path.dirname(__file__), 'promise', 'runpromises.py')
command = [instance_python, runpromise_script]
for option, value in promise_config.items():
if option in ('uid', 'gid'):
continue
if isinstance(value, bool):
if value:
command.append('--' + option)
else:
command.append('--' + option)
command.append(str(value))
process = subprocess.Popen(
command,
preexec_fn=lambda: dropPrivileges(uid, gid, logger=self.logger),
cwd=instance_path,
stdout=subprocess.PIPE)
promises = plugins + len(listifdir(legacy_promise_dir))
# Add a timeout margin to let the process kill the promises and cleanup
timeout = promises * self.promise_timeout + 10
try:
# The logger logs everything to stderr, so runpromise redirects
# stdout to stderr in case a promise prints to stdout
# and reserves stdout to progagate exception messages.
out, _ = process.communicate(timeout=timeout)
if process.returncode == 2:
raise PromiseError(out)
elif process.returncode:
raise Exception(out)
elif out:
self.logger.warn('Promise runner unexpected output:\n%s', out)
except subprocess.TimeoutExpired:
killProcessTree(process.pid, self.logger)
# The timeout margin was exceeded but this should be infrequent
raise Exception('Promise runner timed out')
else:
return PromiseLauncher(config=promise_config, logger=self.logger).run()
def _endInstallationTransaction(self, computer_partition):
partition_id = computer_partition.getId()
......@@ -972,12 +1038,6 @@ stderr_logfile_backups=1
if not computer_partition_id:
raise ValueError('Computer Partition id is empty.')
# Check if we defined explicit list of partitions to process.
# If so, if current partition not in this list, skip.
if len(self.computer_partition_filter_list) > 0 and \
(computer_partition_id not in self.computer_partition_filter_list):
return
instance_path = os.path.join(self.instance_root, computer_partition_id)
os.environ['SLAPGRID_INSTANCE_ROOT'] = self.instance_root
try:
......@@ -1038,12 +1098,6 @@ stderr_logfile_backups=1
if not computer_partition_id:
raise ValueError('Computer Partition id is empty.')
# Check if we defined explicit list of partitions to process.
# If so, if current partition not in this list, skip.
if len(self.computer_partition_filter_list) > 0 and \
(computer_partition_id not in self.computer_partition_filter_list):
return
self.logger.debug('Check if %s requires processing...' % computer_partition_id)
instance_path = os.path.join(self.instance_root, computer_partition_id)
......@@ -1357,12 +1411,7 @@ stderr_logfile_backups=1
# Boolean to know if every promises correctly passed
clean_run_promise = True
check_required_only_partitions([cp.getId() for cp in self.getComputerPartitionList()],
self.computer_partition_filter_list)
# Filter all dummy / empty partitions
computer_partition_list = self.FilterComputerPartitionList(
self.getComputerPartitionList())
computer_partition_list = self.getRequiredComputerPartitionList()
process_error_partition_list = []
promise_error_partition_list = []
......@@ -1438,12 +1487,8 @@ stderr_logfile_backups=1
self.logger.info('Processing promises...')
# Return success value
clean_run_promise = True
check_required_only_partitions([cp.getId() for cp in self.getComputerPartitionList()],
self.computer_partition_filter_list)
# Filter all dummy / empty partitions
computer_partition_list = self.FilterComputerPartitionList(
self.getComputerPartitionList())
computer_partition_list = self.getRequiredComputerPartitionList()
promise_error_partition_list = []
for computer_partition in computer_partition_list:
......@@ -1599,37 +1644,24 @@ stderr_logfile_backups=1
try:
computer_partition_id = computer_partition.getId()
# We want to execute all the script in the report folder
instance_path = os.path.join(self.instance_root,
computer_partition.getId())
report_path = os.path.join(instance_path, 'etc', 'report')
if os.path.isdir(report_path):
script_list_to_run = os.listdir(report_path)
else:
script_list_to_run = []
instance_path = os.path.join(self.instance_root, computer_partition_id)
# We now generate the pseudorandom name for the xml file
# and we add it in the invocation_list
f = tempfile.NamedTemporaryFile()
name_xml = '%s.%s' % ('slapreport', os.path.basename(f.name))
path_to_slapreport = os.path.join(instance_path, 'var', 'xml_report',
name_xml)
# We now generate a pseudorandom name for the report xml file
# that will be passed to the invocation list
slapreport_path = tempfile.mktemp(
prefix='slapreport.',
dir=os.path.join(instance_path, 'var', 'xml_report'))
# We want to execute all the script in the report folder
failed_script_list = []
for script in script_list_to_run:
report_dir = os.path.join(instance_path, 'etc', 'report')
for script in listifdir(report_dir):
invocation_list = []
invocation_list.append(os.path.join(instance_path, 'etc', 'report',
script))
# We add the xml_file name to the invocation_list
#f = tempfile.NamedTemporaryFile()
#name_xml = '%s.%s' % ('slapreport', os.path.basename(f.name))
#path_to_slapreport = os.path.join(instance_path, 'var', name_xml)
invocation_list.append(path_to_slapreport)
invocation_list.append(os.path.join(report_dir, script))
invocation_list.append(slapreport_path)
# Dropping privileges
uid, gid = None, None
stat_info = os.stat(instance_path)
#stat sys call to get statistics informations
uid = stat_info.st_uid
gid = stat_info.st_gid
process_handler = SlapPopen(invocation_list,
......
......@@ -164,6 +164,20 @@ def md5digest(url):
return hashlib.md5(url.encode('utf-8')).hexdigest()
def getPythonExecutableFromSoftwarePath(software_path):
"""
Return the path of the python executable installed for the software release
installed as `software_path`.
"""
try:
with open(os.path.join(software_path, 'bin', 'buildout')) as f:
shebang = f.readline()
except OSError:
return
if shebang.startswith('#!'):
return shebang[2:].split(None, 1)[0]
def getCleanEnvironment(logger, home_path='/tmp'):
changed_env = {}
removed_env = []
......
......@@ -41,6 +41,8 @@ from contextlib import closing
from six.moves import BaseHTTPServer
from six.moves import urllib_parse
from ..grid.utils import getPythonExecutableFromSoftwarePath
try:
import typing
if typing.TYPE_CHECKING:
......@@ -77,8 +79,12 @@ def getPromisePluginParameterDict(filepath):
This allow to check that monitoring plugin are using a proper config.
"""
executable = getPythonExecutableFromSoftwarePath(
os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(filepath))),
'software_release'))
extra_config_dict_json = subprocess.check_output([
sys.executable,
executable,
"-c",
"""
import json, sys
......
......@@ -45,6 +45,7 @@ import json
import re
import grp
import hashlib
import errno
import mock
from mock import patch
......@@ -65,6 +66,11 @@ import slapos.grid.SlapObject
from slapos import manager as slapmanager
from slapos.util import dumps
from slapos import __path__ as slapos_path
from zope import __path__ as zope_path
PROMISE_PATHS = sorted(set(map(os.path.dirname, slapos_path + zope_path)))
import httmock
......@@ -113,6 +119,9 @@ touch worked
"""
PROMISE_CONTENT_TEMPLATE = """
import sys
sys.path[0:0] = %(paths)r
from zope.interface import implementer
from slapos.grid.promise import interface
from slapos.grid.promise import GenericPromise
......@@ -122,7 +131,7 @@ class RunPromise(GenericPromise):
def __init__(self, config):
super(RunPromise, self).__init__(config)
self.setPeriodicity(minute=%(periodicity)s)
self.setPeriodicity(minute=%(periodicity)r)
def sense(self):
%(content)s
......@@ -133,11 +142,11 @@ class RunPromise(GenericPromise):
self.logger.info("success")
def anomaly(self):
return self._anomaly(result_count=2, failure_amount=%(failure_amount)s)
return self._anomaly(result_count=2, failure_amount=%(failure_amount)r)
def test(self):
return self._test(result_count=1, failure_amount=%(failure_amount)s)
"""
return self._test(result_count=1, failure_amount=%(failure_amount)r)
"""
class BasicMixin(object):
def setUp(self):
......@@ -150,6 +159,14 @@ class BasicMixin(object):
del os.environ['SLAPGRID_INSTANCE_ROOT']
logging.basicConfig(level=logging.DEBUG)
self.setSlapgrid()
self.setMock()
def setMock(self):
module = slapos.grid.SlapObject
func = 'getPythonExecutableFromSoftwarePath'
orig = getattr(module, func)
self.addCleanup(setattr, module, func, orig)
setattr(module, func, lambda software_path: None)
def setSlapgrid(self, develop=False, force_stop=False):
if getattr(self, 'master_url', None) is None:
......@@ -576,7 +593,8 @@ class InstanceForTest(object):
{'success': success,
'content': promise_content,
'failure_amount': failure_count,
'periodicity': periodicity}
'periodicity': periodicity,
'paths': PROMISE_PATHS}
with open(os.path.join(promise_dir, promise_name), 'w') as f:
f.write(_promise_content)
......@@ -599,7 +617,7 @@ class InstanceForTest(object):
class SoftwareForTest(object):
"""
Class to prepare and simulate software.
each instance has a sotfware attributed
each instance has a software attributed
"""
def __init__(self, software_root, name=''):
"""
......@@ -2297,6 +2315,7 @@ exit 1
self.assertFalse(os.path.exists(promise_file))
self.assertTrue(instance.error)
class TestSlapgridDestructionLock(MasterMixin, unittest.TestCase):
def test_retention_lock(self):
"""
......@@ -3999,9 +4018,6 @@ class TestSlapgridPromiseWithMaster(MasterMixin, unittest.TestCase):
self.assertEqual('success', result["result"]["message"])
def test_one_succeeding_one_timing_out_promises(self):
computer = ComputerForTest(self.software_root, self.instance_root)
with httmock.HTTMock(computer.request_handler):
......@@ -4095,6 +4111,58 @@ class TestSlapgridPromiseWithMaster(MasterMixin, unittest.TestCase):
".slapgrid/promise/result/fail.status.json")))
class TestSlapgridPluginPromiseWithInstancePython(TestSlapgridPromiseWithMaster):
expect_plugin = False
def setPython(self):
self.python_called = os.path.join(self.software_root, 'called')
wrapper = """#!/bin/sh
touch %s
exec %s "$@"
""" % (self.python_called, sys.executable)
path = os.path.join(self.software_root, 'python')
with open(path, 'w') as f:
f.write(wrapper)
os.chmod(path, 0o755)
return path
def patchBuildoutSetter(self):
cls = SoftwareForTest
attr = 'setBuildout'
orig = getattr(cls, attr)
def setBuildout(soft):
buildout = "#!" + self.setPython()
orig(soft, buildout)
self.addCleanup(setattr, cls, attr, orig)
setattr(cls, attr, setBuildout)
def patchPluginSetter(self):
cls = InstanceForTest
attr = 'setPluginPromise'
orig = getattr(cls, attr)
def setPluginPromise(inst, *args, **kwargs):
self.expect_plugin = inst.requested_state == 'started'
return orig(inst, *args, **kwargs)
self.addCleanup(setattr, cls, attr, orig)
setattr(cls, attr, setPluginPromise)
def setMock(self):
self.patchBuildoutSetter()
self.patchPluginSetter()
def tearDown(self):
try:
os.remove(self.python_called)
called = True
except OSError as e:
if e.errno != errno.ENOENT:
raise
called = False
finally:
super(TestSlapgridPluginPromiseWithInstancePython, self).tearDown()
self.assertEqual(self.expect_plugin, called)
class TestSVCBackend(unittest.TestCase):
"""Tests for supervisor backend.
"""
......
......@@ -95,6 +95,18 @@ def mkdir_p(path, mode=0o700):
raise
def listifdir(path):
"""
Like listdir, but returns an empty tuple if the path is not a directory.
"""
try:
return os.listdir(path)
except OSError as e:
if e.errno != errno.ENOENT:
raise
return ()
def chownDirectory(path, uid, gid):
if os.getuid() != 0:
# we are probably inside of a webrunner
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment