Commit 66a23eed authored by Xavier Thompson's avatar Xavier Thompson

slapformat: WIP

parent 943df8b7
##############################################################################
#
# Copyright (c) 2010, 2011, 2012 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import configparser
import ipaddress
import logging
import netifaces
import os
from netifaces import AF_INET, AF_INET6
from typing import List, Union
def do_format(conf):
# load configuration
computer = Computer(conf)
# sanity checks
computer.validate()
# format
computer.format()
# collect some environmental data
computer.update()
# send to master
computer.send()
class UsageError(Exception):
pass
class Parameters(object):
master_url: str
computer_id: str
instance_root: str
software_root: str
partition_amount: int
class Options(object):
input_definition_file: str = None
output_definition_file: str = None
computer_cache_file: str = None
key_file: str = None
cert_file: str = None
log_file: str = None
dry_run: bool = False
software_user = 'slapsoft'
partition_base_name = 'slappart'
user_base_name = 'slapuser'
interface_name: str = None
ipv6_interface: str = None
ipv4_local_network: str = None
ipv6_range: bool = True
create_tap: bool = True
tap_base_name: str = 'slaptap'
tap_ipv6: bool = True
tap_gateway_interface: str = None
create_tun: bool = False
tun_base_name: str = 'slaptun'
tun_ipv6: bool = True
tun_ipv4_network: str = '172.16.0.0/12'
class FormatConfig(Parameters, Options):
DEPRECATED = [
'bridge_name',
'no_bridge_',
'master_ca_file',
'alter_network',
'alter_user',
'computer_json',
'computer_xml',
]
CHECK_FILES = ['key_file', 'cert_file', 'input_definition_file']
NORMALIZE_PATHS = ['instance_root', 'software_root']
logger : logging.Logger
def __init__(self, logger):
self.logger = logger
def error(self, fmt, *args):
message = fmt % tuple(args)
self.logger.error(message)
raise UsageError(message)
def parse(self, name, value, t):
if not isinstance(value, str):
if not isinstance(value, t):
self.error("Option %s takes type %s, not %r", name, t.__name__, value)
return value
if t in (int,):
try:
return t(value)
except ValueError:
self.error("Option %s takes type %s, not %r", name, t.__name__, value)
if t is bool:
try:
return {'true': True, 'false': False}[value.lower()]
except KeyError:
self.error("Option %r must be 'true' or 'false', not %r", name, value)
def get(self, option):
try:
return gettatr(self, option)
except AttributeError:
self.error("Parameter %r is not defined", option)
def mergeConfig(self, args, configp):
# args (from command line) override configp (from cfg) options
for section in ('slapformat', 'slapos'):
self.__dict__.update(configp.items(section))
self.__dict__.update(args.__dict__)
def setConfig(self):
for option in self.DEPRECATED:
if option in self.__dict__:
if option == 'computer_xml':
self.error(
"Option %r is no longer supported\n"
"Use --output_definition_file to migrate existing computer_xml"
", then use the generated file as input_definition_file",
option
)
else:
self.error("Option %r is no longer supported" % option)
for option, t in Parameters.__annotations__.items():
setattr(self, option, self.parse(option, self.get(option), t))
for option, t in self.__annotations__.items():
setattr(self, option, self.parse(option, getattr(self, option), t))
for option in self.CHECK_FILES:
path = getattr(self, option)
if path is not None and not os.path.exists(path):
self.error("File %r does not exist or is not readable", path)
setattr(self, option, os.path.abspath(path))
for option in self.NORMALIZE_PATHS:
setattr(self, option, os.path.abspath(getattr(self, option)))
# XXX Check command line tools + Logs
class Computer(object):
reference : str
interface : Interface
partitions : List[Partition]
address : Union[ipaddress.IPv4Interface, ipaddress.IPv6Interface]
user : User
conf : FormatConfig
def __init__(self, conf):
self.conf = conf
self.reference = conf.computer_id
self.interface = Interface(conf)
self.address = self.interface.getComputerIPv6Addr()
self.user = User(conf.software_user, conf.software_root)
definition = None
if conf.input_definition_file:
definition = configparser.ConfigParser(interpolation=None)
definition.read(conf.input_definition_file)
if definition.has_option('computer', 'address')
address = definition.get('computer', 'address')
self.address = ipaddress.ip_interface(address)
if definition.has_option('computer', 'software_user')
user = definition.get('computer', 'software_user')
self.user = User(user, conf.software_root)
amount = conf.partition_amount
self.partitions = [Partition(i, conf, definition) for i in range(amount)]
def validate(self):
conf = self.conf
addresses = {4 : [], 6 : []}
networks = {4 : [], 6 : []}
for p in self.partitions:
addresses[4].extend(p.ipv4_list)
addresses[6]extend(p.ipv6_list)
networks[6]append(p.ipv6_range)
ipv4_addresses.sort()
ipv6_addresses.sort()
ipv4_networks.sort()
ipv6_networks.sort()
for network_list in networks.values()
for i, n in enumerate(network_list[:-1])
if n.overlaps(network_list[i + 1]):
self.conv.warning(
"Network configurations overlap"
)
def format(self):
pass
def update(self):
pass
def send(self):
pass
class Interface(object):
ipv4_interface : str
ipv6_interface : str
ipv4_network : ipaddress.IPv4Network
ipv6_network : ipaddress.IPv6Network
conf : FormatConfig
def __init__(self, conf):
self.conf = conf
self.ipv4_interface = conf.interface_name
self.ipv6_interface = conf.ipv6_interface or conf.interface_name
self.ipv4_network = self.getIPv4Network(conf.ipv4_local_network)
self.ipv6_network = self.getIPv6Network()
def getIPv4Network(self, cidr):
if cidr:
# XXX allow ipv4_local_network to be None ?
return ipaddress.IPv4Network(cidr, strict=False)
def getPartitionIPv4(self, index):
return self.ipv4_network[index + 2]
def getIPv6Network(self):
try:
addresses = netifaces.ifaddresses(self.ipv6_interface)[AF_INET6]
except KeyError:
self.conf.error(
"%s must have at least one IPv6 address assigned",
self.ipv6_interface
)
result = None
for a in addresses:
address = a['addr'].split('%')[0]
netmask = a['netmask'].split('/')[-1]
ip = ipaddress.IPv6Interface('%s/%s' % (address, netmask))
network = ip.network
if network.is_global:
if not result or network.prefixlen < result.prefixlen:
result = network
return result
def getComputerIPv6Addr(self):
network = self.ipv6_network
return ipaddress.ip_interface((network[1], network.prefixlen))
def getPartitionIPv6Addr(self, index):
network = self.ipv6_network
return ipaddress.ip_interface((network[index + 2], network.prefixlen))
def getPartitionIPv6Range(self, index):
network = self.ipv6_network
prefixlen = network.prefixlen + 16
if prefixlen > 128:
self.conf.error("IPv6 network %s is too small for IPv6 ranges", network)
bits = 128 - network.prefixlen
addr = network[(1 << (bits - 2)) + (i << (128 - prefixlen))]
return ipaddress.IPv6Network((addr, prefixlen))
class Partition(object):
reference : str
index: int
path : str
user : User
ipv4_list: List[ipaddress.IPv4Interface]
ipv6_list: List[ipaddress.IPv6Interface]
ipv6_range: ipaddress.IPv6Network
tap : Tap
tun : Tun
def __init__(self, index, computer, definition=None):
self.from_conf(index, computer)
if definition:
self.from_definition(index, computer, definition)
def from_definition(cls, index, computer, definition):
conf = computer.conf
section = 'partition_%d' % index
options = {}
if definition.has_section('default'):
options.update(definition.items('default'))
if definition.has_section(section):
options.update(definition.items(section))
if 'pathname' in options:
self.reference = options['pathname']
self.path = os.path.join(conf.instance_root, self.reference)
if 'user' in options:
self.user = User(options['user'], self.path)
if 'address' in options:
address_list = [ipaddress.ip_interface(a) for a in options['address']]
for v in (4, 6):
ip_list = [ip for ip in ip_addresses if ip.version == v]
if ip_list:
setattr(self, 'ipv%d_list' % v, ip_list)
# tap = Tap(computer_definition.get(section, 'network_interface'))
# tun = Tun.load(conf, index)
def from_conf(self, index, computer):
conf = computer.conf
self.reference = '%s%d' % (conf.partition_base_name, index)
self.path = os.path.join(conf.instance_root, self.reference)
self.user = User('%s%d' % (conf.user_base_name, index), self.path)
self.ipv4_list = [computer.interface.getPartitionIPv4(index)]
self.ipv6_list = [computer.interface.getPartitionIPv6(index)]
# XXX Tap & tun
def createPath(self):
self.path = os.path.abspath(self.path)
owner = self.user if self.user else User('root')
if not os.path.exists(self.path):
os.mkdir(self.path, 0o750)
owner_pw = pwd.getpwnam(owner.name)
os.chown(self.path, owner_pw.pw_uid, owner_pw.pw_gid)
os.chmod(self.path, 0o750)
class User(object):
name: str
path: str
groups: List[str]
SHELL = '/bin/sh'
def __init__(self, name, path, groups=None):
self.name = name
self.path = path
self.groups = groups
def create(self):
grpname = 'grp_' + self.name if sys.platform == 'cygwin' else self.name
if not self.isGroupAvailable(grpname):
callAndRead(['groupadd', grpname])
user_parameter_list = ['-d', self.path, '-g', self.name, '-s', self.SHELL]
if self.groups:
user_parameter_list.extend(['-G', ','.join(self.groups), '-a'])
user_parameter_list.append(self.name)
if self.isUserAvailable(self.name):
# if the user is already created and used we should not fail
callAndRead(['usermod'] + user_parameter_list, raise_on_error=False)
else:
user_parameter_list.append('-r')
callAndRead(['useradd'] + user_parameter_list)
# lock the password of user
callAndRead(['passwd', '-l', self.name])
@classmethod
def isGroupAvailable(cls, name):
try:
pwd.getgrnam(name)
return True
except KeyError:
return False
@classmethod
def isUserAvailable(cls, name):
try:
pwd.getpwnam(name)
return True
except KeyError:
return False
# Utilities
def callAndRead(argument_list, raise_on_error=True):
popen = subprocess.Popen(
argument_list,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
)
result = popen.communicate()[0]
if raise_on_error and popen.returncode != 0:
raise ValueError('Issue while invoking %r, result was:\n%s' % (
argument_list, result))
return popen.returncode, result
def tracing_monkeypatch(conf):
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment