# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Vifib SARL and Contributors.
# All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################
from __future__ import print_function

import glob
import logging
import slapos.format
import slapos.util
import slapos.manager.cpuset
import unittest
from slapos.cli.format import FormatCommand
from slapos.cli.entry import SlapOSApp
from argparse import Namespace

import netaddr
import shutil
import socket
# for mocking
import grp
import netifaces
import os
import pwd
import time
import mock

from .test_slapgrid import DummyManager

import six

USER_LIST = []
GROUP_LIST = []
INTERFACE_DICT = {}


def file_content(file_path):
  """Read file(s) content."""
  if isinstance(file_path, (list, tuple)):
    return [file_content(fx) for fx in file_path]
  with open(file_path, "rt") as fi:
    return fi.read().strip()


def file_write(stuff, file_path):
  """Write stuff into file_path."""
  with open(file_path, "wt") as fo:
    fo.write(stuff)


class FakeConfig:
  pass


class TestLoggerHandler(logging.Handler):
  def __init__(self, *args, **kwargs):
    self.bucket = []
    logging.Handler.__init__(self, *args, **kwargs)

  def emit(self, record):
    self.bucket.append(record.msg)


class FakeCallAndRead:
  def __init__(self):
    self.external_command_list = []

  def __call__(self, argument_list, raise_on_error=True):
    retval = 0, 'UP'
    global INTERFACE_DICT
    if 'useradd' in argument_list:
      print(argument_list)
      global USER_LIST
      username = argument_list[-1]
      if username == '-r':
        username = argument_list[-2]
      USER_LIST.append(username)
    elif 'groupadd' in argument_list:
      global GROUP_LIST
      GROUP_LIST.append(argument_list[-1])
    elif argument_list[:3] == ['ip', 'addr', 'add']:
      ip, interface = argument_list[3], argument_list[5]
      if ':' not in ip:
        netmask = netaddr.strategy.ipv4.int_to_str(
          netaddr.strategy.ipv4.prefix_to_netmask[int(ip.split('/')[1])])
        ip = ip.split('/')[0]
        INTERFACE_DICT[interface][socket.AF_INET].append({'addr': ip, 'netmask': netmask})
      else:
        netmask = netaddr.strategy.ipv6.int_to_str(
          netaddr.strategy.ipv6.prefix_to_netmask[int(ip.split('/')[1])])
        ip = ip.split('/')[0]
        INTERFACE_DICT[interface][socket.AF_INET6].append({'addr': ip, 'netmask': netmask})
      # stabilise by mangling ip to just ip string
      argument_list[3] = 'ip/%s' % netmask
    elif argument_list[:3] == ['ip', 'addr', 'list'] or \
         argument_list[:4] == ['ip', '-6', 'addr', 'list']:
      retval = 0, str(INTERFACE_DICT)
    elif argument_list[:3] == ['ip', 'route', 'show']:
      retval = 0, 'OK'
    elif argument_list[:3] == ['ip', '-6', 'route']:
      retval = 0, 'OK'
      ip = argument_list[4]
      if '/' in ip:
        netmask = int(ip.split('/')[1])
        argument_list[4] = 'ip/%s' % netmask
      else:
        argument_list[4] = 'ip'
      if len(argument_list) > 7:
        argument_list[8] = 'gateway'
    elif argument_list[:3] == ['route', 'add', '-host']:
      retval = 0, 'OK'
    self.external_command_list.append(' '.join(argument_list))
    return retval


class LoggableWrapper:
  def __init__(self, logger, name):
    self.__logger = logger
    self.__name = name

  def __call__(self, *args, **kwargs):
    arg_list = [repr(x) for x in args] + [
      '%s=%r' % (x, y) for x, y in six.iteritems(kwargs)]
    self.__logger.debug('%s(%s)' % (self.__name, ', '.join(arg_list)))


class TimeMock:
  @classmethod
  def sleep(self, seconds):
    return


class GrpMock:
  @classmethod
  def getgrnam(self, name):
    global GROUP_LIST
    if name in GROUP_LIST:
      return True
    raise KeyError


class PwdMock:
  @classmethod
  def getpwnam(self, name):
    global USER_LIST
    if name in USER_LIST:
      class PwdResult:
        def __init__(self, name):
          self.pw_name = name
          self.pw_uid = self.pw_gid = USER_LIST.index(name)

        def __getitem__(self, index):
          if index == 0:
            return self.pw_name
          if index == 2:
            return self.pw_uid
          if index == 3:
            return self.pw_gid
      return PwdResult(name)
    raise KeyError("User \"{}\" not in global USER_LIST {!s}".format(name, USER_LIST))


class NetifacesMock:
  @classmethod
  def ifaddresses(self, name):
    global INTERFACE_DICT
    if name in INTERFACE_DICT:
      return INTERFACE_DICT[name]
    raise ValueError("Interface \"{}\" not in INTERFACE_DICT {!s}".format(
      name, INTERFACE_DICT))

  @classmethod
  def interfaces(self):
    global INTERFACE_DICT
    return INTERFACE_DICT.keys()

class SlaposUtilMock:
  @classmethod
  def chownDirectory(*args, **kw):
    pass

class SlapformatMixin(unittest.TestCase):
  # keep big diffs
  maxDiff = None

  def patchNetifaces(self):
    self.netifaces = NetifacesMock()
    self.saved_netifaces = {}
    for fake in vars(NetifacesMock):
      if fake.startswith("__"):
        continue
      self.saved_netifaces[fake] = getattr(netifaces, fake, None)
      setattr(netifaces, fake, getattr(self.netifaces, fake))

  def restoreNetifaces(self):
    for name, original_value in self.saved_netifaces.items():
      setattr(netifaces, name, original_value)
    del self.saved_netifaces

  def patchPwd(self):
    self.saved_pwd = {}
    for fake in vars(PwdMock):
      if fake.startswith("__"):
        continue
      self.saved_pwd[fake] = getattr(pwd, fake, None)
      setattr(pwd, fake, getattr(PwdMock, fake))

  def restorePwd(self):
    for name, original_value in self.saved_pwd.items():
      setattr(pwd, name, original_value)
    del self.saved_pwd

  def patchTime(self):
    self.saved_time = {}
    for fake in vars(TimeMock):
      if fake.startswith("__"):
        continue
      self.saved_time[fake] = getattr(time, fake, None)
      setattr(time, fake, getattr(TimeMock, fake))

  def restoreTime(self):
    for name, original_value in self.saved_time.items():
      setattr(time, name, original_value)
    del self.saved_time

  def patchGrp(self):
    self.saved_grp = {}
    for fake in vars(GrpMock):
      if fake.startswith("__"):
        continue
      self.saved_grp[fake] = getattr(grp, fake, None)
      setattr(grp, fake, getattr(GrpMock, fake))

  def restoreGrp(self):
    for name, original_value in self.saved_grp.items():
      setattr(grp, name, original_value)
    del self.saved_grp

  def patchOs(self, logger):
    self.saved_os = {}
    for fake in ['mkdir', 'chown', 'chmod', 'makedirs']:
      self.saved_os[fake] = getattr(os, fake, None)
      f = LoggableWrapper(logger, fake)
      setattr(os, fake, f)

  def restoreOs(self):
    if not hasattr(self, 'saved_os'):
      return  # os was never patched or already restored
    for name, original_value in self.saved_os.items():
      setattr(os, name, original_value)
    del self.saved_os

  def patchSlaposUtil(self):
    self.saved_slapos_util = {}
    for fake in ['chownDirectory']:
      self.saved_slapos_util[fake] = getattr(slapos.util, fake, None)
      setattr(slapos.util, fake, getattr(SlaposUtilMock, fake))

  def restoreSlaposUtil(self):
    for name, original_value in self.saved_slapos_util.items():
      setattr(slapos.util, name, original_value)
    del self.saved_slapos_util

  def setUp(self):
    config = FakeConfig()
    config.dry_run = True
    config.verbose = True
    logger = logging.getLogger('testcatch')
    logger.setLevel(logging.DEBUG)
    self.test_result = TestLoggerHandler()
    logger.addHandler(self.test_result)
    config.logger = logger
    if hasattr(self, "logger"):
      raise ValueError("{} already has logger attached".format(self.__class__.__name__))
    self.logger = logger
    self.partition = slapos.format.Partition('partition', '/part_path',
      slapos.format.User('testuser'), [], None)
    global USER_LIST
    USER_LIST = []
    global GROUP_LIST
    GROUP_LIST = []
    global INTERFACE_DICT
    INTERFACE_DICT = {}

    self.real_callAndRead = slapos.format.callAndRead
    self.fakeCallAndRead = FakeCallAndRead()
    slapos.format.callAndRead = self.fakeCallAndRead
    self.patchOs(logger)
    self.patchGrp()
    self.patchTime()
    self.patchPwd()
    self.patchNetifaces()
    self.patchSlaposUtil()

  def tearDown(self):
    self.restoreOs()
    self.restoreGrp()
    self.restoreTime()
    self.restorePwd()
    self.restoreNetifaces()
    self.restoreSlaposUtil()
    slapos.format.callAndRead = self.real_callAndRead


class TestComputer(SlapformatMixin):
  def test_getAddress_empty_computer(self):
    computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='software_root')
    self.assertEqual(computer.getAddress(), {'netmask': None, 'addr': None})

  @unittest.skip("Not implemented")
  def test_construct_empty(self):
    computer = slapos.format.Computer('computer', instance_root='/instance_root', software_root='software_root')
    computer.format()

  @unittest.skip("Not implemented")
  def test_construct_empty_prepared(self):
    computer = slapos.format.Computer('computer',
      instance_root='/instance_root',
      software_root='/software_root',
      interface=slapos.format.Interface(
        logger=self.logger, name='lo', ipv4_local_network='127.0.0.1/16'),
      partition_list=[])
    computer.format()
    self.assertEqual([
      "makedirs('/instance_root', 493)",
      "makedirs('/software_root', 493)",
      "chown('/software_root', 0, 0)",
      "chmod('/software_root', 493)"],
      self.test_result.bucket)
    self.assertEqual([
      'ip addr list lo',
      'groupadd slapsoft',
      'useradd -d /software_root -g slapsoft slapsoft -r'
      ],
      self.fakeCallAndRead.external_command_list)

  def test_construct_empty_prepared_no_alter_user(self):
    computer = slapos.format.Computer('computer',
      instance_root='/instance_root',
      software_root='/software_root',
      interface=slapos.format.Interface(
        logger=self.logger, name='lo', ipv4_local_network='127.0.0.1/16'),
      partition_list=[])
    computer.format(alter_user=False)
    self.assertEqual([
      "makedirs('/instance_root', 493)",
      "makedirs('/software_root', 493)",
      "chmod('/software_root', 493)"],
      self.test_result.bucket)
    self.assertEqual([],
      self.fakeCallAndRead.external_command_list)

  @unittest.skip("Not implemented")
  def test_construct_empty_prepared_no_alter_network(self):
    computer = slapos.format.Computer('computer',
      instance_root='/instance_root',
      software_root='/software_root',
      interface=slapos.format.Interface(
        logger=self.logger, name='lo', ipv4_local_network='127.0.0.1/16'),
      partition_list=[])
    self.assertEqual([
      "makedirs('/instance_root', 493)",
      "makedirs('/software_root', 493)",
      "chown('/software_root', 0, 0)",
      "chmod('/software_root', 493)"],
      self.test_result.bucket)
    self.assertEqual([
      'ip addr list lo',
      'groupadd slapsoft',
      'useradd -d /software_root -g slapsoft slapsoft -r'
      ],
      self.fakeCallAndRead.external_command_list)

  def test_construct_empty_prepared_no_alter_network_user(self):
    computer = slapos.format.Computer('computer',
      instance_root='/instance_root',
      software_root='/software_root',
      interface=slapos.format.Interface(
        logger=self.logger, name='lo', ipv4_local_network='127.0.0.1/16'),
      partition_list=[])
    computer.format(alter_network=False, alter_user=False)
    self.assertEqual([
      "makedirs('/instance_root', 493)",
      "makedirs('/software_root', 493)",
      "chmod('/software_root', 493)"],
      self.test_result.bucket)
    self.assertEqual([],
      self.fakeCallAndRead.external_command_list)

  @unittest.skip("Not implemented")
  def test_construct_prepared(self):
    computer = slapos.format.Computer('computer',
      instance_root='/instance_root',
      software_root='/software_root',
      interface=slapos.format.Interface(
        logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'),
      partition_list=[
          slapos.format.Partition(
            'partition', '/part_path', slapos.format.User('testuser'), [], tap=slapos.format.Tap('tap')),
        ])
    global INTERFACE_DICT
    INTERFACE_DICT['myinterface'] = {
      socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
        'netmask': '255.255.255.0'}],
      socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
    }
    computer.format()
    self.assertEqual([
      "makedirs('/instance_root', 493)",
      "makedirs('/software_root', 493)",
      "chown('/software_root', 0, 0)",
      "chmod('/software_root', 493)",
      "mkdir('/instance_root/partition', 488)",
      "chown('/instance_root/partition', 0, 0)",
      "chmod('/instance_root/partition', 488)"
    ],
      self.test_result.bucket)
    self.assertEqual([
      'ip addr list myinterface',
      'groupadd slapsoft',
      'useradd -d /software_root -g slapsoft slapsoft -r',
      'groupadd testuser',
      'useradd -d /instance_root/partition -g testuser -G slapsoft testuser -r',
      'ip tuntap add dev tap mode tap user testuser',
      'ip link set tap up',
      'ip addr add ip/255.255.255.255 dev myinterface',
      'ip addr list myinterface',
      'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface',
      'ip addr list myinterface',
    ],
      self.fakeCallAndRead.external_command_list)

  def test_construct_prepared_no_alter_user(self):
    computer = slapos.format.Computer('computer',
      instance_root='/instance_root',
      software_root='/software_root',
      tap_ipv6=True,
      interface=slapos.format.Interface(
        logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'),
      partition_list=[
          slapos.format.Partition(
            'partition', '/part_path', slapos.format.User('testuser'), [], tap=slapos.format.Tap('tap')),
        ])
    global USER_LIST
    USER_LIST = ['testuser']
    global INTERFACE_DICT
    INTERFACE_DICT['myinterface'] = {
      socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
        'netmask': '255.255.255.0'}],
      socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
    }

    INTERFACE_DICT['tap'] = {
      socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
    }

    computer.format(alter_user=False)
    self.assertEqual([
      "makedirs('/instance_root', 493)",
      "makedirs('/software_root', 493)",
      "chmod('/software_root', 493)",
      "mkdir('/instance_root/partition', 488)",
      "chmod('/instance_root/partition', 488)"
    ],
      self.test_result.bucket)
    self.assertEqual([
        'ip tuntap add dev tap mode tap user testuser',
        'ip link set tap up',
        'ip addr add ip/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff dev tap',
        'ip -6 addr list tap',
        'ip route show 10.0.0.2',
        'ip route add 10.0.0.2 dev tap',
        'ip -6 route show ip',
        'ip -6 route add ip dev tap',
        'ip -6 route show ip/80',
        'ip -6 route add ip/80 dev tap via gateway',
        'ip addr add ip/255.255.255.255 dev myinterface',
        # 'ip addr list myinterface',
        'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface',
        'ip -6 addr list myinterface',
      ],
      self.fakeCallAndRead.external_command_list)

  def test_construct_prepared_tap_no_alter_user(self):
    computer = slapos.format.Computer('computer',
      instance_root='/instance_root',
      software_root='/software_root',
      tap_gateway_interface='eth1',
      tap_ipv6=True,
      interface=slapos.format.Interface(
        logger=self.logger, name='iface', ipv4_local_network='127.0.0.1/16'),
      partition_list=[
          slapos.format.Partition(
            'partition', '/part_path', slapos.format.User('testuser'), [],
            tap=slapos.format.Tap('tap')),
        ])
    global USER_LIST
    USER_LIST = ['testuser']
    global INTERFACE_DICT
    INTERFACE_DICT['iface'] = {
      socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
        'netmask': '255.255.255.0'}],
      socket.AF_INET6: [{'addr': '2a01:e35:2e27:3456:1357::e59c', 'netmask': 'ffff:ffff:ffff:ffff:ffff::'}]
    }
    INTERFACE_DICT['eth1'] = {
      socket.AF_INET: [{'addr': '10.8.0.1', 'broadcast': '10.8.0.254',
        'netmask': '255.255.255.0'}]
    }

    INTERFACE_DICT['tap'] = {
            socket.AF_INET6: [{'addr': '2a01:e35:2e27:3456:1357:7890:ffff:ffff', 'netmask': 'ffff:ffff:ffff:ffff:ffff:ffff:ffff'}]
    }

    computer.format(alter_user=False)
    self.assertEqual([
      "makedirs('/instance_root', 493)",
      "makedirs('/software_root', 493)",
      "chmod('/software_root', 493)",
      "mkdir('/instance_root/partition', 488)",
      "chmod('/instance_root/partition', 488)"
    ],
      self.test_result.bucket)
    self.assertEqual([
        'ip tuntap add dev tap mode tap user testuser',
        'ip link set tap up',
        'ip addr add ip/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff dev tap',
        'ip -6 addr list tap',
        'ip route show 10.8.0.2',
        'ip route add 10.8.0.2 dev tap',
        'ip -6 route show ip',
        'ip -6 route add ip dev tap',
        'ip -6 route show ip/96',
        'ip -6 route add ip/96 dev tap via gateway',
        'ip addr add ip/255.255.255.255 dev iface',
        'ip addr add ip/ffff:ffff:ffff:ffff:ffff:: dev iface',
        'ip -6 addr list iface'
      ],
      self.fakeCallAndRead.external_command_list)
    partition = computer.partition_list[0]
    self.assertEqual(partition.tap.ipv4_addr, '10.8.0.2')
    self.assertEqual(partition.tap.ipv4_netmask, '255.255.255.0')
    self.assertEqual(partition.tap.ipv4_gateway, '10.8.0.1')
    self.assertEqual(partition.tap.ipv4_network, '10.8.0.0')

  @unittest.skip("Not implemented")
  def test_construct_prepared_no_alter_network(self):
    computer = slapos.format.Computer('computer',
      instance_root='/instance_root',
      software_root='/software_root',
      interface=slapos.format.Interface(
        logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'),
      partition_list=[
          slapos.format.Partition(
            'partition', '/part_path', slapos.format.User('testuser'), [],
            tap=slapos.format.Tap('tap')),
        ])
    global INTERFACE_DICT
    INTERFACE_DICT['myinterface'] = {
      socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
        'netmask': '255.255.255.0'}],
      socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
    }
    computer.format(alter_network=False)
    self.assertEqual([
      "makedirs('/instance_root', 493)",
      "makedirs('/software_root', 493)",
      "chown('/software_root', 0, 0)",
      "chmod('/software_root', 493)",
      "mkdir('/instance_root/partition', 488)",
      "chown('/instance_root/partition', 0, 0)",
      "chmod('/instance_root/partition', 488)"
    ],
      self.test_result.bucket)
    self.assertEqual([
      # 'ip addr list myinterface',
      'groupadd slapsoft',
      'useradd -d /software_root -g slapsoft slapsoft -r',
      'groupadd testuser',
      'useradd -d /instance_root/partition -g testuser -G slapsoft testuser -r',
      # 'ip addr add ip/255.255.255.255 dev myinterface',
      # 'ip addr list myinterface',
      # 'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface',
      # 'ip addr list myinterface',
    ],
      self.fakeCallAndRead.external_command_list)

  def test_construct_prepared_no_alter_network_user(self):
    computer = slapos.format.Computer('computer',
      instance_root='/instance_root',
      software_root='/software_root',
      interface=slapos.format.Interface(
        logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'),
      partition_list=[
          slapos.format.Partition(
            'partition', '/part_path', slapos.format.User('testuser'), [],
            tap=slapos.format.Tap('tap')),
        ])
    global INTERFACE_DICT
    INTERFACE_DICT['myinterface'] = {
      socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
        'netmask': '255.255.255.0'}],
      socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
    }

    computer.format(alter_network=False, alter_user=False)
    self.assertEqual([
        "makedirs('/instance_root', 493)",
        "makedirs('/software_root', 493)",
        "chmod('/software_root', 493)",
        "mkdir('/instance_root/partition', 488)",
        "chmod('/instance_root/partition', 488)"
      ],
      self.test_result.bucket)
    self.assertEqual([
        'ip addr add ip/255.255.255.255 dev myinterface',
        # 'ip addr list myinterface',
        'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface',
        'ip -6 addr list myinterface',
      ],
      self.fakeCallAndRead.external_command_list)

  def test_construct_use_unique_local_address_block(self):
    """
    Test that slapformat creates a unique local address in the interface.
    """
    global USER_LIST
    USER_LIST = ['root']
    computer = slapos.format.Computer('computer',
      instance_root='/instance_root',
      software_root='/software_root',
      interface=slapos.format.Interface(
        logger=self.logger, name='myinterface', ipv4_local_network='127.0.0.1/16'),
      partition_list=[
          slapos.format.Partition(
            'partition', '/part_path', slapos.format.User('testuser'), [],
            tap=slapos.format.Tap('tap')),
        ])
    global INTERFACE_DICT
    INTERFACE_DICT['myinterface'] = {
      socket.AF_INET: [{'addr': '192.168.242.77', 'broadcast': '127.0.0.1',
        'netmask': '255.255.255.0'}],
      socket.AF_INET6: [{'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
    }

    computer.format(use_unique_local_address_block=True, alter_user=False, create_tap=False)
    self.assertEqual([
        "makedirs('/instance_root', 493)",
        "makedirs('/software_root', 493)",
        "chmod('/software_root', 493)",
        "mkdir('/instance_root/partition', 488)",
        "chmod('/instance_root/partition', 488)"
      ],
      self.test_result.bucket)
    self.assertEqual([
        'ip address add dev myinterface fd00::1/64',
        'ip addr add ip/255.255.255.255 dev myinterface',
        'ip addr add ip/ffff:ffff:ffff:ffff:: dev myinterface',
        'ip -6 addr list myinterface'
      ],
      self.fakeCallAndRead.external_command_list)


class SlapGridPartitionMock:

  def __init__(self, partition):
    self.partition = partition
    self.instance_path = partition.path

  def getUserGroupId(self):
    return (0, 0)


class TestComputerWithCPUSet(SlapformatMixin):

  cpuset_path = "/tmp/cpuset/"
  task_write_mode = "at"  # append insted of write tasks PIDs for the tests

  def setUp(self):
    logging.getLogger("slapos.manager.cpuset").addHandler(
      logging.StreamHandler())

    super(TestComputerWithCPUSet, self).setUp()
    self.restoreOs()

    if os.path.isdir("/tmp/slapgrid/"):
      shutil.rmtree("/tmp/slapgrid/")
    os.mkdir("/tmp/slapgrid/")

    if os.path.isdir(self.cpuset_path):
      shutil.rmtree(self.cpuset_path)
    os.mkdir(self.cpuset_path)
    file_write("0,1-3",
               os.path.join(self.cpuset_path, "cpuset.cpus"))
    file_write("\n".join(("1000", "1001", "1002", "")),
               os.path.join(self.cpuset_path, "tasks"))
    self.cpu_list = [0, 1, 2, 3]

    global USER_LIST, INTERFACE_DICT
    USER_LIST = ['testuser']
    INTERFACE_DICT['lo'] = {
      socket.AF_INET: [
        {'addr': '127.0.0.1', 'broadcast': '127.0.255.255', 'netmask': '255.255.0.0'}],
      socket.AF_INET6: [
        {'addr': '2a01:e35:2e27::e59c', 'netmask': 'ffff:ffff:ffff:ffff::'}]
    }

    from slapos.manager.cpuset import Manager
    self.orig_cpuset_path = Manager.cpuset_path
    self.orig_task_write_mode = Manager.task_write_mode
    Manager.cpuset_path = self.cpuset_path
    Manager.task_write_mode = self.task_write_mode

    self.computer = slapos.format.Computer('computer',
      software_user='testuser',
      instance_root='/tmp/slapgrid/instance_root',
      software_root='/tmp/slapgrid/software_root',
      interface=slapos.format.Interface(
        logger=self.logger, name='lo', ipv4_local_network='127.0.0.1/16'),
      partition_list=[
          slapos.format.Partition(
            'partition', '/tmp/slapgrid/instance_root/part1', slapos.format.User('testuser'), [], tap=None),
        ],
      config={
        "manager_list": "cpuset",
        "power_user_list": "testuser root"
      }
    )
    # self.patchOs(self.logger)

  def tearDown(self):
    """Cleanup temporary test folders."""
    from slapos.manager.cpuset import Manager
    Manager.cpuset_path = self.orig_cpuset_path
    Manager.task_write_mode = self.orig_task_write_mode

    super(TestComputerWithCPUSet, self).tearDown()
    shutil.rmtree("/tmp/slapgrid/")
    if self.cpuset_path.startswith("/tmp"):
      shutil.rmtree(self.cpuset_path)
    logging.getLogger("slapos.manager.cpuset")

  def test_positive_cgroups(self):
    """Positive test of cgroups."""
    # Test parsing "cpuset.cpus" file
    self.assertEqual(self.computer._manager_list[0]._cpu_id_list(), self.cpu_list)
    # This should created per-cpu groups and move all tasks in CPU pool into cpu0
    self.computer.format(alter_network=False, alter_user=False)
    # Test files creation for exclusive CPUs
    for cpu_id in self.cpu_list:
      cpu_n_path = os.path.join(self.cpuset_path, "cpu" + str(cpu_id))
      self.assertEqual(str(cpu_id), file_content(os.path.join(cpu_n_path, "cpuset.cpus")))
      self.assertEqual("1", file_content(os.path.join(cpu_n_path, "cpuset.cpu_exclusive")))
      if cpu_id > 0:
        self.assertEqual("", file_content(os.path.join(cpu_n_path, "tasks")))

    # Test moving tasks from generic core to private core
    # request PID 1001 to be moved to its private CPU
    request_file_path = os.path.join(self.computer.partition_list[0].path,
                                     slapos.manager.cpuset.Manager.cpu_exclusive_file)
    file_write("1001\n", request_file_path)
    # Simulate slapos instance call to perform the actual movement
    self.computer._manager_list[0].instance(
      SlapGridPartitionMock(self.computer.partition_list[0]))
    # Simulate cgroup behaviour - empty tasks in the pool
    file_write("", os.path.join(self.cpuset_path, "tasks"))
    # Test that format moved all PIDs from CPU pool into CPU0
    tasks_at_cpu0 = file_content(os.path.join(self.cpuset_path, "cpu0", "tasks")).split()
    self.assertIn("1000", tasks_at_cpu0)
    # test if the moving suceeded into any provate CPUS (id>0)
    self.assertTrue(any("1001" in file_content(exclusive_task)
                        for exclusive_task in glob.glob(os.path.join(self.cpuset_path, "cpu[1-9]", "tasks"))))
    self.assertIn("1002", tasks_at_cpu0)
    # slapformat should remove successfully moved PIDs from the .slapos-cpu-exclusive file
    self.assertEqual("", file_content(request_file_path).strip())


class TestPartition(SlapformatMixin):

  def test_createPath_no_alter_user(self):
    self.partition.createPath(False)
    self.assertEqual(
      [
        "mkdir('/part_path', 488)",
        "chmod('/part_path', 488)"
      ],
      self.test_result.bucket
    )


class TestUser(SlapformatMixin):
  def test_create(self):
    user = slapos.format.User('doesnotexistsyet')
    user.setPath('/doesnotexistsyet')
    user.create()

    self.assertEqual([
      'groupadd doesnotexistsyet',
      'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/sh '\
        'doesnotexistsyet -r',
      'passwd -l doesnotexistsyet'
    ],
      self.fakeCallAndRead.external_command_list)

  def test_create_additional_groups(self):
    user = slapos.format.User('doesnotexistsyet', ['additionalgroup1',
      'additionalgroup2'])
    user.setPath('/doesnotexistsyet')
    user.create()

    self.assertEqual([
      'groupadd doesnotexistsyet',
      'useradd -d /doesnotexistsyet -g doesnotexistsyet -s /bin/sh -G '\
        'additionalgroup1,additionalgroup2 doesnotexistsyet -r',
      'passwd -l doesnotexistsyet'
      ],
      self.fakeCallAndRead.external_command_list)

  def test_create_group_exists(self):
    global GROUP_LIST
    GROUP_LIST = ['testuser']
    user = slapos.format.User('testuser')
    user.setPath('/testuser')
    user.create()

    self.assertEqual([
      'useradd -d /testuser -g testuser -s /bin/sh testuser -r',
      'passwd -l testuser'
    ],
      self.fakeCallAndRead.external_command_list)

  def test_create_user_exists_additional_groups(self):
    global USER_LIST
    USER_LIST = ['testuser']
    user = slapos.format.User('testuser', ['additionalgroup1',
      'additionalgroup2'])
    user.setPath('/testuser')
    user.create()

    self.assertEqual([
      'groupadd testuser',
      'usermod -d /testuser -g testuser -s /bin/sh -G '\
        'additionalgroup1,additionalgroup2 testuser',
      'passwd -l testuser'
    ],
      self.fakeCallAndRead.external_command_list)

  def test_create_user_exists(self):
    global USER_LIST
    USER_LIST = ['testuser']
    user = slapos.format.User('testuser')
    user.setPath('/testuser')
    user.create()

    self.assertEqual([
      'groupadd testuser',
      'usermod -d /testuser -g testuser -s /bin/sh testuser',
      'passwd -l testuser'
    ],
      self.fakeCallAndRead.external_command_list)

  def test_create_user_group_exists(self):
    global USER_LIST
    USER_LIST = ['testuser']
    global GROUP_LIST
    GROUP_LIST = ['testuser']
    user = slapos.format.User('testuser')
    user.setPath('/testuser')
    user.create()

    self.assertEqual([
      'usermod -d /testuser -g testuser -s /bin/sh testuser',
      'passwd -l testuser'
    ],
      self.fakeCallAndRead.external_command_list)

  def test_isAvailable(self):
    global USER_LIST
    USER_LIST = ['testuser']
    user = slapos.format.User('testuser')
    self.assertTrue(user.isAvailable())

  def test_isAvailable_notAvailable(self):
    user = slapos.format.User('doesnotexistsyet')
    self.assertFalse(user.isAvailable())


class TestSlapformatManagerLifecycle(SlapformatMixin):

  def test_partition_format(self):
    computer = slapos.format.Computer('computer',
                                      instance_root='/instance_root',
                                      software_root='software_root')
    manager = DummyManager()
    computer._manager_list = [manager]

    computer.format(alter_user=False, alter_network=False)

    self.assertEqual(manager.sequence,
                     ['format', 'formatTearDown'])

class TestFormatConfig(SlapformatMixin):

  def fake_take_action(self, *args):
    app = SlapOSApp()
    format_command = FormatCommand(app, Namespace())
    parsed_args = format_command.get_parser("slapos node format fake").parse_args(args)
    configp = format_command.fetch_config(parsed_args)
    conf = slapos.format.FormatConfig(logger=self.logger)
    conf.mergeConfig(parsed_args, configp)
    conf.setConfig()
    return conf

  def test_empty_cmdline_options(self):
    conf = self.fake_take_action()
    self.assertTrue(conf.alter_network)
    self.assertTrue(conf.alter_user)

  def test_cmdline1_options(self):
    conf = self.fake_take_action(
      "--alter_network", "False", "--alter_user", "True")
    self.assertFalse(conf.alter_network)
    self.assertTrue(conf.alter_user)

  # TODO add more tests with config file

if __name__ == '__main__':
      unittest.main()