Commit 94b4c294 authored by Tomáš Peterka's avatar Tomáš Peterka

WIP: Move manager out from slapformat and add security

parent a81c9a6d
slapos.manager
==============
A manager is a plugin-like class that is run in multiple phases of the slapos node lifecycle.
- **format**, manager can additionally format the underlying OS
- **software**, manager can react on software installation
- **instance**, manager can update instance runtime frequently
The constructor receives the configuration of the current stage. Each method then receives
the object most related to the current operation. For details see <slapos/manager/interface.py>.
...@@ -60,14 +60,11 @@ import slapos.util ...@@ -60,14 +60,11 @@ import slapos.util
from slapos.util import mkdir_p from slapos.util import mkdir_p
import slapos.slap as slap import slapos.slap as slap
from slapos import version from slapos import version
from slapos import manager as slapmanager
logger = logging.getLogger("slapos.format") logger = logging.getLogger("slapos.format")
# dict[str: ManagerClass] used in configuration and XML dump of computer
# this dictionary is intended to be filled after each definition of a Manager
available_manager_list = {}
def prettify_xml(xml): def prettify_xml(xml):
root = lxml.etree.fromstring(xml) root = lxml.etree.fromstring(xml)
...@@ -244,205 +241,6 @@ def _getDict(obj): ...@@ -244,205 +241,6 @@ def _getDict(obj):
} }
class Manager(object):
  """Base class of managers; subclasses are expected to override both methods."""

  # key under which a concrete manager is registered in ``available_manager_list``
  short_name = None

  def format(self):
    """Format the underlying OS.

    :raise NotImplementedError: always, the base class has no implementation
    """
    raise NotImplementedError("Implement function to format underlaying OS")

  def update(self):
    """Update runtime state of the computer.

    :raise NotImplementedError: always, the base class has no implementation
    """
    # the message used to be a copy of the one in ``format`` which was misleading
    raise NotImplementedError("Implement function to update runtime state of the computer")
class CGroupManager(Manager):
  """Manage cgroups in terms of initializing and runtime operations.

  This class takes advantage of slapformat being run periodically thus
  it can act as a "daemon" performing runtime tasks.
  """

  short_name = 'cgroup'
  # name of the per-partition file holding PIDs which request an exclusive CPU
  cpu_exclusive_file = ".slapos-cpu-exclusive"
  # root of the cpuset hierarchy (overridable, e.g. by tests)
  cpuset_path = "/sys/fs/cgroup/cpuset/"
  # mode used to open "tasks" and scaling governor files for writing
  task_write_mode = "wt"

  def __init__(self, computer):
    """Extract necessary information from the ``computer``.

    :param computer: slapos.format.Computer, extract necessary attributes
    """
    self.instance_root = computer.instance_root
    logger.info("Allowing " + self.__class__.__name__)

  def __str__(self):
    """Manager representation when dumped to string."""
    return self.short_name

  def is_allowed(self):
    """Return True when the cpuset hierarchy is present at ``cpuset_path``."""
    # honour ``cpuset_path`` so that an overridden root is checked as well
    return os.path.exists(os.path.join(self.cpuset_path, "cpuset.cpus"))

  def format(self):
    """Build CGROUP tree to fit SlapOS needs.

    - Create hierarchy of CPU sets so that every partition can have exclusive
      hold of one of the CPUs.
    """
    self.prepare_cpuset()
    self.prepare_cpu_space()

  def update(self):
    """Control runtime state of the computer."""
    cpu0_path = os.path.join(self.cpuset_path, "cpu0")
    if os.path.exists(cpu0_path):
      # proceed only when CPUSETs were formatted by this manager
      self.prepare_cpu_space()
      self.ensure_exlusive_cpu()
    else:
      logger.warning("Computer was not formatted by {} because {} doesn't exist!".format(
        self.__class__.__name__, cpu0_path))

  def prepare_cpuset(self):
    """Create cgroup folder per-CPU with exclusive access to the CPU.

    Those folders are "<cpuset_path>/cpu<N>".
    """
    for cpu in self._cpu_list():
      cpu_path = self._prepare_folder(
        os.path.join(self.cpuset_path, "cpu" + str(cpu)))
      with open(cpu_path + "/cpuset.cpus", "wt") as fx:
        fx.write(str(cpu))  # this cgroup manages only this cpu
      with open(cpu_path + "/cpuset.cpu_exclusive", "wt") as fx:
        fx.write("1")  # manages it exclusively
      with open(cpu_path + "/cpuset.mems", "wt") as fx:
        fx.write("0")  # it doesn't work without that

  def ensure_exlusive_cpu(self):
    """Move processes among exclusive CPUSets based on software release demands.

    We expect PIDs which require own CPU to be found in ~instance/.slapos-cpu-exclusive
    """
    cpu_list = self._cpu_list()
    generic_cpu = cpu_list[0]
    exclusive_cpu_list = cpu_list[1:]

    # gather all running PIDs for filtering out stale PIDs
    running_pid_set = set()
    with open(os.path.join(self.cpuset_path, "tasks"), "rt") as fi:
      running_pid_set.update(map(int, fi.read().split()))
    with open(os.path.join(self.cpuset_path, "cpu" + str(generic_cpu), "tasks"), "rt") as fi:
      running_pid_set.update(map(int, fi.read().split()))

    # gather already exclusively running PIDs
    exclusive_pid_set = set()
    for exclusive_cpu in exclusive_cpu_list:
      with open(os.path.join(self.cpuset_path, "cpu" + str(exclusive_cpu), "tasks"), "rt") as fi:
        exclusive_pid_set.update(map(int, fi.read().split()))

    for request_file in glob.iglob(os.path.join(self.instance_root, '*', CGroupManager.cpu_exclusive_file)):
      with open(request_file, "rt") as fi:
        # take such PIDs which are either really running or are not already exclusive
        # NOTE(review): with "or" a stale PID (neither running nor exclusive)
        # still passes this filter - confirm whether "and" was intended
        request_pid_list = [int(pid) for pid in fi.read().split()
                            if int(pid) in running_pid_set or int(pid) not in exclusive_pid_set]
      with open(request_file, "wt") as fo:
        fo.write("")  # empty file (we will write back only PIDs which weren't moved)
      for request_pid in request_pid_list:
        assigned_cpu = self._move_to_exclusive_cpu(request_pid)
        if assigned_cpu < 0:
          # if no exclusive CPU was assigned - write the PID back and try other time
          with open(request_file, "at") as fo:
            fo.write(str(request_pid) + "\n")

  def prepare_cpu_space(self):
    """Move all PIDs from the pool of all CPUs into the first exclusive CPU."""
    with open(os.path.join(self.cpuset_path, "tasks"), "rt") as fi:
      running_list = sorted(map(int, fi.read().split()), reverse=True)
    first_cpu = self._cpu_list()[0]
    success_set = set()
    refused_set = set()
    for pid in running_list:
      try:
        self._move_task(pid, first_cpu)
        success_set.add(pid)
        time.sleep(0.01)
      except IOError:
        # a refused PID is only reported in the debug log below
        refused_set.add(pid)
    logger.debug("Refused to move {:d} PIDs: {!s}\n"
                 "Suceeded in moving {:d} PIDs {!s}\n".format(
                   len(refused_set), refused_set, len(success_set), success_set)
                 )

  def _cpu_list(self):
    """Extract IDs of available CPUs and return them as a list.

    The first one will be always used for all non-exclusive processes.

    :return: list[int]
    """
    cpu_list = []  # types: list[int]
    with open(os.path.join(self.cpuset_path, "cpuset.cpus"), "rt") as cpu_def:
      for cpu_def_split in cpu_def.read().strip().split(","):
        # IDs can be in form "0-4" or "0,1,2,3,4"
        if "-" in cpu_def_split:
          a, b = map(int, cpu_def_split.split("-"))
          cpu_list.extend(range(a, b + 1))  # because cgroup's range is inclusive
          continue
        cpu_list.append(int(cpu_def_split))
    return cpu_list

  def _move_to_exclusive_cpu(self, pid):
    """Try all exclusive CPUs and place the ``pid`` to the first available one.

    :return: int, cpu_id of used CPU, -1 if placement was not possible
    """
    exclusive_cpu_list = self._cpu_list()[1:]
    for exclusive_cpu in exclusive_cpu_list:
      # gather tasks assigned to current exclusive CPU
      task_path = os.path.join(self.cpuset_path, "cpu" + str(exclusive_cpu), "tasks")
      with open(task_path, "rt") as fi:
        task_list = fi.read().split()
      if len(task_list) > 0:
        continue  # skip occupied CPUs
      return self._move_task(pid, exclusive_cpu)[1]
    return -1

  def _move_task(self, pid, cpu_id, cpu_mode="performance"):
    """Move ``pid`` to ``cpu_id``.

    cpu_mode can be "performance" or "powersave"

    :return: tuple (pid, cpu_id)
    """
    known_cpu_mode_list = ("performance", "powersave")
    with open(os.path.join(self.cpuset_path, "cpu" + str(cpu_id), "tasks"), self.task_write_mode) as fo:
      fo.write(str(pid) + "\n")
    # set the core to `cpu_mode`
    scaling_governor_file = "/sys/devices/system/cpu/cpu{:d}/cpufreq/scaling_governor".format(cpu_id)
    if os.path.exists(scaling_governor_file):
      if cpu_mode not in known_cpu_mode_list:
        logger.warning("Cannot set CPU to mode \"{}\"! Known modes {!s}".format(
          cpu_mode, known_cpu_mode_list))
      else:
        try:
          with open(scaling_governor_file, self.task_write_mode) as fo:
            fo.write(cpu_mode + "\n")  # default is "powersave"
        except IOError:
          # handle permission error
          logger.error("Failed to write \"{}\" to {}".format(cpu_mode, scaling_governor_file))
    return pid, cpu_id

  def _prepare_folder(self, folder):
    """Create ``folder`` and its mandatory files when they do not exist yet.

    :return: str, the ``folder`` itself
    """
    if not os.path.exists(folder):
      os.mkdir(folder)
    # make your life and testing easier and create mandatory files if they don't exist
    mandatory_file_list = ("tasks", "cpuset.cpus")
    for mandatory_file in mandatory_file_list:
      file_path = os.path.join(folder, mandatory_file)
      if not os.path.exists(file_path):
        with open(file_path, "wb"):
          pass  # touch
    return folder
# mark manager available
available_manager_list[CGroupManager.short_name] = CGroupManager
class Computer(object): class Computer(object):
"""Object representing the computer""" """Object representing the computer"""
...@@ -450,11 +248,13 @@ class Computer(object): ...@@ -450,11 +248,13 @@ class Computer(object):
ipv6_interface=None, software_user='slapsoft', ipv6_interface=None, software_user='slapsoft',
tap_gateway_interface=None, tap_gateway_interface=None,
instance_root=None, software_root=None, instance_storage_home=None, instance_root=None, software_root=None, instance_storage_home=None,
partition_list=None, manager_list=None): partition_list=None, config=None):
""" """
Attributes: Attributes:
reference: str, the reference of the computer. reference: str, the reference of the computer.
interface: str, the name of the computer's used interface. interface: str, the name of the computer's used interface.
:param config: dict-like, holds raw data from configuration file
""" """
self.reference = str(reference) self.reference = str(reference)
self.interface = interface self.interface = interface
...@@ -478,12 +278,10 @@ class Computer(object): ...@@ -478,12 +278,10 @@ class Computer(object):
self.python_version = None self.python_version = None
self.slapos_version = None self.slapos_version = None
# HASA relation to managers (could turn into plugins with `format` and `update` methods) # attributes starting with '_' are saved from serialization
self.manager_list = manager_list # for serialization # monkey-patch use of class instead of dictionary
# hide list[Manager] from serializer by prepending "_" self._config = config if isinstance(config, dict) else config.__dict__
self._manager_list = tuple(filter(lambda manager: manager.is_allowed(), self._manager_list = slapmanager.from_config(self._config)
(available_manager_list[manager_str](self) for manager_str in manager_list))) \
if manager_list else tuple()
def __getinitargs__(self): def __getinitargs__(self):
return (self.reference, self.interface) return (self.reference, self.interface)
...@@ -521,12 +319,7 @@ class Computer(object): ...@@ -521,12 +319,7 @@ class Computer(object):
raise NoAddressOnInterface('No valid IPv6 found on %s.' % self.interface.name) raise NoAddressOnInterface('No valid IPv6 found on %s.' % self.interface.name)
def update(self): def update(self):
"""Update computer runtime info and state.""" """Collect environmental hardware/network information."""
for manager in self._manager_list:
logger.info("Updating computer with " + manager.__class__.__name__)
manager.update()
# Collect environmental hardware/network information.
self.public_ipv4_address = getPublicIPv4Address() self.public_ipv4_address = getPublicIPv4Address()
self.slapos_version = version.version self.slapos_version = version.version
self.python_version = platform.python_version() self.python_version = platform.python_version()
...@@ -613,7 +406,7 @@ class Computer(object): ...@@ -613,7 +406,7 @@ class Computer(object):
@classmethod @classmethod
def load(cls, path_to_xml, reference, ipv6_interface, tap_gateway_interface, def load(cls, path_to_xml, reference, ipv6_interface, tap_gateway_interface,
instance_root=None, software_root=None, manager_list=None): instance_root=None, software_root=None, config=None):
""" """
Create a computer object from a valid xml file. Create a computer object from a valid xml file.
...@@ -637,7 +430,7 @@ class Computer(object): ...@@ -637,7 +430,7 @@ class Computer(object):
tap_gateway_interface=tap_gateway_interface, tap_gateway_interface=tap_gateway_interface,
software_root=dumped_dict.get('software_root', software_root), software_root=dumped_dict.get('software_root', software_root),
instance_root=dumped_dict.get('instance_root', instance_root), instance_root=dumped_dict.get('instance_root', instance_root),
manager_list=dumped_dict.get('manager_list', manager_list), config=config,
) )
for partition_index, partition_dict in enumerate(dumped_dict['partition_list']): for partition_index, partition_dict in enumerate(dumped_dict['partition_list']):
...@@ -760,8 +553,7 @@ class Computer(object): ...@@ -760,8 +553,7 @@ class Computer(object):
# Iterate over all managers and let them `format` the computer too # Iterate over all managers and let them `format` the computer too
for manager in self._manager_list: for manager in self._manager_list:
logger.info("Formatting computer with " + manager.__class__.__name__) manager.format(self)
manager.format()
# get list of instance external storage if exist # get list of instance external storage if exist
instance_external_list = [] instance_external_list = []
...@@ -1521,7 +1313,7 @@ def parse_computer_xml(conf, xml_path): ...@@ -1521,7 +1313,7 @@ def parse_computer_xml(conf, xml_path):
tap_gateway_interface=conf.tap_gateway_interface, tap_gateway_interface=conf.tap_gateway_interface,
software_root=conf.software_root, software_root=conf.software_root,
instance_root=conf.instance_root, instance_root=conf.instance_root,
manager_list=conf.manager_list) config=conf)
# Connect to the interface defined by the configuration # Connect to the interface defined by the configuration
computer.interface = interface computer.interface = interface
else: else:
...@@ -1537,7 +1329,7 @@ def parse_computer_xml(conf, xml_path): ...@@ -1537,7 +1329,7 @@ def parse_computer_xml(conf, xml_path):
ipv6_interface=conf.ipv6_interface, ipv6_interface=conf.ipv6_interface,
software_user=conf.software_user, software_user=conf.software_user,
tap_gateway_interface=conf.tap_gateway_interface, tap_gateway_interface=conf.tap_gateway_interface,
manager_list=conf.manager_list, config=conf,
) )
partition_amount = int(conf.partition_amount) partition_amount = int(conf.partition_amount)
...@@ -1652,7 +1444,6 @@ class FormatConfig(object): ...@@ -1652,7 +1444,6 @@ class FormatConfig(object):
tap_gateway_interface = None tap_gateway_interface = None
use_unique_local_address_block = None use_unique_local_address_block = None
instance_storage_home = None instance_storage_home = None
manager_list = None
def __init__(self, logger): def __init__(self, logger):
self.logger = logger self.logger = logger
...@@ -1740,16 +1531,6 @@ class FormatConfig(object): ...@@ -1740,16 +1531,6 @@ class FormatConfig(object):
self.logger.error(message) self.logger.error(message)
raise UsageError(message) raise UsageError(message)
# Split str into list[str] and check availability of every manager
# Config value is expected to be strings separated by spaces or commas
manager_list = []
for manager in self.manager_list.replace(",", " ").split():
if manager not in available_manager_list:
raise ValueError("Unknown manager \"{}\"! Known are: {!s}".format(
manager, list(available_manager_list.keys())))
manager_list.append(manager)
self.manager_list = manager_list # replace original str with list[str] of known managers
if not self.dry_run: if not self.dry_run:
if self.alter_user: if self.alter_user:
self.checkRequiredBinary(['groupadd', 'useradd', 'usermod', ['passwd', '-h']]) self.checkRequiredBinary(['groupadd', 'useradd', 'usermod', ['passwd', '-h']])
......
...@@ -49,6 +49,7 @@ if sys.version_info < (2, 6): ...@@ -49,6 +49,7 @@ if sys.version_info < (2, 6):
from lxml import etree from lxml import etree
from slapos import manager as slapmanager
from slapos.slap.slap import NotFoundError from slapos.slap.slap import NotFoundError
from slapos.slap.slap import ServerError from slapos.slap.slap import ServerError
from slapos.slap.slap import COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME from slapos.slap.slap import COMPUTER_PARTITION_REQUEST_LIST_TEMPLATE_FILENAME
...@@ -274,7 +275,8 @@ def create_slapgrid_object(options, logger): ...@@ -274,7 +275,8 @@ def create_slapgrid_object(options, logger):
instance_min_free_space=instance_min_free_space, instance_min_free_space=instance_min_free_space,
instance_storage_home=op.get('instance_storage_home'), instance_storage_home=op.get('instance_storage_home'),
ipv4_global_network=op.get('ipv4_global_network'), ipv4_global_network=op.get('ipv4_global_network'),
firewall_conf=op.get('firewall')) firewall_conf=op.get('firewall'),
config=options)
def check_required_only_partitions(existing, required): def check_required_only_partitions(existing, required):
...@@ -333,6 +335,7 @@ class Slapgrid(object): ...@@ -333,6 +335,7 @@ class Slapgrid(object):
instance_storage_home=None, instance_storage_home=None,
ipv4_global_network=None, ipv4_global_network=None,
firewall_conf={}, firewall_conf={},
config=None,
): ):
"""Makes easy initialisation of class parameters""" """Makes easy initialisation of class parameters"""
# Parses arguments # Parses arguments
...@@ -395,7 +398,8 @@ class Slapgrid(object): ...@@ -395,7 +398,8 @@ class Slapgrid(object):
else: else:
self.ipv4_global_network= "" self.ipv4_global_network= ""
self.firewall_conf = firewall_conf self.firewall_conf = firewall_conf
self.config = config
self._manager_list = slapmanager.from_config(config)
def _getWatchdogLine(self): def _getWatchdogLine(self):
invocation_list = [WATCHDOG_PATH] invocation_list = [WATCHDOG_PATH]
...@@ -553,6 +557,11 @@ stderr_logfile_backups=1 ...@@ -553,6 +557,11 @@ stderr_logfile_backups=1
shadir_cert_file=self.shadir_cert_file, shadir_cert_file=self.shadir_cert_file,
shadir_key_file=self.shadir_key_file, shadir_key_file=self.shadir_key_file,
software_min_free_space=self.software_min_free_space) software_min_free_space=self.software_min_free_space)
# call manager for every software release
for manager in self._manager_list:
manager.software(software)
if state == 'available': if state == 'available':
completed_tag = os.path.join(software_path, '.completed') completed_tag = os.path.join(software_path, '.completed')
if (self.develop or (not os.path.exists(completed_tag) and if (self.develop or (not os.path.exists(completed_tag) and
...@@ -1075,6 +1084,10 @@ stderr_logfile_backups=1 ...@@ -1075,6 +1084,10 @@ stderr_logfile_backups=1
partition_ip_list = parameter_dict['ip_list'] + parameter_dict.get( partition_ip_list = parameter_dict['ip_list'] + parameter_dict.get(
'full_ip_list', []) 'full_ip_list', [])
# call manager for every software release
for manager in self._manager_list:
manager.instance(partition)
if computer_partition_state == COMPUTER_PARTITION_STARTED_STATE: if computer_partition_state == COMPUTER_PARTITION_STARTED_STATE:
local_partition.install() local_partition.install()
computer_partition.available() computer_partition.available()
......
# coding: utf-8
import re
import importlib
from zope.interface import declarations
# name of the configuration option holding the whitespace-separated manager names
config_option = "manager_list"
def load_manager(name):
  """Load the manager class from module ``slapos.manager.<name>``.

  :param name: str, name of the module inside ``slapos.manager`` package;
    it must consist only of ASCII letters and "_"
  :return: class ``Manager`` defined in the module
  :raise ValueError: when ``name`` contains a character which is not allowed
  :raise ImportError: when module ``slapos.manager.<name>`` cannot be imported
  :raise AttributeError: when the module does not define ``Manager``
  :raise RuntimeError: when ``Manager`` does not implement ``IManager``
  """
  # Validate the whole name (not only its first character) because it ends up
  # in an import path - "." or other characters would escape the intended module.
  if re.match(r'[a-zA-Z_]+\Z', name) is None:
    raise ValueError("Manager name \"{!s}\" is not allowed! Must contain only letters and \"_\"".
                     format(name))

  # imported lazily, only once the name is known to be acceptable
  from slapos.manager import interface
  manager_module = importlib.import_module("slapos.manager." + name)

  if not hasattr(manager_module, "Manager"):
    raise AttributeError("Manager class in {} has to be called \"Manager\"".format(
      name))

  if not interface.IManager.implementedBy(manager_module.Manager):
    raise RuntimeError("Manager class in {} must zope.interface.implements \"IManager\"".format(
      name))

  return manager_module.Manager
def from_config(config):
  """Return list of instances of managers allowed from the config.

  :param config: dict-like object holding the configuration or None;
    manager names are read from its "manager_list" option and may be
    separated by whitespace or commas
  :return: list of manager instances in the order given by the config,
    empty when ``config`` is None or the option is missing/empty
  """
  if config is None:
    # callers may legitimately have no configuration at hand - no managers then
    return []
  raw_name_list = config.get(config_option) or ""
  # commas are accepted too, as the configuration historically allowed them
  name_list = raw_name_list.replace(",", " ").split()
  return [load_manager(name)(config) for name in name_list]
\ No newline at end of file
# coding: utf-8
import logging
import os
import os.path
import pwd
import time

from zope import interface as zope_interface

from slapos.manager import interface
logger = logging.getLogger(__name__)
class Manager(object):
  """Manage cgroup's cpuset in terms of initializing and runtime operations.

  CPUSET manager moves PIDs between CPU cores using Linux cgroup system.

  In order to use this feature put "cpuset" into "manager_list" into your slapos
  configuration file inside [slapos] section.

  TODO: there is no limit on number of reserved cores per user. Gathering of
  per-user exclusive CPU usage has to be (re)introduced together with that limit.
  """
  zope_interface.implements(interface.IManager)

  # name of the file inside a partition holding PIDs which request an exclusive CPU
  cpu_exclusive_file = ".slapos-cpu-exclusive"
  # root of the cpuset hierarchy (overridable, e.g. by tests)
  cpuset_path = "/sys/fs/cgroup/cpuset/"
  # mode used to open "tasks" and scaling governor files for writing
  task_write_mode = "wt"
  # config option listing users allowed to request exclusive CPUs
  config_power_user_option = "power_user_list"

  def __init__(self, config):
    """Retain access to dict-like configuration."""
    self.config = config

  def software(self, software):
    """We don't need to mingle with software."""
    pass

  def format(self, computer):
    """Create cgroup folder per-CPU with exclusive access to the CPU.

    - Those folders are "<cpuset_path>/cpu<N>".

    :param computer: currently formatted computer (not used by this manager)
    """
    # check ``cpuset_path`` instead of a hard-coded location so that an
    # overridden root is honoured consistently with the rest of the class
    if not os.path.exists(os.path.join(self.cpuset_path, "cpuset.cpus")):
      logger.warning("CPUSet Manager cannot format computer because cgroups do not exist.")
      return

    for cpu in self._cpu_id_list():
      cpu_path = self._prepare_folder(
        os.path.join(self.cpuset_path, "cpu" + str(cpu)))
      with open(cpu_path + "/cpuset.cpus", "wt") as fx:
        fx.write(str(cpu))  # this cgroup manages only this cpu
      with open(cpu_path + "/cpuset.cpu_exclusive", "wt") as fx:
        fx.write("1")  # manages it exclusively
      with open(cpu_path + "/cpuset.mems", "wt") as fx:
        fx.write("0")  # it doesn't work without that

  def instance(self, partition):
    """Control runtime state of the computer.

    Move PIDs listed in the partition's request file onto free exclusive CPUs
    provided that the partition's user is listed in "power_user_list" option.

    :param partition: currently processed partition; its ``instance_path`` and
      ``getUserGroupId()`` are used
    """
    if not os.path.exists(os.path.join(self.cpuset_path, "cpu0")):
      # check whether the computer was formatted
      logger.warning("CGROUP's CPUSET Manager cannot update computer because it is not cpuset-formatted.")
      return

    request_file = os.path.join(partition.instance_path, self.cpu_exclusive_file)
    if not os.path.exists(request_file) or not read_file(request_file):
      # This instance does not ask for cpu exclusive access
      return

    # Gather list of users allowed to request exclusive cores
    power_user_list = self.config.get(self.config_power_user_option, "").split()
    uid, _gid = partition.getUserGroupId()
    uname = pwd.getpwuid(uid).pw_name
    if uname not in power_user_list:
      logger.warning("User {} not allowed to modify cpuset! "
                     "Allowed users are in {} option in config file.".format(
                       uname, self.config_power_user_option))
      return

    # Move all PIDs from the pool of all CPUs onto the first exclusive CPU.
    running_list = self._prepare_cpu_space()

    # paths to per-cpu "tasks" files; the first one belongs to the generic CPU
    cpu_tasks_file_list = [os.path.join(cpu_folder, "tasks")
                           for cpu_folder in self._cpu_folder_list()]

    # Gather all running PIDs for filtering out stale PIDs
    running_pid_set = set(running_list)
    running_pid_set.update(map(int, read_file(cpu_tasks_file_list[0]).split()))

    # gather already exclusively running PIDs
    exclusive_pid_set = set()
    for cpu_tasks_file in cpu_tasks_file_list[1:]:
      exclusive_pid_set.update(map(int, read_file(cpu_tasks_file).split()))

    # Move processes to their demanded exclusive CPUs
    with open(request_file, "rt") as fi:
      # take such PIDs which are either really running or are not already exclusive
      # NOTE(review): with "or" a stale PID (neither running nor exclusive)
      # still passes this filter - confirm whether "and" was intended
      request_pid_list = [int(pid) for pid in fi.read().split()
                          if int(pid) in running_pid_set or int(pid) not in exclusive_pid_set]
    with open(request_file, "wt") as fo:
      fo.write("")  # empty file (we will write back only PIDs which weren't moved)
    for request_pid in request_pid_list:
      assigned_cpu = self._move_to_exclusive_cpu(request_pid)
      if assigned_cpu < 0:
        # if no exclusive CPU was assigned - write the PID back and try other time
        with open(request_file, "at") as fo:
          fo.write(str(request_pid) + "\n")

  def _prepare_cpu_space(self):
    """Move all PIDs from the pool of all CPUs onto the first CPU.

    :return: list[int], PIDs found in the root "tasks" file, in descending order
    """
    tasks_file = os.path.join(self.cpuset_path, "tasks")
    running_list = sorted(map(int, read_file(tasks_file).split()), reverse=True)
    first_cpu = self._cpu_id_list()[0]
    success_set, refused_set = set(), set()
    for pid in running_list:
      try:
        self._move_task(pid, first_cpu)
        success_set.add(pid)
        time.sleep(0.01)
      except IOError:
        # a refused PID is only reported in the debug log below
        refused_set.add(pid)
    logger.debug("Refused to move {:d} PIDs: {!s}\n"
                 "Suceeded in moving {:d} PIDs {!s}\n".format(
                   len(refused_set), refused_set, len(success_set), success_set))
    return running_list

  def _cpu_folder_list(self):
    """Return list of per-CPU cgroup folders, in the order of ``_cpu_id_list``."""
    return [os.path.join(self.cpuset_path, "cpu" + str(cpu_id))
            for cpu_id in self._cpu_id_list()]

  def _cpu_id_list(self):
    """Extract IDs of available CPUs and return them as a list.

    The first one will be always used for all non-exclusive processes.

    :return: list[int]
    """
    cpu_list = []  # types: list[int]
    with open(os.path.join(self.cpuset_path, "cpuset.cpus"), "rt") as cpu_def:
      for cpu_def_split in cpu_def.read().strip().split(","):
        # IDs can be in form "0-4" or "0,1,2,3,4"
        if "-" in cpu_def_split:
          a, b = map(int, cpu_def_split.split("-"))
          cpu_list.extend(range(a, b + 1))  # because cgroup's range is inclusive
          continue
        cpu_list.append(int(cpu_def_split))
    return cpu_list

  def _move_to_exclusive_cpu(self, pid):
    """Try all exclusive CPUs and place the ``pid`` to the first available one.

    :return: int, cpu_id of used CPU, -1 if placement was not possible
    """
    exclusive_cpu_list = self._cpu_id_list()[1:]
    for exclusive_cpu in exclusive_cpu_list:
      # gather tasks assigned to current exclusive CPU
      task_path = os.path.join(self.cpuset_path, "cpu" + str(exclusive_cpu), "tasks")
      with open(task_path, "rt") as fi:
        task_list = fi.read().split()
      if len(task_list) > 0:
        continue  # skip occupied CPUs
      return self._move_task(pid, exclusive_cpu)[1]
    return -1

  def _move_task(self, pid, cpu_id, cpu_mode="performance"):
    """Move ``pid`` to ``cpu_id``.

    cpu_mode can be "performance" or "powersave"

    :return: tuple (pid, cpu_id)
    """
    known_cpu_mode_list = ("performance", "powersave")
    with open(os.path.join(self.cpuset_path, "cpu" + str(cpu_id), "tasks"), self.task_write_mode) as fo:
      fo.write(str(pid) + "\n")
    # set the core to `cpu_mode`
    scaling_governor_file = "/sys/devices/system/cpu/cpu{:d}/cpufreq/scaling_governor".format(cpu_id)
    if os.path.exists(scaling_governor_file):
      if cpu_mode not in known_cpu_mode_list:
        logger.warning("Cannot set CPU to mode \"{}\"! Known modes {!s}".format(
          cpu_mode, known_cpu_mode_list))
      else:
        try:
          with open(scaling_governor_file, self.task_write_mode) as fo:
            fo.write(cpu_mode + "\n")  # default is "powersave"
        except IOError:
          # handle permission error
          logger.error("Failed to write \"{}\" to {}".format(cpu_mode, scaling_governor_file))
    return pid, cpu_id

  def _prepare_folder(self, folder):
    """Create ``folder`` and its mandatory files when they do not exist yet.

    :return: str, the ``folder`` itself
    """
    if not os.path.exists(folder):
      os.mkdir(folder)
    # make your life and testing easier and create mandatory files if they don't exist
    mandatory_file_list = ("tasks", "cpuset.cpus")
    for mandatory_file in mandatory_file_list:
      file_path = os.path.join(folder, mandatory_file)
      if not os.path.exists(file_path):
        with open(file_path, "wb"):
          pass  # touch
    return folder
def read_file(path, mode="rt"):
  """Return the whole content of the file at ``path`` opened with ``mode``."""
  stream = open(path, mode)
  try:
    content = stream.read()
  finally:
    stream.close()
  return content
def write_file(content, path, mode="wt"):
  """Write ``content`` into the file at ``path`` opened with ``mode``."""
  stream = open(path, mode)
  try:
    stream.write(content)
  finally:
    stream.close()
\ No newline at end of file
# coding: utf-8
from zope.interface import Interface
class IManager(Interface):
  """Manager is called in every step of preparation of the computer."""

  def __init__(config):
    """Manager needs to know config for its functioning.

    :param config: dictionary-like object with full access to [slapos] section of the config file
    """

  def format(computer):
    """Method called at `slapos node format` phase.

    :param computer: slapos.format.Computer, currently formatted computer
    """

  def software(software):
    """Method called at `slapos node software` phase.

    :param software: slapos.grid.SlapObject.Software, currently processed software
    """

  def instance(partition):
    """Method called at `slapos node instance` phase.

    :param partition: slapos.grid.SlapObject.Partition, currently processed partition
    """
...@@ -30,6 +30,7 @@ import glob ...@@ -30,6 +30,7 @@ import glob
import logging import logging
import slapos.format import slapos.format
import slapos.util import slapos.util
import slapos.manager.cpuset
import unittest import unittest
import netaddr import netaddr
...@@ -41,6 +42,7 @@ import netifaces ...@@ -41,6 +42,7 @@ import netifaces
import os import os
import pwd import pwd
import time import time
import mock
USER_LIST = [] USER_LIST = []
GROUP_LIST = [] GROUP_LIST = []
...@@ -183,21 +185,6 @@ class SlaposUtilMock: ...@@ -183,21 +185,6 @@ class SlaposUtilMock:
def chownDirectory(*args, **kw): def chownDirectory(*args, **kw):
pass pass
class CGroupManagerMock(slapos.format.CGroupManager):
short_name = 'cgroup_mock'
cpuset_path = "/tmp/cpuset/"
task_write_mode = "at" # append insted of write tasks PIDs for the tests
def is_allowed(self):
"""Always allowed."""
return True
# update available managers with our partially-mocked version
slapos.format.available_manager_list[CGroupManagerMock.short_name] = CGroupManagerMock
class SlapformatMixin(unittest.TestCase): class SlapformatMixin(unittest.TestCase):
# keep big diffs # keep big diffs
maxDiff = None maxDiff = None
...@@ -668,23 +655,32 @@ class TestComputer(SlapformatMixin): ...@@ -668,23 +655,32 @@ class TestComputer(SlapformatMixin):
self.fakeCallAndRead.external_command_list) self.fakeCallAndRead.external_command_list)
class TestComputerWithCGroup(SlapformatMixin): class SlapGridPartitionMock:
def __init__(self, partition):
self.partition = partition
self.instance_path = partition.path
class TestComputerWithCPUSet(SlapformatMixin):
cpuset_path = "/tmp/cpuset/"
task_write_mode = "at" # append insted of write tasks PIDs for the tests
def setUp(self): def setUp(self):
super(TestComputerWithCGroup, self).setUp() super(TestComputerWithCPUSet, self).setUp()
self.restoreOs() self.restoreOs()
if os.path.isdir("/tmp/slapgrid/"): if os.path.isdir("/tmp/slapgrid/"):
shutil.rmtree("/tmp/slapgrid/") shutil.rmtree("/tmp/slapgrid/")
os.mkdir("/tmp/slapgrid/") os.mkdir("/tmp/slapgrid/")
if os.path.isdir(CGroupManagerMock.cpuset_path): if os.path.isdir(self.cpuset_path):
shutil.rmtree(CGroupManagerMock.cpuset_path) shutil.rmtree(self.cpuset_path)
os.mkdir(CGroupManagerMock.cpuset_path) os.mkdir(self.cpuset_path)
file_write("0,1-3", file_write("0,1-3",
os.path.join(CGroupManagerMock.cpuset_path, "cpuset.cpus")) os.path.join(self.cpuset_path, "cpuset.cpus"))
file_write("\n".join(("1000", "1001", "1002", "")), file_write("\n".join(("1000", "1001", "1002", "")),
os.path.join(CGroupManagerMock.cpuset_path, "tasks")) os.path.join(self.cpuset_path, "tasks"))
self.cpu_list = [0, 1, 2, 3] self.cpu_list = [0, 1, 2, 3]
global USER_LIST, INTERFACE_DICT global USER_LIST, INTERFACE_DICT
...@@ -695,6 +691,13 @@ class TestComputerWithCGroup(SlapformatMixin): ...@@ -695,6 +691,13 @@ class TestComputerWithCGroup(SlapformatMixin):
socket.AF_INET6: [ socket.AF_INET6: [
{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}] {'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
} }
from slapos.manager.cpuset import Manager
self.orig_cpuset_path = Manager.cpuset_path
self.orig_task_write_mode = Manager.task_write_mode
Manager.cpuset_path = self.cpuset_path
Manager.task_write_mode = self.task_write_mode
self.computer = slapos.format.Computer('computer', self.computer = slapos.format.Computer('computer',
software_user='testuser', software_user='testuser',
instance_root='/tmp/slapgrid/instance_root', instance_root='/tmp/slapgrid/instance_root',
...@@ -705,47 +708,58 @@ class TestComputerWithCGroup(SlapformatMixin): ...@@ -705,47 +708,58 @@ class TestComputerWithCGroup(SlapformatMixin):
slapos.format.Partition( slapos.format.Partition(
'partition', '/tmp/slapgrid/instance_root/part1', slapos.format.User('testuser'), [], tap=None), 'partition', '/tmp/slapgrid/instance_root/part1', slapos.format.User('testuser'), [], tap=None),
], ],
manager_list=(CGroupManagerMock.short_name, ) config={
"manager_list": "cpuset",
"power_user_list": "testuser"
}
) )
# self.patchOs(self.logger) # self.patchOs(self.logger)
def tearDown(self): def tearDown(self):
"""Cleanup temporary test folders.""" """Cleanup temporary test folders."""
super(TestComputerWithCGroup, self).tearDown()
from slapos.manager.cpuset import Manager
Manager.cpuset_path = self.orig_cpuset_path
Manager.task_write_mode = self.orig_task_write_mode
super(TestComputerWithCPUSet, self).tearDown()
shutil.rmtree("/tmp/slapgrid/") shutil.rmtree("/tmp/slapgrid/")
if CGroupManagerMock.cpuset_path.startswith("/tmp"): if self.cpuset_path.startswith("/tmp"):
shutil.rmtree(CGroupManagerMock.cpuset_path) shutil.rmtree(self.cpuset_path)
def test_positive_cgroups(self): def test_positive_cgroups(self):
"""Positive test of cgroups.""" """Positive test of cgroups."""
# Test parsing "cpuset.cpus" file # Test parsing "cpuset.cpus" file
self.assertEqual(self.computer._manager_list[0]._cpu_list(), self.cpu_list) self.assertEqual(self.computer._manager_list[0]._cpu_id_list(), self.cpu_list)
# This should create per-cpu groups and move all tasks in CPU pool into cpu0 # This should create per-cpu groups and move all tasks in CPU pool into cpu0
self.computer.format(alter_network=False, alter_user=False) self.computer.format(alter_network=False, alter_user=False)
# Test files creation for exclusive CPUs # Test files creation for exclusive CPUs
for cpu_id in self.cpu_list: for cpu_id in self.cpu_list:
cpu_n_path = os.path.join(CGroupManagerMock.cpuset_path, "cpu" + str(cpu_id)) cpu_n_path = os.path.join(self.cpuset_path, "cpu" + str(cpu_id))
self.assertEqual(str(cpu_id), file_content(os.path.join(cpu_n_path, "cpuset.cpus"))) self.assertEqual(str(cpu_id), file_content(os.path.join(cpu_n_path, "cpuset.cpus")))
self.assertEqual("1", file_content(os.path.join(cpu_n_path, "cpuset.cpu_exclusive"))) self.assertEqual("1", file_content(os.path.join(cpu_n_path, "cpuset.cpu_exclusive")))
if cpu_id > 0: if cpu_id > 0:
self.assertEqual("", file_content(os.path.join(cpu_n_path, "tasks"))) self.assertEqual("", file_content(os.path.join(cpu_n_path, "tasks")))
# Simulate slapos instance call
self.computer._manager_list[0].instance(SlapGridPartitionMock(self.computer.partition_list[0]))
# Test that format moved all PIDs from CPU pool into CPU0 # Test that format moved all PIDs from CPU pool into CPU0
tasks_at_cpu0 = file_content(os.path.join(CGroupManagerMock.cpuset_path, "cpu0", "tasks")).split() tasks_at_cpu0 = file_content(os.path.join(self.cpuset_path, "cpu0", "tasks")).split()
self.assertIn("1000", tasks_at_cpu0) self.assertIn("1000", tasks_at_cpu0)
self.assertIn("1001", tasks_at_cpu0) self.assertIn("1001", tasks_at_cpu0)
self.assertIn("1002", tasks_at_cpu0) self.assertIn("1002", tasks_at_cpu0)
# Simulate cgroup behaviour - empty tasks in the pool # Simulate cgroup behaviour - empty tasks in the pool
file_write("", os.path.join(CGroupManagerMock.cpuset_path, "tasks")) file_write("", os.path.join(self.cpuset_path, "tasks"))
# test moving tasks from generic core to private core # test moving tasks from generic core to private core
# request PID 1001 to be moved to its private CPU # request PID 1001 to be moved to its private CPU
request_file_path = os.path.join(self.computer.partition_list[0].path, request_file_path = os.path.join(self.computer.partition_list[0].path,
CGroupManagerMock.cpu_exclusive_file) self.cpu_exclusive_file)
file_write("1001\n", request_file_path) file_write("1001\n", request_file_path)
# let format do the moving # let format do the moving
self.computer.update() self.computer.update()
# test if the moving succeeded into any private CPUs (id>0) # test if the moving succeeded into any private CPUs (id>0)
self.assertTrue(any("1001" in file_content(exclusive_task) self.assertTrue(any("1001" in file_content(exclusive_task)
for exclusive_task in glob.glob(os.path.join(CGroupManagerMock.cpuset_path, "cpu[1-9]", "tasks")))) for exclusive_task in glob.glob(os.path.join(self.cpuset_path, "cpu[1-9]", "tasks"))))
# slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file # slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file
self.assertEqual("", file_content(request_file_path).strip()) self.assertEqual("", file_content(request_file_path).strip())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment