Commit c121940b authored by Guillaume Hervier's avatar Guillaume Hervier

portredir: Validate port redirection config values

parent e122de72
# coding: utf-8
import json
import logging
import netaddr
import os.path
from .interface import IManager
......@@ -61,7 +62,11 @@ class Manager(object):
# Read it
with open(port_redirect_file_path) as f:
try:
port_redirects = json.load(f)
except:
logger.warning('Bad port redirection config file', exc_info=True)
return
# Get partitions IPv6 address
computer_partition = partition.computer_partition
......@@ -79,22 +84,40 @@ class Manager(object):
socat_programs = []
for port_redirect in port_redirects:
redir_type = port_redirect.get('type', 'tcp')
if redir_type.lower() not in {'tcp', 'udp'}:
logger.warning('Bad source redirection type: %s', redir_type)
continue
source_port = port_redirect['srcPort']
source_addr = port_redirect.get('srcAddress')
source_is_ipv4 = source_addr is None or '.' in source_addr
try:
source_port = int(port_redirect['srcPort'])
except:
logger.warning('Bad source port provided', exc_info=True)
continue
dest_port = port_redirect['destPort']
try:
source_addr = port_redirect.get('srcAddress')
if source_addr is not None:
source_addr = netaddr.IPAddress(source_addr)
except:
logger.warning('Bad source address provided', exc_info=True)
continue
try:
dest_port = int(port_redirect['destPort'])
except:
logger.warning('Bad source port provided', exc_info=True)
continue
try:
dest_addr = port_redirect.get('destAddress', partition_ipv6)
dest_is_ipv6 = ':' in dest_addr
if dest_is_ipv6:
dest_addr = '[{}]'.format(dest_addr)
dest_addr = netaddr.IPAddress(dest_addr)
except:
logger.warning('Bad source address provided', exc_info=True)
continue
command = ['socat']
socat_source_version = 4 if source_is_ipv4 else 6
socat_source_version = source_addr.version if source_addr is not None else 4
socat_source_type = '{rtype}{version}-LISTEN'.format(rtype=redir_type.upper(), version=socat_source_version)
socat_source = '{}:{}'.format(socat_source_type, source_port)
if source_addr is not None:
......@@ -102,8 +125,7 @@ class Manager(object):
socat_source += ',fork'
command.append(socat_source)
socat_dest_version = 6 if dest_is_ipv6 else 4
socat_dest_type = '{rtype}{version}'.format(rtype=redir_type.upper(), version=socat_dest_version)
socat_dest_type = '{rtype}{version}'.format(rtype=redir_type.upper(), version=dest_addr.version)
socat_dest = '{}:{}:{}'.format(socat_dest_type, dest_addr, dest_port)
command.append(socat_dest)
......
......@@ -2832,3 +2832,80 @@ class TestSlapgridWithPortRedirection(MasterMixin, unittest.TestCase):
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-tcp-{}'.format(1234), partition_supervisord_config)
self.assertNotIn('socat TCP4-LISTEN:1234,fork TCP4:127.0.0.1:4321', partition_supervisord_config)
def test_port_redirection_config_bad_source_port(self):
with self._mock_requests():
self._setup_instance([
{
'srcPort': 'bad',
'destPort': 4321,
'destAddress': '127.0.0.1',
},
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-tcp-bad', partition_supervisord_config)
self.assertNotIn('socat TCP4-LISTEN:bad,fork TCP4:127.0.0.1:4321', partition_supervisord_config)
def test_port_redirection_config_bad_dest_port(self):
with self._mock_requests():
self._setup_instance([
{
'srcPort': 1234,
'destPort': 'wolf',
'destAddress': '127.0.0.1',
},
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-tcp-1234', partition_supervisord_config)
self.assertNotIn('socat TCP4-LISTEN:1234,fork TCP4:127.0.0.1:wolf', partition_supervisord_config)
def test_port_redirection_config_bad_source_address(self):
with self._mock_requests():
self._setup_instance([
{
'srcPort': 1234,
'srcAddress': 'bad',
'destPort': 4321,
'destAddress': '127.0.0.1',
},
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-tcp-1234', partition_supervisord_config)
self.assertNotIn('socat TCP4-LISTEN:1234,bind=bad,fork TCP4:127.0.0.1:4321', partition_supervisord_config)
def test_port_redirection_config_bad_dest_address(self):
with self._mock_requests():
self._setup_instance([
{
'srcPort': 1234,
'destPort': 4321,
'destAddress': 'wolf',
},
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-tcp-1234', partition_supervisord_config)
self.assertNotIn('socat TCP4-LISTEN:1234,fork TCP4:wolf:4321', partition_supervisord_config)
def test_port_redirection_config_bad_redir_type(self):
with self._mock_requests():
self._setup_instance([
{
'type': 'htcpcp',
'srcPort': 1234,
'destPort': 4321,
'destAddress': '127.0.0.1',
},
])
# Check the socat command
partition_supervisord_config = self._read_instance_supervisord_config()
self.assertNotIn('socat-htcpcp-1234', partition_supervisord_config)
self.assertNotIn('socat HTCPCP4-LISTEN:1234,fork HTCPCP4:127.0.0.1:4321', partition_supervisord_config)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment