Commit 594edfa5 authored by Xiaowu Zhang's avatar Xiaowu Zhang

Unittest

See merge request !38
parents 47f69a00 9ac64f16
......@@ -17,9 +17,11 @@
# Nemu. If not, see <http://www.gnu.org/licenses/>.
import re
import os
from new import function
from nemu.iproute import backticks, get_if_data, route, \
get_addr_data, get_all_route_data, interface
from nemu.interface import Switch, Interface
def _get_all_route_data():
ipdata = backticks([IP_PATH, "-o", "route", "list"]) # "table", "all"
......@@ -69,3 +71,19 @@ def _get_addr_data():
byidx, bynam = get_addr_data.orig()
return byidx, {name.split('@',1)[0]: a for name, a in bynam.iteritems()}
get_addr_data.func_code = _get_addr_data.func_code
@staticmethod
def _gen_if_name():
n = Interface._gen_next_id()
# Max 15 chars
# XXX: We truncate pid to not exceed IFNAMSIZ on systems with 32-bits pids
# but we should find something better to avoid possible collision.
return "NETNSif-%.4x%.3x" % (os.getpid() % 0xffff, n)
Interface._gen_if_name = _gen_if_name
@staticmethod
def _gen_br_name():
n = Switch._gen_next_id()
# XXX: same as for _gen_if_name
return "NETNSbr-%.4x%.3x" % (os.getpid() % 0xffff, n)
Switch._gen_br_name = _gen_br_name
*.cert
*.key
\ No newline at end of file
If you clone this repo directly, you should add re6stnet to PYTHONPATH, so
python can find module re6st
To run all the tests, you have two methods: one is by creating a new user
namespace, the other is by using root privileges.
* `unshare -Unr bash -c "mount -t sysfs /sys & python -m unittest discover"`
* `python -m unittest discover`
The mount of /sys is because nemu will read files in /sys.
When creating a new network namespace, the exiting network devices will not
disappear, and a re-mount is needed to update /sys.
If you want to only run the unit tests `python -m unittest discover` is ok,
the network tests will be skipped.
from pathlib2 import Path
DEMO_PATH = Path(__file__).resolve().parent.parent.parent / "demo"
{
"verbose": 1,
"port": 9090,
"anonymous_prefix_length": null,
"smtp_pwd": null,
"client_count": 10,
"authorized_origin": [
"127.0.0.1",
"::1"
],
"bind6": "::",
"ipv4": null,
"prefix_length": 16,
"min_protocol": 1,
"smtp_starttls": false,
"run": "run",
"bind4": "0.0.0.0",
"db": "registry.db",
"mailhost": "miku@miku.com",
"encrypt": false,
"logfile": "registry.log",
"max_clients": null,
"smtp_user": null,
"same_country": null,
"tunnel_refresh": 300,
"hello": 15,
"community": null
}
\ No newline at end of file
import json
import os
import sqlite3
import subprocess
import tempfile
import time
import unittest
import zlib
from re6st import registry, x509
from re6st.tests.test_network import re6st_wrap
from re6st.tests import tools
from re6st.tests import DEMO_PATH
DH_FILE = DEMO_PATH / "dh2048.pem"
class DummyNode(object):
"""fake node to reuse Re6stRegistry
error: node.Popen has destory method which not in subprocess.Popen
"""
def __init__(self):
self.ip = "localhost"
self.Popen = subprocess.Popen
self.pid = os.getpid()
class TestRegistryClientInteract(unittest.TestCase):
@classmethod
def setUpClass(cls):
re6st_wrap.initial()
# if running in net ns, set lo up
subprocess.check_call(("ip", "link", "set", "lo", "up"))
def setUp(self):
self.port = 18080
self.url = "http://localhost:{}/".format(self.port)
# not important, used in network_config check
self.max_clients = 10
def tearDown(self):
self.server.proc.terminate()
def test_1_main(self):
""" a client interact a server, no re6stnet node test basic function"""
self.server = re6st_wrap.Re6stRegistry(
DummyNode(),
"2001:db8:42::",
self.max_clients, port=self.port,
recreate=True)
client = registry.RegistryClient(self.url)
email = "m1@miku.com"
# simulate the process in conf
# request a token
client.requestToken(email)
# read token from db
db = sqlite3.connect(str(self.server.db), isolation_level=None)
token = None
for _ in xrange(100):
time.sleep(.1)
token = db.execute("SELECT token FROM token WHERE email=?",
(email,)).fetchone()
if token:
break
else:
self.fail("Request token failed, no token in database")
# token: tuple[unicode,]
token = str(token[0])
self.assertEqual(client.isToken(token), "1")
# request ca
ca = client.getCa()
# request a cert and get cn
key, csr = tools.generate_csr()
cert = client.requestCertificate(token, csr)
self.assertEqual(client.isToken(token), '', "token should be deleted")
# creat x509.cert object
def write_to_temp(text):
"""text: bytes"""
file = tempfile.NamedTemporaryFile()
file.write(text)
file.flush()
return file
ca, key, cert = map(write_to_temp, (ca, key, cert))
client.cert = x509.Cert(ca.name, key.name, cert.name)
ca.close()
cert.close()
# cert.decrypt use key file, close after entire test
self.addCleanup(key.close)
# verfiy cn and prefix
prefix = client.cert.prefix
cn = client.getNodePrefix(email)
self.assertEqual(tools.prefix2cn(prefix), cn)
# simulate the process in cache
# just prove works
net_config = client.getNetworkConfig(prefix)
net_config = json.loads(zlib.decompress(net_config))
self.assertEqual(net_config[u'max_clients'], self.max_clients)
# no re6stnet, empty result
bootpeer = client.getBootstrapPeer(prefix)
self.assertEqual(bootpeer, "")
# server should not die
self.assertIsNone(self.server.proc.poll())
#TODO with a registry and some node, test babel_dump related function
if __name__ == "__main__":
unittest.main()
*.log
temp_net_test/
\ No newline at end of file
clean_ruleset_interval=600
allow 1024-65535 10.0.0.0/8 1024-65535
deny 0-65535 0.0.0.0/0 0-65535
import ipaddress
import logging
import nemu
import time
import weakref
from subprocess import PIPE
from pathlib2 import Path
from re6st.tests import DEMO_PATH
fix_file = DEMO_PATH / "fixnemu.py"
# execfile(str(fix_file)) Removed in python3
exec(open(str(fix_file)).read())
IPTABLES = 'iptables-nft'
class ConnectableError(Exception):
pass
class Node(nemu.Node):
"""simple nemu.Node used for registry and nodes"""
def __init__(self):
super(Node, self).__init__()
self.Popen(('sysctl', '-q',
'net.ipv4.icmp_echo_ignore_broadcasts=0')).wait()
def _add_interface(self, iface):
self.iface = iface
iface.__dict__['node'] = weakref.proxy(self)
return super(Node, self)._add_interface(iface)
@property
def ip(self):
try:
return str(self._ip)
except AttributeError:
# return 1 ipv4 address of the one interface, reverse mode
for iface in self.get_interfaces()[::-1]:
for addr in iface.get_addresses():
addr = addr['address']
if '.' in addr:
#TODO different type problem?
self._ip = addr
return addr
def connect_switch(self, switch, ip, prefix_len=24):
self.if_s = if_s = nemu.NodeInterface(self)
switch.connect(if_s)
if_s.up = True
if_s.add_v4_address(ip, prefix_len=prefix_len)
return if_s
class NetManager(object):
"""contain all the nemu object created, so they can live more time"""
def __init__(self):
self.object = []
self.registries = {}
def connectable_test(self):
"""test each node can ping to their registry
Raise:
AssertionError
"""
for reg, nodes in self.registries.iteritems():
for node in nodes:
app0 = node.Popen(["ping", "-c", "1", reg.ip], stdout=PIPE)
ret = app0.wait()
if ret:
raise ConnectableError(
"network construct failed {} to {}".format(node.ip, reg.ip))
logging.debug("each node can ping to their registry")
def net_route():
"""build a network connect by a route(bridge)
Returns:
a network manager contain 3 nodes
"""
nm = NetManager()
switch1 = nemu.Switch()
switch1.up = True
registry = Node()
machine1 = Node()
machine2 = Node()
registry.connect_switch(switch1, "192.168.1.1")
machine1.connect_switch(switch1, "192.168.1.2")
machine2.connect_switch(switch1, "192.168.1.3")
nm.object.append(switch1)
nm.registries[registry] = [machine1, machine2]
nm.connectable_test()
return nm
def net_demo():
internet = Node()
gateway1 = Node()
gateway2 = Node()
registry = Node()
m1 = Node()
m2 = Node()
m3 = Node()
m4 = Node()
m5 = Node()
m6 = Node()
m7 = Node()
m8 = Node()
switch1 = nemu.Switch()
switch2 = nemu.Switch()
switch3 = nemu.Switch()
nm = NetManager()
nm.object = [internet, switch3, switch1, switch2, gateway1, gateway2]
nm.registries = {registry: [m1, m2, m3, m4, m5, m6, m7, m8]}
# for node in [g1, m3, m4, m5]:
# print "pid: {}".format(node.pid)
re_if_0, in_if_0 = nemu.P2PInterface.create_pair(registry, internet)
g1_if_0, in_if_1 = nemu.P2PInterface.create_pair(gateway1, internet)
g2_if_0, in_if_2 = nemu.P2PInterface.create_pair(gateway2, internet)
re_if_0.add_v4_address(address="10.0.0.2", prefix_len=24)
in_if_0.add_v4_address(address='10.0.0.1', prefix_len=24)
in_if_1.add_v4_address(address='10.1.0.1', prefix_len=24)
in_if_2.add_v4_address(address='10.2.0.1', prefix_len=24)
g1_if_0.add_v4_address(address='10.1.0.2', prefix_len=24)
g2_if_0.add_v4_address(address='10.2.0.2', prefix_len=24)
for iface in [re_if_0, in_if_0, g1_if_0, in_if_1, g2_if_0, in_if_2]:
nm.object.append(iface)
iface.up = True
ip = ipaddress.ip_address(u"10.1.1.1")
for i, node in enumerate([gateway1, m1, m2]):
iface = node.connect_switch(switch1, str(ip + i))
nm.object.append(iface)
if i: # except the first
node.add_route(nexthop=ip)
gateway1.Popen((IPTABLES, '-t', 'nat', '-A', 'POSTROUTING', '-o', g1_if_0.name, '-j', 'MASQUERADE')).wait()
gateway1.Popen((IPTABLES, '-t', 'nat', '-N', 'MINIUPNPD')).wait()
gateway1.Popen((IPTABLES, '-t', 'nat', '-A', 'PREROUTING', '-i', g1_if_0.name, '-j', 'MINIUPNPD')).wait()
gateway1.Popen((IPTABLES, '-N', 'MINIUPNPD')).wait()
ip = ipaddress.ip_address(u"10.2.1.1")
for i, node in enumerate([gateway2, m3, m4, m5]):
iface = node.connect_switch(switch1, str(ip + i))
nm.object.append(iface)
if i: # except the first
node.add_route(prefix='10.0.0.0', prefix_len=8, nexthop=ip)
ip = ipaddress.ip_address(u"10.0.1.1")
for i, node in enumerate([internet, m6, m7, m8]):
iface = node.connect_switch(switch2, str(ip + i))
nm.object.append(iface)
if i: # except the first
node.add_route(prefix='10.0.0.0', prefix_len=8, nexthop=ip)
registry.add_route(prefix='10.0.0.0', prefix_len=8, nexthop='10.0.0.1')
gateway1.add_route(prefix='10.0.0.0', prefix_len=8, nexthop='10.1.0.1')
gateway2.add_route(prefix='10.0.0.0', prefix_len=8, nexthop='10.2.0.1')
internet.add_route(prefix='10.2.0.0', prefix_len=16, nexthop='10.2.0.2')
MINIUPnP_CONF = Path(__file__).parent / 'miniupnpd.conf'
gateway1.proc = gateway1.Popen(['miniupnpd', '-d', '-f', MINIUPnP_CONF,
'-P', 'miniupnpd.pid', '-a', gateway1.if_s.name,
'-i', g1_if_0.name],
stdout=PIPE, stderr=PIPE)
switch1.up = switch2.up = switch3.up = True
nm.connectable_test()
return nm
def network_direct():
"""one server and one client connect direct"""
registry = Node()
m0 = Node()
nm = NetManager()
nm.registries = {registry: [m0]}
re_if_0, m_if_0 = nemu.P2PInterface.create_pair(registry, m0)
registry._ip = u"10.1.2.1"
re_if_0.add_v4_address(u"10.1.2.1", prefix_len=24)
m_if_0.add_v4_address(u"10.1.2.2", prefix_len=24)
re_if_0.up = m_if_0.up = True
nm.connectable_test()
return nm
if __name__ == "__main__":
nm = net_demo()
time.sleep(1000000)
# -*- coding: utf-8 -*-
'''
Script launched on machines from the demo with the option -p/--ping
It uses Multiping to ping several IPs passed as arguments.
After Re6st is stable, this script logs when it does not get response from a
machine in a csv file stored in the directory of the machine in this format:
time, sequence number, number of non-responding machines, ip of these machines
'''
import argparse, errno, socket, time, sys
from multiping import MultiPing
PING_INTERVAL = 10
PING_TIMEOUT = 4
class MultiPing(MultiPing):
# Patch of Multiping because it stays blocked to ipv4
# emission when we want to ping only ipv6 addresses.
# So we only keep the ipv6 part for the demo.
# Bug issued: https://github.com/romana/multi-ping/issues/22
def _read_all_from_socket(self, timeout):
pkts = []
if self._ipv6_address_present:
try:
self._sock6.settimeout(timeout)
while True:
p = self._sock6.recv(128)
pkts.append((bytearray(p), time.time()))
self._sock6.settimeout(0)
except socket.timeout:
pass
except socket.error as e:
if e.errno != errno.EWOULDBLOCK:
raise
return pkts
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-a', nargs = '+', help = 'the list of addresses to ping')
parser.add_argument('--retry', action='store_true', help='retry ping unitl success')
args = parser.parse_args()
addrs = args.a
retry = args.retry
no_responses = "1"
while retry and no_responses:
mp = MultiPing(addrs)
mp.send()
_, no_responses = mp.receive(PING_TIMEOUT)
sys.stdout.write(" ".join(no_responses))
if __name__ == '__main__':
main()
"""wrap the deploy of re6st node, ease the creation of cert
file and run of the node
"""
import errno
import ipaddress
import json
import logging
import re
import shutil
import sqlite3
import tempfile
import time
import weakref
from subprocess import PIPE
from pathlib2 import Path
from re6st.tests import tools
from re6st.tests import DEMO_PATH
WORK_DIR = Path(__file__).parent / "temp_net_test"
DH_FILE = DEMO_PATH / "dh2048.pem"
RE6STNET = "python -m re6st.cli.node"
RE6ST_REGISTRY = "python -m re6st.cli.registry"
RE6ST_CONF = "python -m re6st.cli.conf"
def initial():
"""create the workplace"""
if not WORK_DIR.exists():
WORK_DIR.mkdir()
def ip_to_serial(ip6):
"""convert ipv6 address to serial"""
ip6 = ipaddress.IPv6Address(u"{}".format(ip6))
ip6 = "1{:x}".format(int(ip6)).rstrip('0')
return int(ip6, 16)
class Re6stRegistry(object):
"""class run a re6st-registry service on a namespace"""
registry_seq = 0
def __init__(self, node, ip6, client_number, port=80, recreate=False):
self.node = node
# TODO need set once
self.ip = node.ip
self.ip6 = ip6
self.client_number = client_number
self.port = port
self.name = self.generate_name()
self.path = WORK_DIR / self.name
self.ca_key = self.path / "ca.key"
# because re6st-conf will create ca.crt so use another name
self.ca_crt = self.path / "ca.cert"
self.log = self.path / "registry.log"
self.db = self.path / "registry.db"
self.run_path = tempfile.mkdtemp()
if recreate and self.path.exists():
shutil.rmtree(str(self.path))
if not self.path.exists():
self.create_registry()
# use hash to identify the registry
with self.ca_key.open() as f:
text = f.read()
self.ident = hash(text)
self.clean()
self.run()
# wait the servcice started
p = self.node.Popen(['python', '-c', """if 1:
import socket, time
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
while True:
try:
s.connect(('localhost', {}))
break
except socket.error:
time.sleep(.1)
""".format(self.port)])
now = time.time()
while time.time() - now < 10:
if p.poll() != None:
break
time.sleep(0.1)
else:
logging.error("registry failed to start, %s", self.name)
p.destroy()
raise Exception("registry failed to start")
logging.info("re6st service started")
@classmethod
def generate_name(cls):
cls.registry_seq += 1
return "registry_{}".format(cls.registry_seq)
@property
def url(self):
return "http://{ip}/".format(ip=self.ip)
def create_registry(self):
self.path.mkdir()
tools.create_ca_file(str(self.ca_key), str(self.ca_crt),
serial=ip_to_serial(self.ip6))
def run(self):
cmd = ['--ca', self.ca_crt, '--key', self.ca_key, '--dh', DH_FILE,
'--ipv4', '10.42.0.0/16', '8', '--logfile', self.log, '--db', self.db,
'--run', self.run_path, '--hello', '4', '--mailhost', 's', '-v4',
'--client-count', (self.client_number+1)//2, '--port', self.port]
#PY3: convert PosixPath to str, can be remove in Python 3
cmd = map(str, cmd)
cmd[:0] = RE6ST_REGISTRY.split()
logging.debug("run registry %s at ns: %s with cmd: %s",
self.name, self.node.pid, " ".join(cmd))
self.proc = self.node.Popen(cmd, stdout=PIPE, stderr=PIPE)
def clean(self):
"""remove the file created last time"""
try:
self.log.unlink()
except OSError as e:
if e.errno != errno.ENOENT:
raise
def __del__(self):
try:
logging.debug("teminate process %s", self.proc.pid)
self.proc.destroy()
except:
pass
class Re6stNode(object):
"""class run a re6stnet service on a namespace"""
node_seq = 0
def __init__(self, node, registry, name=None, recreate=False):
"""
node: nemu node
name: name for res6st node
"""
self.name = name or self.generate_name()
self.node = node
self.registry = weakref.proxy(registry)
self.path = WORK_DIR / self.name
self.email = self.name + "@example.com"
if self.name == self.registry.name:
self.run_path = self.registry.run_path
else:
self.run_path = tempfile.mkdtemp()
self.log = self.path / "re6stnet.log"
self.crt = self.path / "cert.crt"
self.key = self.path / 'cert.key'
self.console = self.run_path + "/console.sock"
self.data_file = self.path / "data.json" # contain data for restart node
# condition, node of the registry
if self.name == self.registry.name:
self.ip6 = self.registry.ip6
if not self.crt.exists():
self.create_node()
else:
# if ca file changed, we need recreate node file
if self.data_file.exists():
with self.data_file.open() as f:
data = json.load(f)
self.ip6 = data.get("ip6")
recreate = data.get('hash') != self.registry.ident
else:
recreate = True
if recreate and self.path.exists():
shutil.rmtree(str(self.path))
if not self.path.exists():
self.path.mkdir()
self.create_node()
logging.debug("%s's subnet is %s", self.name, self.ip6)
self.clean()
def __repr__(self):
return self.name
@classmethod
def generate_name(cls):
cls.node_seq += 1
return "node_{}".format(cls.node_seq)
def create_node(self):
"""create necessary file for node"""
logging.info("create dir of node %s", self.name)
cmd = ["--registry", self.registry.url, '--email', self.email]
cmd[:0] = RE6ST_CONF.split()
p = self.node.Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE,
cwd=str(self.path))
# read token
db = sqlite3.connect(str(self.registry.db), isolation_level=None)
token = None
for _ in xrange(100):
time.sleep(.1)
token = db.execute("SELECT token FROM token WHERE email=?",
(self.email,)).fetchone()
if token:
break
else:
p.destroy()
raise Exception("can't connect to the Register")
out, _ = p.communicate(str(token[0]))
# logging.debug("re6st-conf output: {}".format(out))
# find the ipv6 subnet of node
self.ip6 = re.search('(?<=subnet: )[0-9:a-z]+', out).group(0)
data = {'ip6': self.ip6, 'hash': self.registry.ident}
with open(str(self.data_file), 'w') as f:
json.dump(data, f)
logging.info("create dir of node %s finish", self.name)
def run(self, *args):
"""execute re6stnet"""
cmd = ['--log', self.path, '--run', self.run_path, '--state', self.path,
'--dh', DH_FILE, '--ca', self.registry.ca_crt, '--cert', self.crt,
'--key', self.key, '-v4', '--registry', self.registry.url,
'--console', self.console]
#PY3: same as for Re6stRegistry.run
cmd = map(str, cmd)
cmd[:0] = RE6STNET.split()
cmd += args
logging.debug("run node %s at ns: %s with cmd: %s",
self.name, self.node.pid, " ".join(cmd))
# if len(args) > 4 :
# self.proc = self.node.Popen(cmd)
# else:
self.proc = self.node.Popen(cmd, stdout=PIPE, stderr=PIPE)
def clean(self):
"""remove the file created last time"""
for name in ["re6stnet.log", "babeld.state", "cache.db", "babeld.log"]:
f = self.path / name
try:
f.unlink()
except OSError as e:
if e.errno != errno.ENOENT:
raise
def stop(self):
"""stop running re6stnet process"""
logging.debug("%s teminate process %s", self.name, self.proc.pid)
self.proc.destroy()
def __del__(self):
"""teminate process and rm temp dir"""
try:
self.stop()
except Exception as e:
logging.warning("%s: %s", self.name, e)
# re6stnet seems auto clean the tempdir
# try:
# shutil.rmtree(self.run_path)
# except Exception as e:
# logging.error("{}: {}".format(self.name, e))
"""contain ping-test for re6set net"""
import os
import unittest
import time
import psutil
import logging
import random
from pathlib2 import Path
import network_build
import re6st_wrap
PING_PATH = str(Path(__file__).parent.resolve() / "ping.py")
def deploy_re6st(nm, recreate=False):
net = nm.registries
nodes = []
registries = []
re6st_wrap.Re6stRegistry.registry_seq = 0
re6st_wrap.Re6stNode.node_seq = 0
for registry in net:
reg = re6st_wrap.Re6stRegistry(registry, "2001:db8:42::", len(net[registry]),
recreate=recreate)
reg_node = re6st_wrap.Re6stNode(registry, reg, name=reg.name)
registries.append(reg)
reg_node.run("--gateway", "--disable-proto", "none", "--ip", registry.ip)
nodes.append(reg_node)
for m in net[registry]:
node = re6st_wrap.Re6stNode(m, reg)
node.run("-i" + m.iface.name)
nodes.append(node)
return nodes, registries
def wait_stable(nodes, timeout=240):
"""try use ping6 from each node to the other until ping success to all the
other nodes
Args:
timeout: int, the time for wait
return:
True if success
"""
logging.info("wait all node stable, timeout: %s", timeout)
now = time.time()
ips = {node.ip6: node.name for node in nodes}
# start the ping processs
for node in nodes:
sub_ips = set(ips) - {node.ip6}
node.ping_proc = node.node.Popen(
["python", PING_PATH, '--retry', '-a'] + list(sub_ips))
# check all the node network can ping each other, in order reverse
unfinished = list(nodes)
while unfinished:
for i in xrange(len(unfinished)-1, -1, -1):
node = unfinished[i]
if node.ping_proc.poll() is not None:
logging.debug("%s 's network is stable", node.name)
unfinished.pop(i)
time.sleep(0.5)
if time.time() - now > timeout:
for node in unfinished:
node.ping_proc.destroy()
logging.warning("%s can't ping to all the nodes", unfinished)
return False
logging.info("wait time cost: %s", time.time() - now)
return True
@unittest.skipIf(os.geteuid(), "Using root or creating a user namespace")
class TestNet(unittest.TestCase):
""" network test case"""
@classmethod
def setUpClass(cls):
"""create work dir"""
logging.basicConfig(level=logging.INFO)
re6st_wrap.initial()
@classmethod
def tearDownClass(cls):
"""watch any process leaked after tests"""
logging.basicConfig(level=logging.WARNING)
for p in psutil.Process().children():
logging.debug("unterminate ps, name: %s, pid: %s, status: %s, cmd: %s",
p.name(), p.pid, p.status(), p.cmdline())
p.terminate()
# try:
# p.kill()
# except:
# pass
def test_ping_router(self):
"""create a network in a net segment, test the connectivity by ping
"""
nm = network_build.net_route()
nodes, _ = deploy_re6st(nm)
wait_stable(nodes, 40)
time.sleep(10)
self.assertTrue(wait_stable(nodes, 30), " ping test failed")
@unittest.skip("usually failed due to UPnP problem")
def test_reboot_one_machine(self):
"""create a network demo, wait the net stable, reboot on machine,
then test if network recover, this test seems always failed
"""
nm = network_build.net_demo()
nodes, _ = deploy_re6st(nm)
wait_stable(nodes, 100)
# stop on machine randomly
index = int(random.random() * 7) + 1
machine = nodes[index]
machine.stop()
time.sleep(5)
machine.run("-i" + machine.node.iface.name)
logging.info("restart %s", machine.name)
self.assertTrue(wait_stable(nodes, 400), "network can't recover")
def test_reboot_one_machine_router(self):
"""create a network router, wait the net stable, reboot on machine,
then test if network recover,
"""
nm = network_build.net_route()
nodes, _ = deploy_re6st(nm)
wait_stable(nodes, 40)
# stop on machine randomly
index = int(random.random() * 2) + 1
machine = nodes[index]
machine.stop()
time.sleep(5)
machine.run("-i" + machine.node.iface.name)
logging.info("restart %s", machine.name)
self.assertTrue(wait_stable(nodes, 100), "network can't recover")
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG, filename='test.log', filemode='w',
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%I:%M:%S')
unittest.main(verbosity=3)
"""Re6st unittest module
"""
\ No newline at end of file
#!/usr/bin/python2
""" unit test for re6st-conf
"""
import os
import sys
import unittest
from shutil import rmtree
from StringIO import StringIO
from mock import patch
from OpenSSL import crypto
from re6st.cli import conf
from re6st.tests.tools import generate_cert, serial2prefix, create_ca_file
# gloable value from conf.py
conf_path = 're6stnet.conf'
ca_path = 'ca.crt'
cert_path = 'cert.crt'
key_path = 'cert.key'
# TODO test for is needed
class TestConf(unittest.TestCase):
""" Unit test case for re6st-conf"""
@classmethod
def setUpClass(cls):
# because conf will change directory
cls.origin_dir = os.getcwd()
cls.work_dir = "temp"
if not os.path.exists(cls.work_dir):
os.makedirs(cls.work_dir)
# mocked server cert and pkey
cls.pkey, cls.cert = create_ca_file(os.devnull, os.devnull)
cls.fingerprint = "".join( cls.cert.digest("sha1").split(":"))
# client.getCa should return a string form cert
cls.cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cls.cert)
cls.command = "re6st-conf --registry http://localhost/" \
" --dir %s" % cls.work_dir
cls.serial = 0
cls.stdout = sys.stdout
cls.null = open(os.devnull, 'w')
sys.stdout = cls.null
@classmethod
def tearDownClass(cls):
# remove work directory
rmtree(cls.work_dir)
cls.null.close()
sys.stdout = cls.stdout
def setUp(self):
patcher = patch("re6st.registry.RegistryClient")
self.addCleanup(patcher.stop)
self.client = patcher.start()()
self.client.getCa.return_value = self.cert
prefix = serial2prefix(self.serial)
self.client.requestCertificate.side_effect = \
lambda _, req: generate_cert(self.cert, self.pkey, req, prefix, self.serial)
self.serial += 1
def tearDown(self):
# go back to original dir
os.chdir(self.origin_dir)
@patch("__builtin__.raw_input")
def test_basic(self, mock_raw_input):
""" go through all the step
getCa, requestToken, requestCertificate
"""
mail = "example@email.com"
token = "a_token"
mock_raw_input.side_effect = [mail, token]
command = self.command \
+ " --fingerprint sha1:%s" % self.fingerprint \
+ " --req L lille"
sys.argv = command.split()
conf.main()
self.client.requestToken.assert_called_once_with(mail)
self.assertEqual(self.client.requestCertificate.call_args[0][0],
token)
# created file part
self.assertTrue(os.path.exists(ca_path))
self.assertTrue(os.path.exists(key_path))
self.assertTrue(os.path.exists(cert_path))
self.assertTrue(os.path.exists(conf_path))
def test_fingerprint_mismatch(self):
""" wrong fingerprint with same size,
"""
command = self.command \
+ " --fingerprint sha1:a1861330f1299b98b529fa52c3d8e5d1a94dc000"
sys.argv = command.split()
with self.assertRaises(SystemExit) as e:
conf.main()
self.assertIn("fingerprint doesn't match", str(e.exception))
def test_ca_only(self):
""" only create ca file and exit
"""
command = self.command + " --ca-only"
sys.argv = command.split()
with self.assertRaises(SystemExit):
conf.main()
self.assertTrue(os.path.exists(ca_path))
def test_anonymous(self):
""" with args anonymous, so script will use '' as token
"""
command = self.command + " --anonymous"
sys.argv = command.split()
conf.main()
self.assertEqual(self.client.requestCertificate.call_args[0][0],
'')
def test_anonymous_failed(self):
""" with args anonymous and token, so script will failed
"""
command = self.command + " --anonymous" \
+ " --token a"
sys.argv = command.split()
text = StringIO()
old_err = sys.stderr
sys.stderr = text
with self.assertRaises(SystemExit):
conf.main()
# check the error message
self.assertIn("anonymous conflicts", text.getvalue())
sys.stderr = old_err
def test_req_reserved(self):
""" with args req, but contain reserved value
"""
command = self.command + " --req CN 1111"
sys.argv = command.split()
with self.assertRaises(SystemExit) as e:
conf.main()
self.assertIn("CN field", str(e.exception))
def test_get_null_cert(self):
""" simulate fake token, and get null cert
"""
command = self.command + " --token a"
sys.argv = command.split()
self.client.requestCertificate.side_effect = "",
with self.assertRaises(SystemExit) as e:
conf.main()
self.assertIn("invalid or expired token", str(e.exception))
if __name__ == "__main__":
unittest.main()
This diff is collapsed.
import sys
import os
import unittest
import hmac
import httplib
import base64
import hashlib
from mock import Mock, patch
from re6st import registry
class TestRegistryClient(unittest.TestCase):
@classmethod
def setUpClass(cls):
server_url = "http://10.0.0.2/"
cls.client = registry.RegistryClient(server_url)
cls.client._conn = Mock()
def test_init(self):
url1 = "https://localhost/example/"
url2 = "http://10.0.0.2/"
client1 = registry.RegistryClient(url1)
client2 = registry.RegistryClient(url2)
self.assertEqual(client1._path, "/example")
self.assertEqual(client1._conn.host, "localhost")
self.assertIsInstance(client1._conn, httplib.HTTPSConnection)
self.assertIsInstance(client2._conn, httplib.HTTPConnection)
def test_rpc_hello(self):
prefix = "0000000011111111"
protocol = "7"
body = "a_hmac_key"
query = "/hello?client_prefix=0000000011111111&protocol=7"
response = fakeResponse(body, httplib.OK)
self.client._conn.getresponse.return_value = response
res = self.client.hello(prefix, protocol)
self.assertEqual(res, body)
conn = self.client._conn
conn.putrequest.assert_called_once_with('GET', query, skip_accept_encoding=1)
conn.close.assert_not_called()
conn.endheaders.assert_called_once()
def test_rpc_with_cn(self):
query = "/getNetworkConfig?cn=0000000011111111"
cn = "0000000011111111"
# hmac part
self.client._hmac = None
self.client.hello = Mock(return_value = "aaabbb")
self.client.cert = Mock()
key = "this_is_a_key"
self.client.cert.decrypt.return_value = key
h = hmac.HMAC(key, query, hashlib.sha1).digest()
key = hashlib.sha1(key).digest()
# response part
body = None
response = fakeResponse(body, httplib.NO_CONTENT)
response.msg = dict(Re6stHMAC=hmac.HMAC(key, body, hashlib.sha1).digest())
self.client._conn.getresponse.return_value = response
res = self.client.getNetworkConfig(cn)
self.client.cert.verify.assert_called_once_with("bbb", "aaa")
self.assertEqual(self.client._hmac, hashlib.sha1(key).digest())
conn = self.client._conn
conn.putheader.assert_called_with("Re6stHMAC", base64.b64encode(h))
conn.close.assert_called_once()
self.assertEqual(res, body)
class fakeResponse:
def __init__(self, body, status, reason = None):
self.body = body
self.status = status
self.reason = reason
def read(self):
return self.body
if __name__ == "__main__":
unittest.main()
#!/usr/bin/python2
import os
import sys
import unittest
import time
from mock import patch, Mock
from re6st import tunnel
from re6st import x509
from re6st import cache
from re6st.tests import tools
class testBaseTunnelManager(unittest.TestCase):
@classmethod
def setUpClass(cls):
ca_key, ca = tools.create_ca_file("ca.key", "ca.cert")
tools.create_cert_file("node.key", "node.cert", ca, ca_key, "00000001", 1)
cls.cert = x509.Cert("ca.cert", "node.key", "node.cert")
cls.control_socket = "babeld.sock"
def setUp(self):
patcher = patch("re6st.cache.Cache")
pacher_sock = patch("socket.socket")
self.addCleanup(patcher.stop)
self.addCleanup(pacher_sock.stop)
self.cache = patcher.start()()
self.sock = pacher_sock.start()
self.cache.same_country = False
address = [(2, [('10.0.0.2', '1194', 'udp'), ('10.0.0.2', '1194', 'tcp')])]
self.tunnel = tunnel.BaseTunnelManager(self.control_socket,
self.cache, self.cert, None, address)
def tearDown(self):
self.tunnel.close()
del self.tunnel
#TODO selectTimeout in contain callback, removing, update
@patch("re6st.tunnel.BaseTunnelManager.selectTimeout")
def test_invalidatePeers(self, selectTimeout):
"""normal case, stop_date: p2 < now < p1 < p3
expect:
_peers -> [p1, p3]
next = p1.stoptime
"""
p1 = x509.Peer("00")
p2 = x509.Peer("01")
p3 = x509.Peer("10")
p1.stop_date = time.time() + 1000
p2.stop_date = 1
p3.stop_date = p1.stop_date + 500
self.tunnel._peers = [p1, p2, p3]
self.tunnel.invalidatePeers()
self.assertEqual(self.tunnel._peers, [p1, p3])
selectTimeout.assert_called_once_with(p1.stop_date, self.tunnel.invalidatePeers)
# Because _makeTunnel is defined in sub class of BaseTunnelManager, so i comment
# the follow test
# @patch("re6st.tunnel.BaseTunnelManager._makeTunnel", create=True)
# def test_processPacket_address_with_msg_peer(self, makeTunnel):
# """code is 1, peer and msg not none """
# c = chr(1)
# msg = "address"
# peer = x509.Peer("000001")
# self.tunnel._connecting = {peer}
# self.tunnel._processPacket(c + msg, peer)
# self.cache.addPeer.assert_called_once_with(peer, msg)
# self.assertFalse(self.tunnel._connecting)
# makeTunnel.assert_called_once_with(peer, msg)
def test_processPacket_address(self):
"""code is 1, for address. And peer or msg are none"""
c = chr(1)
self.tunnel._address = {1: "1,1", 2: "2,2"}
res = self.tunnel._processPacket(c)
self.assertEqual(res, "1,1;2,2")
def test_processPacket_address_with_peer(self):
"""code is 1, peer is not none, msg is none
in my opion, this function return address in form address,port,portocl
and each address join by ;
it will truncate address which has more than 3 element
"""
c = chr(1)
peer = x509.Peer("000001")
peer.protocol = 1
self.tunnel._peers.append(peer)
self.tunnel._address = {1: "1,1,1;0,0,0", 2: "2,2,2,2"}
res = self.tunnel._processPacket(c, peer)
self.assertEqual(res, "1,1,1;0,0,0;2,2,2")
@patch("re6st.x509.Cert.verifyVersion", Mock(return_value=True))
@patch("re6st.tunnel.BaseTunnelManager.selectTimeout")
def test_processPacket_version(self, selectTimeout):
"""code is 0, for network version, peer is not none
2 case, one modify the version, one not
"""
c = chr(0)
peer = x509.Peer("000001")
version1 = "00003"
version2 = "00007"
self.tunnel._version = version3 = "00005"
self.tunnel._peers.append(peer)
res = self.tunnel._processPacket(c + version1, peer)
self.tunnel._processPacket(c + version2, peer)
self.assertEqual(res, version3)
self.assertEqual(self.tunnel._version, version2)
self.assertEqual(peer.version, version2)
self.assertEqual(selectTimeout.call_args[0][1], self.tunnel.newVersion)
if __name__ == "__main__":
unittest.main()
#!/usr/bin/python2
import os
import sys
import unittest
from mock import patch
from re6st import tunnel
class testMultGatewayManager(unittest.TestCase):
def setUp(self):
self.manager = tunnel.MultiGatewayManager(lambda x:x+x)
patcher = patch("subprocess.check_call")
self.addCleanup(patcher.stop)
self.sub = patcher.start()
@patch("logging.trace", create=True)
def test_add(self, log_trace):
"""add new dest twice"""
dest = "dest"
self.manager.add(dest, True)
self.manager.add(dest, True)
self.assertEqual(self.manager[dest][1], 1)
self.sub.assert_called_once()
cmd = log_trace.call_args[0][1]
self.assertIn(dest+dest, cmd)
self.assertIn("add", cmd)
def test_add_null_route(self):
""" add two dest which don't call ip route"""
dest1 = "dest1"
dest2 = ""
self.manager.add(dest1, False)
self.manager.add(dest2, True)
self.sub.assert_not_called()
@patch("logging.trace", create=True)
def test_remove(self, log_trace):
"remove a dest twice"
dest = "dest"
gw = "gw"
self.manager[dest] = [gw,1]
self.manager.remove(dest)
self.assertEqual(self.manager[dest][1], 0)
self.manager.remove(dest)
self.sub.assert_called_once()
self.assertIsNone(self.manager.get(dest))
cmd = log_trace.call_args[0][1]
self.assertIn(gw, cmd)
self.assertIn("del", cmd)
def test_remove_null_gw(self):
""" remove a dest which don't have gw"""
dest = "dest"
gw = ""
self.manager[dest] = [gw, 0]
self.manager.remove(dest)
self.assertIsNone(self.manager.get(dest))
self.sub.assert_not_called()
if __name__ == "__main__":
unittest.main()
\ No newline at end of file
import sys
import os
import time
import subprocess
from OpenSSL import crypto
from re6st import registry
def generate_csr():
"""generate a certificate request
return:
crypto.Pekey and crypto.X509Req both in pem format
"""
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
req = crypto.X509Req()
req.set_pubkey(key)
req.get_subject().CN = "test ca"
req.sign(key, 'sha256')
csr = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
return pkey, csr
def generate_cert(ca, ca_key, csr, prefix, serial, not_after=None):
"""generate a certificate
return
crypto.X509Cert in pem format
"""
if type(ca) is str:
ca = crypto.load_certificate(crypto.FILETYPE_PEM, ca)
if type(ca_key) is str:
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, ca_key)
req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
cert = crypto.X509()
cert.gmtime_adj_notBefore(0)
if not_after:
cert.set_notAfter(
time.strftime("%Y%m%d%H%M%SZ", time.gmtime(not_after)))
else:
cert.gmtime_adj_notAfter(registry.RegistryServer.cert_duration)
subject = req.get_subject()
if prefix:
subject.CN = prefix2cn(prefix)
cert.set_subject(req.get_subject())
cert.set_issuer(ca.get_subject())
cert.set_pubkey(req.get_pubkey())
cert.set_serial_number(serial)
cert.sign(ca_key, 'sha512')
return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
def create_cert_file(pkey_file, cert_file, ca, ca_key, prefix, serial):
pkey, csr = generate_csr()
cert = generate_cert(ca, ca_key, csr, prefix, serial)
with open(pkey_file, 'w') as f:
f.write(pkey)
with open(cert_file, 'w') as f:
f.write(cert)
return pkey, cert
def create_ca_file(pkey_file, cert_file, serial=0x120010db80042):
"""create key and ca file with specify name
return key, cert in pem format """
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
cert = crypto.X509()
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(registry.RegistryServer.cert_duration)
subject= cert.get_subject()
subject.C = "FR"
subject.ST = "Lille"
subject.L = "Lille"
subject.O = "nexedi"
subject.CN = "TEST-CA"
cert.set_issuer(cert.get_subject())
cert.set_serial_number(serial)
cert.set_pubkey(key)
cert.sign(key, "sha512")
with open(pkey_file, 'w') as pkey_file:
pkey_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
with open(cert_file, 'w') as cert_file:
cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
return key, cert
def prefix2cn(prefix):
return "%u/%u" % (int(prefix, 2), len(prefix))
def serial2prefix(serial):
return bin(serial)[2:].rjust(16, '0')
# pkey: private key
def decrypt(pkey, incontent):
with open("node.key", 'w') as f:
f.write(pkey)
args = "openssl rsautl -decrypt -inkey node.key".split()
p = subprocess.Popen(
args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
outcontent, err = p.communicate(incontent)
return outcontent
......@@ -95,6 +95,7 @@ setup(
extras_require = {
'geoip': ['geoip2'],
'multicast': ['PyYAML'],
'test': ['mock', 'pathlib2', 'nemu', 'multiping']
},
#dependency_links = [
# "http://miniupnp.free.fr/files/download.php?file=miniupnpc-1.7.20120714.tar.gz#egg=miniupnpc-1.7",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment