Commit c81eb3ea authored by Alain Takoudjou's avatar Alain Takoudjou

Merge branch 'tap-nobridge'

parents fc6c6ee8 658d48dd
......@@ -333,6 +333,7 @@ class Partition(object):
"""
retention_lock_delay_filename = '.slapos-retention-lock-delay'
retention_lock_date_filename = '.slapos-retention-lock-date'
partition_firewall_rules_name = '.slapos-firewalld-rules'
# XXX: we should give the url (or the "key") instead of the software_path
# then compute the path from it, like in Software.
......@@ -394,6 +395,9 @@ class Partition(object):
self.retention_lock_date_file_path = os.path.join(
self.instance_path, self.retention_lock_date_filename
)
self.firewall_rules_path = os.path.join(
self.instance_path, self.partition_firewall_rules_name
)
self.instance_min_free_space = instance_min_free_space
......
......@@ -85,6 +85,18 @@ class _formatXMLError(Exception):
pass
class FPopen(subprocess.Popen):
def __init__(self, *args, **kwargs):
kwargs['stdin'] = subprocess.PIPE
kwargs['stderr'] = subprocess.STDOUT
kwargs.setdefault('stdout', subprocess.PIPE)
kwargs.setdefault('close_fds', True)
kwargs.setdefault('shell', True)
subprocess.Popen.__init__(self, *args, **kwargs)
self.stdin.flush()
self.stdin.close()
self.stdin = None
def check_missing_parameters(options):
required = set([
'computer_id',
......@@ -656,14 +668,17 @@ stderr_logfile_backups=1
def _checkAddFirewallRules(self, partition_id, command_list, add=True):
"""
Process Firewall rules from and save rules to firewall_rules_path
"""
instance_path = os.path.join(self.instance_root, partition_id)
firewall_rules = os.path.join(instance_path, '.slapos-firewalld-rules')
firewall_rules_path = os.path.join(instance_path,
Partition.partition_firewall_rules_name)
json_list = []
reload_rules = False
if os.path.exists(firewall_rules):
with open(firewall_rules, 'r') as frules:
if os.path.exists(firewall_rules_path):
with open(firewall_rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
for command in rules_list:
......@@ -679,40 +694,29 @@ stderr_logfile_backups=1
if skip_check:
continue
# Check if this rule exists in iptables
check_cmd = command.replace('--add-rule', '--query-rule')
process = subprocess.Popen(check_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
check_result = process.communicate()[0]
self.logger.debug('%s: %s' % (check_cmd, check_result))
if check_result.strip() == 'yes':
current_cmd = command.replace('--add-rule', '--query-rule')
cmd_process = FPopen(current_cmd)
result = cmd_process.communicate()[0]
self.logger.debug('%s: %s' % (current_cmd, result))
if result.strip() == 'yes':
reload_rules = True
command = command.replace('--add-rule', '--remove-rule')
self.logger.debug(command)
cmd_process = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
current_cmd = command.replace('--add-rule', '--remove-rule')
self.logger.debug(current_cmd)
cmd_process = FPopen(current_cmd)
result = cmd_process.communicate()[0]
if cmd_process.returncode == 1:
raise Exception("Failed to remove firewalld rule %s. \n%s" % (
command, result))
if cmd_process.returncode == 1:
raise Exception("Failed to execute firewalld rule %s." % current_cmd)
if add:
for i in range(0, len(command_list)):
reload_rules = True
command = command_list.pop()
self.logger.debug(command)
cmd_process = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
result = cmd_process.communicate()[0]
cmd_process = FPopen(command)
cmd_process.communicate()[0]
if cmd_process.returncode == 1:
raise Exception("Failed to add firewalld rule %s. \n%s" % (
command, result))
raise Exception("Failed to add firewalld rule %s." % command)
json_list.append(command)
if reload_rules:
......@@ -720,20 +724,18 @@ stderr_logfile_backups=1
# XXX - need to check firewalld reload instead of restart
self.logger.info("Reloading firewall configuration...")
reload_cmd = self.firewall_conf['reload_config_cmd']
reload_process = subprocess.Popen(reload_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
reload_process = FPopen(reload_cmd)
result = reload_process.communicate()[0]
if reload_process.returncode == 1:
self.logger.error('FirewallD Reload: %s' % result)
raise Exception("Failed to load firewalld rules with command %s" % reload_cmd)
with open(firewall_rules, 'w') as frules:
with open(firewall_rules_path, 'w') as frules:
frules.write(json.dumps(json_list))
def _getFirewallRules(self, ip, ip_list, ip_type='ipv4'):
def _getFirewallAcceptRules(self, ip, hosting_ip_list, source_ip_list, ip_type='ipv4'):
"""
Generate rules for firewall based on list of IP that should have access to `ip`
"""
if ip_type not in ['ipv4', 'ipv6', 'eb']:
raise NotImplementedError("firewall-cmd has not rules with tables %s." % ip_type)
......@@ -742,6 +744,7 @@ stderr_logfile_backups=1
command = '%s --permanent --direct --add-rule %s filter' % (fw_cmd, ip_type)
cmd_list = []
ip_list = hosting_ip_list + source_ip_list
for other_ip in ip_list:
# Configure INPUT rules
......@@ -751,25 +754,85 @@ stderr_logfile_backups=1
cmd_list.append('%s FORWARD 0 -s %s -d %s -j ACCEPT' % (command,
other_ip, ip))
# Drop all other requests
cmd_list.append('%s INPUT 1000 -d %s -j DROP' % (command, ip))
cmd_list.append('%s FORWARD 1000 -d %s -j DROP' % (command, ip))
cmd_list.append('%s INPUT 900 -d %s -m state --state ESTABLISHED,RELATED -j DROP' % (
# Reject all other requests
cmd_list.append('%s INPUT 1000 -d %s -j REJECT' % (command, ip))
cmd_list.append('%s FORWARD 1000 -d %s -j REJECT' % (command, ip))
cmd_list.append('%s INPUT 900 -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (
command, ip))
cmd_list.append('%s FORWARD 900 -d %s -m state --state ESTABLISHED,RELATED -j DROP' % (
cmd_list.append('%s FORWARD 900 -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (
command, ip))
return cmd_list
def _getFirewallRejectRules(self, ip, hosting_ip_list, source_ip_list, ip_type='ipv4'):
"""
Generate rules for firewall based on list of IP that should not have access to `ip`
"""
if ip_type not in ['ipv4', 'ipv6', 'eb']:
raise NotImplementedError("firewall-cmd has not rules with tables %s." % ip_type)
fw_cmd = self.firewall_conf['firewall_cmd']
command = '%s --permanent --direct --add-rule %s filter' % (fw_cmd, ip_type)
cmd_list = []
# Accept all other requests
cmd_list.append('%s INPUT 0 -d %s -j ACCEPT' % (command, ip))
cmd_list.append('%s FORWARD 0 -d %s -j ACCEPT' % (command, ip))
# Reject all other requests from the list
for other_ip in source_ip_list:
cmd_list.append('%s INPUT 800 -s %s -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (
command, other_ip, ip))
cmd_list.append('%s FORWARD 800 -s %s -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (
command, other_ip, ip))
cmd_list.append('%s INPUT 900 -s %s -d %s -j REJECT' % (command,
other_ip, ip))
cmd_list.append('%s FORWARD 900 -s %s -d %s -j REJECT' % (command,
other_ip, ip))
# Accept on this hosting subscription
for other_ip in hosting_ip_list:
cmd_list.append('%s INPUT 1000 -s %s -d %s -j ACCEPT' % (command,
other_ip, ip))
cmd_list.append('%s FORWARD 1000 -s %s -d %s -j ACCEPT' % (command,
other_ip, ip))
return cmd_list
def _getValidIpv4FromList(self, ipv4_list, warn=False):
"""
Return the list containing only valid ipv4 or network address.
"""
valid_list = []
for ip in ipv4_list:
if not ip:
continue
the_ip = ip.split('/')[0]
if valid_ipv4(the_ip):
valid_list.append(ip)
elif warn:
self.logger.warn("IP/Network address %s is not valid. ignored.." % ip)
return valid_list
def _setupComputerPartitionFirewall(self, computer_partition, ip_list, authorized_ip_list, drop_entries=False):
def _setupComputerPartitionFirewall(self, computer_partition, ip_list, drop_entries=False):
"""
Using linux iptables, limit access to IP of this partition to all
others partitions of the same Hosting Subscription
"""
ipv4_list = []
ipv6_list = []
authorized_ipv4_list = []
authorized_ipv6_list = []
source_ipv4_list = []
source_ipv6_list = []
hosting_ipv4_list = []
hosting_ipv6_list = []
getFirewallRules = getattr(self, '_getFirewallAcceptRules')
if not drop_entries:
self.logger.info("Configuring firewall...")
add_rules = True
else:
add_rules = False
self.logger.info("Removing firewall configuration...")
for net_ip in ip_list:
iface, ip = (net_ip[0], net_ip[1])
......@@ -780,41 +843,37 @@ stderr_logfile_backups=1
elif valid_ipv6(ip):
ipv6_list.append(ip)
for iface, ip in authorized_ip_list:
hosting_ip_list = computer_partition.getFullHostingIpAddressList()
for iface, ip in hosting_ip_list:
if valid_ipv4(ip):
if not ip in ipv4_list:
authorized_ipv4_list.append(ip)
hosting_ipv4_list.append(ip)
elif valid_ipv6(ip):
if not ip in ipv6_list:
authorized_ipv6_list.append(ip)
hosting_ipv6_list.append(ip)
filter_dict = getattr(computer_partition, '_filter_dict', None)
extra_list = []
accept_ip_list = []
if filter_dict is not None:
extra_list = filter_dict.get('authorized_sources', '').split(' ')
extra_list.extend(self.firewall_conf.get('authorized_sources', []))
for ip in extra_list:
if not ip:
continue
the_ip = ip.split('/')[0]
if valid_ipv4(the_ip):
authorized_ipv4_list.append(ip)
elif valid_ipv6(the_ip):
authorized_ipv6_list.append(ip)
if filter_dict.get('fw_restricted_access', 'on') == 'off':
extra_list = filter_dict.get('fw_rejected_sources', '').split(' ')
getFirewallRules = getattr(self, '_getFirewallRejectRules')
accept_ip_list.extend(self.firewall_conf.get('authorized_sources', []))
accept_ip_list.extend(filter_dict.get('fw_authorized_sources', '').split(' '))
else:
extra_list = filter_dict.get('fw_authorized_sources', '').split(' ')
extra_list.extend(self.firewall_conf.get('authorized_sources', []))
if not drop_entries:
self.logger.info("Configuring firewall...")
add_rules = True
else:
add_rules = False
self.logger.info("Removing firewall configuration...")
source_ipv4_list = self._getValidIpv4FromList(extra_list, True)
hosting_ipv4_list.extend(self._getValidIpv4FromList(accept_ip_list, True))
# XXX - ipv6_list and source_ipv6_list ignored for the moment
for ip in ipv4_list:
cmd_list = self._getFirewallRules(ip,
authorized_ipv4_list,
ip_type='ipv4')
self._checkAddFirewallRules(computer_partition.getId(), cmd_list,
add=add_rules)
cmd_list = getFirewallRules(ip, hosting_ipv4_list,
source_ipv4_list, ip_type='ipv4')
self._checkAddFirewallRules(computer_partition.getId(),
cmd_list, add=add_rules)
def processComputerPartition(self, computer_partition):
"""
......@@ -954,7 +1013,6 @@ stderr_logfile_backups=1
if self.firewall_conf:
partition_ip_list = parameter_dict['ip_list'] + parameter_dict.get(
'full_ip_list', [])
full_hosting_ip_list = computer_partition.getFullHostingIpAddressList()
if computer_partition_state == COMPUTER_PARTITION_STARTED_STATE:
local_partition.install()
......@@ -962,8 +1020,7 @@ stderr_logfile_backups=1
local_partition.start()
if self.firewall_conf:
self._setupComputerPartitionFirewall(computer_partition,
partition_ip_list,
full_hosting_ip_list)
partition_ip_list)
self._checkPromises(computer_partition)
computer_partition.started()
elif computer_partition_state == COMPUTER_PARTITION_STOPPED_STATE:
......@@ -974,8 +1031,7 @@ stderr_logfile_backups=1
computer_partition.available()
if self.firewall_conf:
self._setupComputerPartitionFirewall(computer_partition,
partition_ip_list,
full_hosting_ip_list)
partition_ip_list)
finally:
# Instance has to be stopped even if buildout/reporting is wrong.
local_partition.stop()
......@@ -985,7 +1041,6 @@ stderr_logfile_backups=1
if self.firewall_conf:
self._setupComputerPartitionFirewall(computer_partition,
partition_ip_list,
full_hosting_ip_list,
drop_entries=True)
try:
computer_partition.stopped()
......
......@@ -449,6 +449,7 @@ class InstanceForTest(object):
if self.timestamp is not None:
partition._parameter_dict['timestamp'] = self.timestamp
self.current_partition = partition
return partition
def getSoftwareRelease(self):
......@@ -1985,69 +1986,117 @@ class TestSlapgridCPWithFirewall(MasterMixin, unittest.TestCase):
)
self.grid.firewall_conf = firewall_conf
def checkRuleFromIpSource(self, ip, cmd_list):
# XXX - rules for one ip contain 2*len(ip_address_list) rules ACCEPT and 4 rules DROP
num_rules = len(self.ip_address_list) * 2 + 4
def checkRuleFromIpSource(self, ip, accept_ip_list, cmd_list):
# XXX - rules for one ip contain 2*len(ip_address_list + accept_ip_list) rules ACCEPT and 4 rules REJECT
num_rules = len(self.ip_address_list) * 2 + len(accept_ip_list) * 2 + 4
self.assertEqual(len(cmd_list), num_rules)
base_cmd = '%s --permanent --direct --add-rule ipv4 filter' % self.grid.firewall_conf['firewall_cmd']
# Check that there is DROP rule on INPUT
rule = '%s INPUT 1000 -d %s -j DROP' % (base_cmd, ip)
# Check that there is REJECT rule on INPUT
rule = '%s INPUT 1000 -d %s -j REJECT' % (base_cmd, ip)
self.assertIn(rule, cmd_list)
# Check that there is DROP rule on FORWARD
rule = '%s FORWARD 1000 -d %s -j DROP' % (base_cmd, ip)
# Check that there is REJECT rule on FORWARD
rule = '%s FORWARD 1000 -d %s -j REJECT' % (base_cmd, ip)
self.assertIn(rule, cmd_list)
# Check that there is DROP rule on INPUT, ESTABLISHED,RELATED
rule = '%s INPUT 900 -d %s -m state --state ESTABLISHED,RELATED -j DROP' % (base_cmd, ip)
# Check that there is REJECT rule on INPUT, ESTABLISHED,RELATED
rule = '%s INPUT 900 -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (base_cmd, ip)
self.assertIn(rule, cmd_list)
# Check that there is DROP rule on FORWARD, ESTABLISHED,RELATED
rule = '%s FORWARD 900 -d %s -m state --state ESTABLISHED,RELATED -j DROP' % (base_cmd, ip)
# Check that there is REJECT rule on FORWARD, ESTABLISHED,RELATED
rule = '%s FORWARD 900 -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (base_cmd, ip)
self.assertIn(rule, cmd_list)
# Check that there is INPUT ACCEPT on ip_list
for _, other_ip in self.ip_address_list:
rule = '%s INPUT 0 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
# Check that there is FORWARD ACCEPT on ip_list
for _, other_ip in self.ip_address_list:
for other_ip in accept_ip_list:
rule = '%s INPUT 0 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
def checkRuleFromIpSourceReject(self, ip, reject_ip_list, cmd_list):
# XXX - rules for one ip contain 2 + 2*len(ip_address_list) rules ACCEPT and 4*len(reject_ip_list) rules REJECT
num_rules = 2 + (len(self.ip_address_list) * 2) + (len(reject_ip_list) * 4)
self.assertEqual(len(cmd_list), num_rules)
base_cmd = '%s --permanent --direct --add-rule ipv4 filter' % self.grid.firewall_conf['firewall_cmd']
# Check that there is ACCEPT rule on INPUT
rule = '%s INPUT 0 -d %s -j ACCEPT' % (base_cmd, ip)
self.assertIn(rule, cmd_list)
# Check that there is ACCEPT rule on FORWARD
rule = '%s FORWARD 0 -d %s -j ACCEPT' % (base_cmd, ip)
self.assertIn(rule, cmd_list)
# Check that there is INPUT/FORWARD ACCEPT on ip_list
for _, other_ip in self.ip_address_list:
rule = '%s INPUT 1000 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s FORWARD 1000 -s %s -d %s -j ACCEPT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
# Check that there is INPUT/FORWARD REJECT on ip_list
for other_ip in reject_ip_list:
rule = '%s INPUT 900 -s %s -d %s -j REJECT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s FORWARD 900 -s %s -d %s -j REJECT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s INPUT 800 -s %s -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
rule = '%s FORWARD 800 -s %s -d %s -m state --state ESTABLISHED,RELATED -j REJECT' % (base_cmd, other_ip, ip)
self.assertIn(rule, cmd_list)
def test_getFirewallRules(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
self.ip_address_list = computer.ip_address_list
ip = computer.instance_list[0].full_ip_list[0][1]
source_ip_list = ['10.32.0.15', '10.32.0.0/8']
cmd_list = self.grid._getFirewallAcceptRules(ip,
[elt[1] for elt in self.ip_address_list],
source_ip_list,
ip_type='ipv4')
self.checkRuleFromIpSource(ip, source_ip_list, cmd_list)
cmd_list = self.grid._getFirewallRules(ip,
cmd_list = self.grid._getFirewallRejectRules(ip,
[elt[1] for elt in self.ip_address_list],
source_ip_list,
ip_type='ipv4')
self.checkRuleFromIpSource(ip, cmd_list)
self.checkRuleFromIpSourceReject(ip, source_ip_list, cmd_list)
def test_checkAddFirewallRules(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
# For simulate query rule success
self.grid.firewall_conf['firewall_cmd'] = '/bin/echo "yes" #'
self.ip_address_list = computer.ip_address_list
instance = computer.instance_list[0]
ip = instance.full_ip_list[0][1]
name = computer.instance_list[0].name
cmd_list = self.grid._getFirewallRules(ip,
cmd_list = self.grid._getFirewallAcceptRules(ip,
[elt[1] for elt in self.ip_address_list],
[],
ip_type='ipv4')
self.grid._checkAddFirewallRules(name, cmd_list, add=True)
rules_path = os.path.join(instance.partition_path,
'.slapos-firewalld-rules')
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSource(ip, rules_list)
self.checkRuleFromIpSource(ip, [], rules_list)
# Remove all rules
self.grid._checkAddFirewallRules(name, cmd_list, add=False)
......@@ -2057,14 +2106,15 @@ class TestSlapgridCPWithFirewall(MasterMixin, unittest.TestCase):
# Add one more ip in the authorized list
self.ip_address_list.append(('interface1', '10.0.8.7'))
cmd_list = self.grid._getFirewallRules(ip,
cmd_list = self.grid._getFirewallAcceptRules(ip,
[elt[1] for elt in self.ip_address_list],
[],
ip_type='ipv4')
self.grid._checkAddFirewallRules(name, cmd_list, add=True)
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSource(ip, rules_list)
self.checkRuleFromIpSource(ip, [], rules_list)
def test_partition_no_firewall(self):
computer = ComputerForTest(self.software_root, self.instance_root)
......@@ -2077,84 +2127,185 @@ class TestSlapgridCPWithFirewall(MasterMixin, unittest.TestCase):
'.slapos-firewalld-rules'
)))
def test_partition_firewall_restrict(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
'.slapos-firewalld-rules'
)))
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
self.ip_address_list = computer.ip_address_list
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
ip = instance.full_ip_list[0][1]
self.checkRuleFromIpSource(ip, [], rules_list)
def test_partition_firewall(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.filter_dict = {'fw_restricted_access': 'off'}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
'.slapos-firewalld-rules'
)))
rules_path = os.path.join(instance.partition_path,
'.slapos-firewalld-rules')
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
self.ip_address_list = computer.ip_address_list
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.assertEqual(len(rules_list), len(self.ip_address_list) * 2 + 4)
ip = instance.full_ip_list[0][1]
self.checkRuleFromIpSource(ip, rules_list)
def test_partition_firewall_ipsource(self):
self.checkRuleFromIpSourceReject(ip, [], rules_list)
@unittest.skip('Always fail: instance.filter_dict can\'t change')
def test_partition_firewall_restricted_access_change(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
# For simulate query rule success
self.grid.firewall_conf['firewall_cmd'] = '/bin/echo "yes" #'
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.filter_dict = {'fw_restricted_access': 'off',
'fw_rejected_sources': '10.0.8.11'}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
'.slapos-firewalld-rules'
)))
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
self.ip_address_list = computer.ip_address_list
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
ip = instance.full_ip_list[0][1]
self.checkRuleFromIpSourceReject(ip, ['10.0.8.11'], rules_list)
instance.setFilterParameter({'fw_restricted_access': 'on',
'fw_authorized_sources': ''})
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSource(ip, [], rules_list)
def test_partition_firewall_ipsource_accept(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
source_ip = ['10.0.8.10']
self.grid.firewall_conf['authorized_sources'] = source_ip
source_ip = ['10.0.8.10', '10.0.8.11']
self.grid.firewall_conf['authorized_sources'] = [source_ip[0]]
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.filter_dict = {'fw_restricted_access': 'on',
'fw_authorized_sources': source_ip[1]}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
'.slapos-firewalld-rules'
)))
rules_path = os.path.join(instance.partition_path,
'.slapos-firewalld-rules')
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
rules_list= []
self.ip_address_list = computer.ip_address_list + [('iface', ip)
for ip in source_ip]
self.ip_address_list = computer.ip_address_list
ip = instance.full_ip_list[0][1]
base_cmd = '%s --permanent --direct --add-rule ipv4 filter' % self.grid.firewall_conf['firewall_cmd']
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.assertEqual(len(rules_list), len(self.ip_address_list) * 2 + 4)
rule_input = '%s INPUT 0 -s %s -d %s -j ACCEPT' % (base_cmd, source_ip[0], ip)
self.assertIn(rule_input, rules_list)
rule_fwd = '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (base_cmd, source_ip[0], ip)
self.assertIn(rule_fwd, rules_list)
for thier_ip in source_ip:
rule_input = '%s INPUT 0 -s %s -d %s -j ACCEPT' % (base_cmd, thier_ip, ip)
self.assertIn(rule_input, rules_list)
rule_fwd = '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (base_cmd, thier_ip, ip)
self.assertIn(rule_fwd, rules_list)
self.checkRuleFromIpSource(ip, rules_list)
self.checkRuleFromIpSource(ip, source_ip, rules_list)
def test_partition_firewall_ip_parameter(self):
def test_partition_firewall_ipsource_reject(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
source_ip = '10.0.8.10'
self.grid.firewall_conf['authorized_sources'] = ['10.0.8.15']
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.filter_dict = {'authorized_sources': source_ip}
instance.filter_dict = {'fw_rejected_sources': source_ip,
'fw_restricted_access': 'off'}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
'.slapos-firewalld-rules'
)))
rules_path = os.path.join(instance.partition_path,
'.slapos-firewalld-rules')
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
rules_list= []
self.ip_address_list = computer.ip_address_list + [('iface', ip)
for ip in source_ip.split(' ')]
self.ip_address_list = computer.ip_address_list
self.ip_address_list.append(('iface', '10.0.8.15'))
ip = instance.full_ip_list[0][1]
base_cmd = '%s --permanent --direct --add-rule ipv4 filter' % self.grid.firewall_conf['firewall_cmd']
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.assertEqual(len(rules_list), len(self.ip_address_list) * 2 + 4)
rule_input = '%s INPUT 0 -s %s -d %s -j ACCEPT' % (base_cmd, source_ip, ip)
self.assertIn(rule_input, rules_list)
self.checkRuleFromIpSourceReject(ip, source_ip.split(' '), rules_list)
def test_partition_firewall_ip_change(self):
computer = ComputerForTest(self.software_root, self.instance_root)
self.setFirewallConfig()
# For simulate query rule success
self.grid.firewall_conf['firewall_cmd'] = '/bin/echo "yes" #'
source_ip = ['10.0.8.10', '10.0.8.11']
self.grid.firewall_conf['authorized_sources'] = [source_ip[0]]
with httmock.HTTMock(computer.request_handler):
instance = computer.instance_list[0]
instance.filter_dict = {'fw_restricted_access': 'on',
'fw_authorized_sources': source_ip[1]}
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.assertTrue(os.path.exists(os.path.join(
instance.partition_path,
'.slapos-firewalld-rules'
)))
rules_path = os.path.join(
instance.partition_path,
slapos.grid.SlapObject.Partition.partition_firewall_rules_name
)
rules_list= []
self.ip_address_list = computer.ip_address_list
ip = instance.full_ip_list[0][1]
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSource(ip, source_ip, rules_list)
instance = computer.instance_list[0]
# XXX -- removed
#instance.filter_dict = {'fw_restricted_access': 'on',
# 'fw_authorized_sources': source_ip[0]}
rule_fwd = '%s FORWARD 0 -s %s -d %s -j ACCEPT' % (base_cmd, source_ip, ip)
self.assertIn(rule_fwd, rules_list)
self.grid.firewall_conf['authorized_sources'] = []
computer.ip_address_list.append(('route_interface1', '10.10.8.4'))
self.assertEqual(self.grid.processComputerPartitionList(), slapgrid.SLAPGRID_SUCCESS)
self.ip_address_list = computer.ip_address_list
self.checkRuleFromIpSource(ip, rules_list)
with open(rules_path, 'r') as frules:
rules_list = json.loads(frules.read())
self.checkRuleFromIpSource(ip, [source_ip[1]], rules_list)
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment