Commit a85c8c02 authored by Julien Muchembled's avatar Julien Muchembled

Change protocol to discover addresses of peers to connect to

The previous broadcast model is replaced by a query-response one.
During normal operation, the cache of peers is not used anymore to select
peers to connect to. It now only used for bootstrapping and avoid querying
an already known address.
parent 09ec1327
...@@ -12,19 +12,3 @@ ...@@ -12,19 +12,3 @@
the --private option the --private option
- Put a section about how to build the package from the sources in the README - Put a section about how to build the package from the sources in the README
- review protocol to discover pair:
A node that that wants to establish a new tunnel just have to look at the
routing table and pick a random destination node, actually its re6nst subnet.
If its address is unknown (not in local cache), re6st queries it by sending a
UDP packet to the subnet (anycast, i.e. IP suffix is 0): the reply is also an
UDP packet containing the address string to establish a tunnel. Then, the
tunnel can be established.
For bootstrapping, no destination is known so one has to query the registry
as already implemented. The way the registry discovers node addresses should
be changed so that nodes don't need to declare themselves anymore.
The registry should look at the local peer DB cache.
This new protocol should produce less network trafic than the current one.
...@@ -3,7 +3,7 @@ import math, nemu, os, signal, socket, subprocess, sys, time ...@@ -3,7 +3,7 @@ import math, nemu, os, signal, socket, subprocess, sys, time
from collections import defaultdict from collections import defaultdict
IPTABLES = 'iptables' IPTABLES = 'iptables'
SCREEN = 'screen' SCREEN = 'screen'
VERBOSE = 3 VERBOSE = 4
# registry # registry
# |.2 # |.2
...@@ -132,16 +132,17 @@ for ip in '10.1.1.2', '10.1.1.3', '10.2.1.2', '10.2.1.3': ...@@ -132,16 +132,17 @@ for ip in '10.1.1.2', '10.1.1.3', '10.2.1.2', '10.2.1.3':
else: else:
print "Connectivity IPv4 OK!" print "Connectivity IPv4 OK!"
registry.screen('../re6stnet @registry/re6stnet.conf --ip 10.0.0.2 -v%u' % VERBOSE,
'../re6st-registry @registry/re6st-registry.conf')
gateway1.screen('miniupnpd -d -f miniupnpd.conf -P miniupnpd.pid -a 10.1.1.1' gateway1.screen('miniupnpd -d -f miniupnpd.conf -P miniupnpd.pid -a 10.1.1.1'
' -i %s' % g1_if_0_name) ' -i %s' % g1_if_0_name)
machine1.screen('../re6stnet @m1/re6stnet.conf -v%u' % VERBOSE) if 1:
machine2.screen('../re6stnet @m2/re6stnet.conf -v%u' % VERBOSE) registry.screen('../re6stnet @registry/re6stnet.conf --ip 10.0.0.2 -v%u' % VERBOSE,
machine3.screen('../re6stnet @m3/re6stnet.conf -v%u -i%s' % (VERBOSE, m3_if_0.name)) '../re6st-registry @registry/re6st-registry.conf')
machine4.screen('../re6stnet @m4/re6stnet.conf -v%u -i%s' % (VERBOSE, m4_if_0.name)) machine1.screen('../re6stnet @m1/re6stnet.conf -v%u' % VERBOSE)
machine5.screen('../re6stnet @m5/re6stnet.conf -v%u' % VERBOSE) machine2.screen('../re6stnet @m2/re6stnet.conf -v%u' % VERBOSE)
machine6.screen('../re6stnet @m6/re6stnet.conf -v%u' % VERBOSE) machine3.screen('../re6stnet @m3/re6stnet.conf -v%u -i%s' % (VERBOSE, m3_if_0.name))
machine4.screen('../re6stnet @m4/re6stnet.conf -v%u -i%s' % (VERBOSE, m4_if_0.name))
machine5.screen('../re6stnet @m5/re6stnet.conf -v%u' % VERBOSE)
machine6.screen('../re6stnet @m6/re6stnet.conf -v%u' % VERBOSE)
nodes = registry, machine1, machine2, machine3, machine4, machine5, machine6 nodes = registry, machine1, machine2, machine3, machine4, machine5, machine6
_ll = {} _ll = {}
......
PRAGMA foreign_keys=OFF; PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION; BEGIN TRANSACTION;
CREATE TABLE peers (
prefix text primary key not null,
address text not null,
date integer default (strftime('%s','now')));
CREATE TABLE token ( CREATE TABLE token (
token text primary key not null, token text primary key not null,
email text not null, email text not null,
...@@ -190,5 +186,4 @@ rlyT1Q== ...@@ -190,5 +186,4 @@ rlyT1Q==
INSERT INTO "cert" VALUES('0000000000001001',NULL,NULL); INSERT INTO "cert" VALUES('0000000000001001',NULL,NULL);
INSERT INTO "cert" VALUES('000000000000101',NULL,NULL); INSERT INTO "cert" VALUES('000000000000101',NULL,NULL);
INSERT INTO "cert" VALUES('00000000000011',NULL,NULL); INSERT INTO "cert" VALUES('00000000000011',NULL,NULL);
CREATE INDEX peers_ping ON peers(date);
COMMIT; COMMIT;
...@@ -4,7 +4,7 @@ import subprocess, time, threading, traceback, errno, logging, os, xmlrpclib ...@@ -4,7 +4,7 @@ import subprocess, time, threading, traceback, errno, logging, os, xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
from email.mime.text import MIMEText from email.mime.text import MIMEText
from OpenSSL import crypto from OpenSSL import crypto
from re6st import utils from re6st import tunnel, utils
# To generate server ca and key with serial for 2001:db8:42::/48 # To generate server ca and key with serial for 2001:db8:42::/48
# openssl req -nodes -new -x509 -key ca.key -set_serial 0x120010db80042 -days 365 -out ca.crt # openssl req -nodes -new -x509 -key ca.key -set_serial 0x120010db80042 -days 365 -out ca.crt
...@@ -64,18 +64,15 @@ class main(object): ...@@ -64,18 +64,15 @@ class main(object):
help='VPN IP of the node on which runs the registry') help='VPN IP of the node on which runs the registry')
self.config = parser.parse_args() self.config = parser.parse_args()
if not self.config.private: if self.config.private:
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
else:
logging.warning('You have declared no private address' logging.warning('You have declared no private address'
', either this is the first start, or you should' ', either this is the first start, or you should'
'check you configuration') 'check you configuration')
# Database initializing # Database initializing
self.db = sqlite3.connect(self.config.db, isolation_level=None) self.db = sqlite3.connect(self.config.db, isolation_level=None)
self.db.execute("""CREATE TABLE IF NOT EXISTS peers (
prefix text primary key not null,
address text not null,
date integer default (strftime('%s','now')))""")
self.db.execute("CREATE INDEX IF NOT EXISTS peers_ping ON peers(date)")
self.db.execute("""CREATE TABLE IF NOT EXISTS token ( self.db.execute("""CREATE TABLE IF NOT EXISTS token (
token text primary key not null, token text primary key not null,
email text not null, email text not null,
...@@ -196,40 +193,35 @@ class main(object): ...@@ -196,40 +193,35 @@ class main(object):
return crypto.dump_certificate(crypto.FILETYPE_PEM, self.ca) return crypto.dump_certificate(crypto.FILETYPE_PEM, self.ca)
def getPrivateAddress(self, handler): def getPrivateAddress(self, handler):
return 'http://[%s]:%u' % (self.config.private, self.config.port) return self.config.private
def getBootstrapPeer(self, handler, client_prefix): def getBootstrapPeer(self, handler, client_prefix):
cert, = self.db.execute("SELECT cert FROM cert WHERE prefix = ?", cert, = self.db.execute("SELECT cert FROM cert WHERE prefix = ?",
(client_prefix,)).next() (client_prefix,)).next()
try: address = self.config.private, tunnel.PORT
prefix, address = self.db.execute("""SELECT prefix, address FROM peers self.sock.sendto('\2', address)
WHERE prefix != ? ORDER BY random() LIMIT 1""", (client_prefix,)).next() peer = None
except StopIteration: while select.select([self.sock], [], [], peer is None)[0]:
logging.info('No peer to send for bootstrap') msg = self.sock.recv(1<<16)
raise if msg[0] == '\1':
try:
peer = msg[1:].split('\n')[-2]
except IndexError:
peer = ''
if peer is None:
raise EnvironmentError("Timeout while querying [%s]:%u", *address)
if not peer or peer.split()[0] == client_prefix:
raise LookupError("No bootstrap peer found")
logging.info("Sending bootstrap peer: %s", peer)
r, w = os.pipe() r, w = os.pipe()
try: try:
threading.Thread(target=os.write, args=(w, cert)).start() threading.Thread(target=os.write, args=(w, cert)).start()
p = subprocess.Popen(('openssl', 'rsautl', '-encrypt', '-certin', '-inkey', '/proc/self/fd/%u' % r), p = subprocess.Popen(('openssl', 'rsautl', '-encrypt', '-certin', '-inkey', '/proc/self/fd/%u' % r),
stdin=subprocess.PIPE, stdout=subprocess.PIPE) stdin=subprocess.PIPE, stdout=subprocess.PIPE)
logging.info("Sending bootstrap peer (%s, %s)" % (prefix, address)) return xmlrpclib.Binary(p.communicate(peer)[0])
return xmlrpclib.Binary(p.communicate('%s %s' % (prefix, address))[0])
finally: finally:
os.close(r) os.close(r)
os.close(w) os.close(w)
def declare(self, handler, address):
client_address, _, _, _ = handler.client_address
client_ip = utils.binFromIp(client_address)
if client_ip.startswith(self.network):
prefix = client_ip[len(self.network):]
prefix, = self.db.execute("SELECT prefix FROM cert WHERE prefix <= ? ORDER BY prefix DESC LIMIT 1", (prefix,)).next()
self.db.execute("INSERT OR REPLACE INTO peers (prefix, address) VALUES (?,?)", (prefix, address))
return True
else:
logging.warning("Unauthorized connection from %s which does not start with %s"
% (utils.ipFromBin(client_ip), utils.ipFromBin(self.network.ljust(128, '0'))))
return False
if __name__ == "__main__": if __name__ == "__main__":
main() main()
import logging, sqlite3, socket, subprocess, xmlrpclib, time import logging, sqlite3, socket, subprocess, xmlrpclib, time
from urllib import splittype, splithost, splitport
import utils import utils
# used = 0 : fresh node
# used = 1 : previously used peer
# used = 2 : curently in use
class PeerDB(object):
class PeerManager:
# internal ip = temp arg/attribute # internal ip = temp arg/attribute
def __init__(self, db_path, registry, key_path, refresh_time, address, def __init__(self, db_path, registry, key_path, prefix, db_size=200):
internal_ip, prefix, ip_changed, db_size):
self._refresh_time = refresh_time
self.address = address
self._internal_ip = internal_ip
self._prefix = prefix self._prefix = prefix
self.db_size = db_size self._db_size = db_size
self._registry = registry
self._key_path = key_path self._key_path = key_path
self._ip_changed = ip_changed self._proxy = xmlrpclib.ServerProxy(registry)
self.tunnel_manager = None
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
self.sock.bind(('::', 326))
self.socket_file = self.sock.makefile()
logging.info('Connecting to peers database...') logging.info('Initialize cache ...')
self._db = sqlite3.connect(db_path, isolation_level=None) self._db = sqlite3.connect(db_path, isolation_level=None)
logging.debug('Database opened') q = self._db.execute
q("PRAGMA synchronous = OFF")
logging.info('Preparing peers database...') q("PRAGMA journal_mode = MEMORY")
self._db.execute("""CREATE TABLE IF NOT EXISTS peers ( q("""CREATE TABLE IF NOT EXISTS peer (
prefix TEXT PRIMARY KEY, prefix TEXT PRIMARY KEY,
address TEXT NOT NULL, address TEXT NOT NULL)""")
used INTEGER NOT NULL DEFAULT 0, q("""CREATE TABLE IF NOT EXISTS config (
date INTEGER DEFAULT (strftime('%s', 'now')))""") name text primary key,
self._db.execute("UPDATE peers SET used = 1 WHERE used = 2") value text)""")
self._db.execute("""CREATE INDEX IF NOT EXISTS q('ATTACH DATABASE ":memory:" AS volatile')
_peers_used ON peers(used)""") q("""CREATE TABLE volatile.stat (
self._db.execute("""CREATE TABLE IF NOT EXISTS config ( peer TEXT PRIMARY KEY REFERENCES peer(prefix) ON DELETE CASCADE,
name text primary key, try INTEGER NOT NULL DEFAULT 0)""")
value text)""") q("CREATE INDEX volatile.stat_try ON stat(try)")
self._db.execute('ATTACH DATABASE ":memory:" AS blacklist') q("INSERT INTO volatile.stat (peer) SELECT prefix FROM peer")
self._db.execute("""CREATE TABLE blacklist.flag (
prefix TEXT PRIMARY KEY,
flag INTEGER NOT NULL)""")
self._db.execute("""CREATE INDEX blacklist.blacklist_flag
ON flag(flag)""")
self._db.execute("INSERT INTO blacklist.flag VALUES (?,?)", (prefix, 1))
try: try:
a, = self._db.execute("SELECT value FROM config WHERE name='registry'").next() a = q("SELECT value FROM config WHERE name='registry'").next()[0]
except StopIteration: except StopIteration:
proxy = xmlrpclib.ServerProxy(registry) logging.info("Private IP of registry not in cache."
" Asking registry via its public IP ...")
retry = 1 retry = 1
while True: while True:
try: try:
a = proxy.getPrivateAddress() a = self._proxy.getPrivateAddress()
break break
except socket.error, e: except socket.error, e:
logging.warning(e) logging.warning(e)
time.sleep(retry) time.sleep(retry)
retry = min(60, retry * 2) retry = min(60, retry * 2)
self._db.execute("INSERT INTO config VALUES ('registry',?)", (a,)) q("INSERT INTO config VALUES ('registry',?)", (a,))
self._proxy = xmlrpclib.ServerProxy(a) self.registry_ip = utils.binFromIp(a)
logging.debug('Database prepared') logging.info("Cache initialized. Registry IP is %s", a)
self.next_refresh = time.time() def log(self):
if logging.getLogger().isEnabledFor(5):
def clear_blacklist(self, flag): logging.trace("Cache:")
logging.info('Clearing blacklist from flag %u' % flag) for prefix, address, _try in self._db.execute(
self._db.execute("DELETE FROM blacklist.flag WHERE flag = ?", "SELECT peer.*, try FROM peer, volatile.stat"
(flag,)) " WHERE prefix=peer ORDER BY prefix"):
logging.info('Blacklist cleared') logging.trace("- %s: %s%s", prefix, address,
' (blacklisted)' if _try else '')
def blacklist(self, prefix, flag):
logging.info('Blacklisting %s' % prefix) def connecting(self, prefix, connecting):
self._db.execute("DELETE FROM peers WHERE prefix = ?", (prefix,)) self._db.execute("UPDATE volatile.stat SET try=? WHERE peer=?",
self._db.execute("INSERT OR REPLACE INTO blacklist.flag VALUES (?,?)", (connecting, prefix))
(prefix, flag))
logging.debug('%s blacklisted' % prefix) def resetConnecting(self):
self._db.execute("UPDATE volatile.stat SET try=0")
def whitelist(self, prefix):
logging.info('Unblacklisting %s' % prefix) def getAddress(self, prefix):
self._db.execute("DELETE FROM blacklist.flag WHERE prefix = ?", (prefix,)) r = self._db.execute("SELECT address FROM peer, volatile.stat"
logging.debug('%s whitelisted' % prefix) " WHERE prefix=? AND prefix=peer AND try=0",
(prefix,)).fetchone()
def refresh(self): return r and r[0]
logging.info('Refreshing the peers DB...')
try: def getPeerList(self, failed=0):
self.next_refresh = time.time() + 30 # Exclude our own address from results in case it is there, which may
self._declare() # happen if a node change its certificate without clearing the cache.
except socket.error, e: # IOW, one should probably always put our own address there.
logging.info('Connection to server failed, re-bootstraping and retrying in 30s') return self._db.execute(
try: "SELECT prefix, address FROM peer, volatile.stat"
self._bootstrap() " WHERE prefix=peer AND prefix!=? AND try=? ORDER BY RANDOM()",
except socket.error, e: (self._prefix, failed))
logging.debug('socket.error : %s' % e)
def getBootstrapPeer(self):
def _declare(self):
if self.address != None:
logging.info('Sending connection info to server...')
self._proxy.declare(utils.address_str(self.address))
self.next_refresh = time.time() + self._refresh_time
logging.debug('Info sent')
else:
logging.warning("Could not send ip, unknown external config. retrying in 30s")
def getUnusedPeers(self, peer_count):
for populate in self._bootstrap, bool:
peer_list = self._db.execute("""SELECT prefix, address FROM peers WHERE used
<> 2 ORDER BY used ASC, RANDOM() LIMIT ?""",
(peer_count,)).fetchall()
if peer_list:
return peer_list
populate()
logging.warning('Can not find any new peer')
return []
def _bootstrap(self):
logging.info('Getting Boot peer...') logging.info('Getting Boot peer...')
proxy = xmlrpclib.ServerProxy(self._registry)
try: try:
bootpeer = proxy.getBootstrapPeer(self._prefix).data bootpeer = self._proxy.getBootstrapPeer(self._prefix).data
logging.debug('Boot peer received from server') except (socket.error, xmlrpclib.Fault), e:
logging.warning('Failed to bootstrap (%s)', e)
else:
p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path), p = subprocess.Popen(('openssl', 'rsautl', '-decrypt', '-inkey', self._key_path),
stdin=subprocess.PIPE, stdout=subprocess.PIPE) stdin=subprocess.PIPE, stdout=subprocess.PIPE)
bootpeer = p.communicate(bootpeer)[0].split() bootpeer = p.communicate(bootpeer)[0].split()
return self._addPeer(bootpeer) if bootpeer[0] != self._prefix:
except socket.error: self.addPeer(*bootpeer)
pass return bootpeer
except sqlite3.IntegrityError, e: logging.warning('Buggy registry sent us our own address')
if e.args[0] != 'column prefix is not unique':
raise def addPeer(self, prefix, address):
except Exception, e: logging.debug('Adding peer %s: %s', prefix, address)
logging.info('Unable to bootstrap : %s' % e) with self._db:
return False q = self._db.execute
try:
def usePeer(self, prefix): (a,), = q("SELECT address FROM peer WHERE prefix=?", (prefix,))
logging.trace('Updating peers database : using peer %s' % prefix) except ValueError:
self._db.execute("UPDATE peers SET used = 2 WHERE prefix = ?", q("DELETE FROM peer WHERE prefix IN (SELECT peer"
(prefix,)) " FROM volatile.stat ORDER BY try, RANDOM() LIMIT ?,-1)",
logging.debug('DB updated') (self._db_size,))
a = None
def unusePeer(self, prefix): if a != address:
logging.trace('Updating peers database : unusing peer %s' % prefix) q("INSERT OR REPLACE INTO peer VALUES (?,?)", (prefix, address))
self._db.execute("UPDATE peers SET used = 1 WHERE prefix = ?", q("INSERT OR REPLACE INTO volatile.stat VALUES (?,0)", (prefix,))
(prefix,))
logging.debug('DB updated')
def handle_message(self, msg):
script_type, arg = msg.split()
if script_type == 'client-connect':
logging.info('Incoming connection from %s' % (arg,))
prefix = utils.binFromSubnet(arg)
if self.tunnel_manager.checkIncomingTunnel(prefix):
self.blacklist(prefix, 2)
elif script_type == 'client-disconnect':
self.whitelist(utils.binFromSubnet(arg))
logging.info('%s has disconnected' % (arg,))
elif script_type == 'route-up':
if self._ip_changed:
address = self._ip_changed(arg)
if self.address != address:
self.address = address
logging.info('Received new external ip : %s', arg)
try:
self._declare()
except socket.error, e:
logging.info("Connection to server failed while"
" declaring external infos (%s)", e)
else:
logging.info("Unknown message received from the openvpn pipe: %s",
msg)
def readSocket(self):
msg = self.socket_file.readline()
peer = msg.replace('\n', '').split(' ')
if len(peer) != 2:
logging.debug('Invalid package recieved : %s' % msg)
return
self._addPeer(peer)
def _addPeer(self, peer):
logging.debug('Adding peer %s' % peer)
if int(self._db.execute("""SELECT COUNT(*) FROM blacklist.flag WHERE prefix = ?""", (peer[0],)).next()[0]) > 0:
logging.info('Peer is blacklisted')
return False
self._db.execute("""DELETE FROM peers WHERE used <> 2 ORDER BY used DESC, date DESC
LIMIT MAX(0, (SELECT COUNT(*) FROM peers
WHERE used <> 2) - ?)""", (str(self.db_size),))
self._db.execute("UPDATE peers SET address = ?, used = 0, date = strftime('%s','now') WHERE used = 1 and prefix = ?", (peer[1], peer[0],))
self._db.execute("INSERT OR IGNORE INTO peers (prefix, address) VALUES (?,?)", peer)
logging.debug('Peer added')
return True
...@@ -5,4 +5,4 @@ if os.environ['script_type'] == 'up': ...@@ -5,4 +5,4 @@ if os.environ['script_type'] == 'up':
os.execlp('ip', 'ip', 'link', 'set', os.environ['dev'], 'up') os.execlp('ip', 'ip', 'link', 'set', os.environ['dev'], 'up')
# Write into pipe external ip address received # Write into pipe external ip address received
os.write(int(sys.argv[1]), '%(script_type)s %(OPENVPN_external_ip)s\n' % os.environ) os.write(int(sys.argv[1]), '%(script_type)s %(common_name)s %(OPENVPN_external_ip)s\n' % os.environ)
...@@ -74,7 +74,7 @@ def router(network, subnet, subnet_size, interface_list, ...@@ -74,7 +74,7 @@ def router(network, subnet, subnet_size, interface_list,
'-C', 'out local ip %s/%s le %s' % (subnet, subnet_size, subnet_size), '-C', 'out local ip %s/%s le %s' % (subnet, subnet_size, subnet_size),
'-C', 'out local deny', '-C', 'out local deny',
# Route VIFIB ip adresses # Route VIFIB ip adresses
'-C', 'in ip %s::/%u' % (utils.ipFromBin(network), len(network)), '-C', 'in ip %s/%u' % (utils.ipFromBin(network), len(network)),
# Route only addresse in the 'local' network, # Route only addresse in the 'local' network,
# or other entire networks # or other entire networks
#'-C', 'in ip %s' % (config.internal_ip), #'-C', 'in ip %s' % (config.internal_ip),
......
import os, traceback, time, subprocess, logging import logging, random, socket, time
import socket from itertools import chain
import random from . import plib, utils
import plib
import utils
# Be carfull the refresh interval should let the routes be established PORT = 326
RTF_CACHE = 0x01000000 # cache entry
# Be careful the refresh interval should let the routes be established
class Connection: class Connection:
def __init__(self, address, write_pipe, hello, iface, prefix, encrypt, def __init__(self, address, write_pipe, hello, iface, prefix, encrypt,
ovpn_args): ovpn_args):
self.process = plib.client(iface, address, write_pipe, hello, encrypt, self.process = plib.client(iface, address, write_pipe, hello, encrypt,
*ovpn_args) '--tls-remote', '%u/%u' % (int(prefix, 2), len(prefix)),
'--connect-retry-max', '3', *ovpn_args)
self.iface = iface self.iface = iface
self.routes = 0 self.routes = 0
self._prefix = prefix self._prefix = prefix
...@@ -26,25 +27,31 @@ class Connection: ...@@ -26,25 +27,31 @@ class Connection:
return True return True
class TunnelManager: class TunnelManager(object):
def __init__(self, write_pipe, peer_db, openvpn_args, hello_interval, def __init__(self, write_pipe, peer_db, openvpn_args, hello_interval,
refresh, connection_count, iface_list, network, prefix, nSend, refresh, connection_count, iface_list, network, prefix,
encrypt): address, ip_changed, encrypt):
self._write_pipe = write_pipe self._write_pipe = write_pipe
self._peer_db = peer_db self._peer_db = peer_db
self._connecting = set()
self._connection_dict = {} self._connection_dict = {}
self._disconnected = None
self._distant_peers = []
self._iface_to_prefix = {} self._iface_to_prefix = {}
self._ovpn_args = openvpn_args self._ovpn_args = openvpn_args
self._hello = hello_interval self._hello = hello_interval
self._refresh_time = refresh self._refresh_time = refresh
self._network = network self._network = network
self._net_len = len(network)
self._iface_list = iface_list self._iface_list = iface_list
self._prefix = prefix self._prefix = prefix
self._nSend = nSend self._address = utils.address_str(address)
self._ip_changed = ip_changed
self._encrypt = encrypt self._encrypt = encrypt
self._fast_start_done = False self._served = set()
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
self.sock.bind(('::', PORT))
self.next_refresh = time.time() self.next_refresh = time.time()
self._next_tunnel_refresh = time.time() self._next_tunnel_refresh = time.time()
...@@ -57,11 +64,13 @@ class TunnelManager: ...@@ -57,11 +64,13 @@ class TunnelManager:
def refresh(self): def refresh(self):
logging.debug('Checking tunnels...') logging.debug('Checking tunnels...')
self._cleanDeads() self._cleanDeads()
if self._next_tunnel_refresh < time.time(): remove = self._next_tunnel_refresh < time.time()
if remove:
self._countRoutes() self._countRoutes()
self._removeSomeTunnels() self._removeSomeTunnels()
self._next_tunnel_refresh = time.time() + self._refresh_time self._next_tunnel_refresh = time.time() + self._refresh_time
self._makeNewTunnels() self._peer_db.log()
self._makeNewTunnels(remove)
self.next_refresh = time.time() + 5 self.next_refresh = time.time() + 5
def _cleanDeads(self): def _cleanDeads(self):
...@@ -87,92 +96,201 @@ class TunnelManager: ...@@ -87,92 +96,201 @@ class TunnelManager:
# If the process is already exited # If the process is already exited
pass pass
self.free_interface_set.add(connection.iface) self.free_interface_set.add(connection.iface)
self._peer_db.unusePeer(prefix)
del self._iface_to_prefix[connection.iface] del self._iface_to_prefix[connection.iface]
logging.trace('Connection with %s/%u killed' logging.trace('Connection with %s/%u killed'
% (hex(int(prefix, 2))[2:], len(prefix))) % (hex(int(prefix, 2))[2:], len(prefix)))
def _makeNewTunnels(self): def _makeTunnel(self, prefix, address):
tunnel_to_make = self._client_count - len(self._connection_dict) assert len(self._connection_dict) < self._client_count, (prefix, self.__dict__)
if tunnel_to_make <= 0: if prefix in self._served or prefix in self._connection_dict:
return False
assert prefix != self._prefix, self.__dict__
logging.info('Establishing a connection with %s/%u',
hex(int(prefix, 2))[2:], len(prefix))
iface = self.free_interface_set.pop()
self._connection_dict[prefix] = Connection(address, self._write_pipe,
self._hello, iface, prefix, self._encrypt, self._ovpn_args)
self._iface_to_prefix[iface] = prefix
self._peer_db.connecting(prefix, 1)
return True
def _makeNewTunnels(self, route_counted):
count = self._client_count - len(self._connection_dict)
if not count:
return return
i = 0 assert count >= 0
logging.trace('Trying to make %i new tunnels...' % tunnel_to_make) # CAVEAT: Forget any peer that didn't reply to our previous address
try: # request, either because latency is too high or some packet
for prefix, address in self._peer_db.getUnusedPeers(tunnel_to_make): # was lost. However, this means that some time should pass
logging.info('Establishing a connection with %s/%u' % # before calling _makeNewTunnels again.
(hex(int(prefix, 2))[2:], len(prefix))) self._connecting.clear()
iface = self.free_interface_set.pop() distant_peers = self._distant_peers
self._connection_dict[prefix] = Connection(address, if len(distant_peers) < count and not route_counted:
self._write_pipe, self._hello, iface, self._countRoutes()
prefix, self._encrypt, self._ovpn_args) disconnected = self._disconnected
self._iface_to_prefix[iface] = prefix if disconnected is not None:
self._peer_db.usePeer(prefix) # We aren't the registry node and we have no tunnel to or from it,
i += 1 # so it looks like we are not connected to the network, and our
logging.trace('%u new tunnels established' % (i,)) # neighbours are in the same situation.
except KeyError: self._disconnected = None
logging.warning("""Can't establish connection with %s disconnected = set(disconnected).union(distant_peers)
: no available interface""" % prefix) if disconnected:
except Exception: # We do have neighbours that are probably also disconnected,
traceback.print_exc() # so force rebootstrapping.
peer = self._peer_db.getBootstrapPeer()
if not peer:
# Registry dead ? Assume we're connected after all.
disconnected = None
elif peer[0] not in disconnected:
# Got a node that will probably help us rejoining the
# network, so connect to it.
count -= self._makeTunnel(*peer)
if disconnected is None:
# Normal operation. Choose peers to connect to by looking at the
# routing table.
while count and distant_peers:
i = random.randrange(0, len(distant_peers))
peer = distant_peers[i]
distant_peers[i] = distant_peers[-1]
del distant_peers[-1]
address = self._peer_db.getAddress(peer)
if address:
count -= self._makeTunnel(peer, address)
else:
ip = utils.ipFromBin(self._network + peer)
try:
self.sock.sendto('\2', (ip, PORT))
except socket.error, e:
logging.info('Failed to query %s (%s)', ip, e)
self._connecting.add(peer)
count -= 1
elif count:
# No route/tunnel to registry, which usually happens when starting
# up. Select peers from cache for which we have no route.
for peer, address in self._peer_db.getPeerList():
if peer not in disconnected and self._makeTunnel(peer, address):
count -= 1
if not count:
break
else:
if not (disconnected or self._served or self._connection_dict):
# Startup without any good address in the cache.
peer = self._peer_db.getBootstrapPeer()
if not (peer and self._makeTunnel(*peer)):
# Failed to bootstrap ! Last change to connect is to
# retry an address that already failed :(
for peer in self._peer_db.getPeerList(1):
if self._makeTunnel(*peer):
break
def _countRoutes(self): def _countRoutes(self):
logging.debug('Starting to count the routes on each interface...') logging.debug('Starting to count the routes on each interface...')
self._peer_db.clear_blacklist(0) del self._distant_peers[:]
possiblePeers = set() for conn in self._connection_dict.itervalues():
for iface in self._iface_to_prefix.keys(): conn.routes = 0
self._connection_dict[self._iface_to_prefix[iface]].routes = 0 a = len(self._network)
for line in open('/proc/net/ipv6_route'): b = a + len(self._prefix)
line = line.split() other = []
ip = bin(int(line[0], 16))[2:].rjust(128, '0') with open('/proc/net/ipv6_route') as f:
self._last_routing_table = f.read()
if (ip.startswith(self._network) and for line in self._last_routing_table.splitlines():
not ip.startswith(self._network + self._prefix)): line = line.split()
iface = line[-1] iface = line[-1]
subnet_size = int(line[1], 16) if iface == 'lo' or int(line[-2], 16) & RTF_CACHE:
logging.trace('Route on iface %s detected to %s/%s' continue
% (iface, line[0], subnet_size)) ip = bin(int(line[0], 16))[2:].rjust(128, '0')
if iface in self._iface_to_prefix.keys(): if ip[:a] != self._network or ip[a:b] == self._prefix:
self._connection_dict[self._iface_to_prefix[iface]].routes += 1 continue
if iface in self._iface_list and self._net_len < subnet_size < 128: prefix_len = int(line[1], 16)
prefix = ip[self._net_len:subnet_size] prefix = ip[a:prefix_len]
logging.debug('A route to %s has been discovered on the LAN' logging.trace('Route on iface %s detected to %s/%u',
% hex(int(prefix, 2))[2:]) iface, utils.ipFromBin(ip), prefix_len)
self._peer_db.blacklist(prefix, 0) nexthop = self._iface_to_prefix.get(iface)
possiblePeers.add(line[0]) if nexthop:
self._connection_dict[nexthop].routes += 1
if not self._fast_start_done and len(possiblePeers) > 4: if prefix in self._served or prefix in self._connection_dict:
nSend = min(self._peer_db.db_size, len(possiblePeers)) continue
if iface in self._iface_list:
other.append(prefix)
else:
self._distant_peers.append(prefix)
is_registry = self._peer_db.registry_ip[a:].startswith
if is_registry(self._prefix) or any(is_registry(peer)
for peer in chain(self._distant_peers, other,
self._served, self._connection_dict)):
self._disconnected = None
# XXX: When there is no new peer to connect when looking at routes
# coming from tunnels, we'd like to consider those discovered
# from the LAN. However, we don't want to create tunnels to
# nodes of the LAN so do nothing until we find a way to get
# some information from Babel.
#if not self._distant_peers:
# self._distant_peers = other
else: else:
nSend = min(2, len(possiblePeers)) self._disconnected = other
for ip in random.sample(possiblePeers, nSend): logging.debug("Routes counted: %u distant peers",
self._notifyPeer(ip) len(self._distant_peers))
for c in self._connection_dict.itervalues():
logging.debug("Routes have been counted") logging.trace('- %s: %s', c.iface, c.routes)
for p in self._connection_dict.keys():
logging.trace('Routes on iface %s : %s' % (
self._connection_dict[p].iface,
self._connection_dict[p].routes))
def killAll(self): def killAll(self):
for prefix in self._connection_dict.keys(): for prefix in self._connection_dict.keys():
self._kill(prefix, True) self._kill(prefix, True)
def checkIncomingTunnel(self, prefix): def handleTunnelEvent(self, msg):
if prefix in self._connection_dict:
if prefix < self._prefix:
return False
else:
self._kill(prefix)
return True
def _notifyPeer(self, peerIp):
try: try:
if self._peer_db.address: script_type, arg = msg.split(None, 1)
ip = '%s:%s:%s:%s:%s:%s:%s:%s' % (peerIp[0:4], peerIp[4:8], peerIp[8:12], m = getattr(self, '_ovpn_' + script_type.replace('-', '_'))
peerIp[12:16], peerIp[16:20], peerIp[20:24], peerIp[24:28], peerIp[28:32]) except (AttributeError, ValueError):
logging.trace('Notifying peer %s' % ip) logging.warning("Unknown message received from OpenVPN: %s", msg)
self._peer_db.sock.sendto('%s %s\n' % (self._prefix, utils.address_str(self._peer_db.address)), (ip, 326)) else:
except socket.error, e: logging.debug('%s: %s', script_type, arg)
logging.debug('Unable to notify %s' % ip) m(arg)
logging.debug('socket.error : %s' % e)
def _ovpn_client_connect(self, common_name):
prefix = utils.binFromSubnet(common_name)
self._served.add(prefix)
if prefix in self._connection_dict and self._prefix < prefix:
self._kill(prefix)
self._peer_db.connecting(prefix, 0)
def _ovpn_client_disconnect(self, common_name):
prefix = utils.binFromSubnet(common_name)
self._served.remove(prefix)
def _ovpn_route_up(self, arg):
common_name, ip = arg.split()
self._peer_db.connecting(utils.binFromSubnet(common_name), 0)
if self._ip_changed:
self._address = utils.address_str(self._ip_changed(ip))
def handlePeerEvent(self):
msg, address = self.sock.recvfrom(1<<16)
code = ord(msg[0])
if code == 1: # answer
# TODO: do not fail if message contains garbage
# We parse the message in a way to discard a truncated line.
for peer in msg[1:].split('\n')[:-1]:
prefix, address = peer.split()
if prefix != self._prefix:
self._peer_db.addPeer(prefix, address)
try:
self._connecting.remove(prefix)
except KeyError:
continue
self._makeTunnel(prefix, address)
elif code == 2: # request
encode = '%s %s\n'.__mod__
if self._address:
msg = [encode((self._prefix, self._address))]
else: # I don't know my IP yet!
msg = []
# Add an extra random peer, mainly for the registry.
for peer in self._peer_db.getPeerList():
msg.append(encode(peer))
break
if msg:
try:
self.sock.sendto('\1' + ''.join(msg), address)
except socket.error, e:
logging.info('Failed to reply to %s (%s)', address, e)
...@@ -4,10 +4,13 @@ from OpenSSL import crypto ...@@ -4,10 +4,13 @@ from OpenSSL import crypto
logging_levels = logging.WARNING, logging.INFO, logging.DEBUG, 5 logging_levels = logging.WARNING, logging.INFO, logging.DEBUG, 5
def setupLog(log_level): def setupLog(log_level, **kw):
logging.basicConfig(level=logging_levels[log_level], if log_level:
logging.basicConfig(level=logging_levels[log_level-1],
format='%(asctime)s %(levelname)-9s %(message)s', format='%(asctime)s %(levelname)-9s %(message)s',
datefmt='%d-%m-%Y %H:%M:%S') datefmt='%d-%m-%Y %H:%M:%S', **kw)
else:
logging.disable(logging.CRITICAL)
logging.addLevelName(5, 'TRACE') logging.addLevelName(5, 'TRACE')
logging.trace = lambda *args, **kw: logging.log(5, *args, **kw) logging.trace = lambda *args, **kw: logging.log(5, *args, **kw)
...@@ -28,19 +31,10 @@ def binFromIp(ip): ...@@ -28,19 +31,10 @@ def binFromIp(ip):
return bin(ip1)[2:].rjust(64, '0') + bin(ip2)[2:].rjust(64, '0') return bin(ip1)[2:].rjust(64, '0') + bin(ip2)[2:].rjust(64, '0')
def ipFromBin(prefix): def ipFromBin(prefix, suffix=''):
prefix = hex(int(prefix, 2))[2:] ip = prefix + suffix.rjust(128 - len(prefix), '0')
ip = '' return socket.inet_ntop(socket.AF_INET6,
for i in xrange(0, len(prefix) - 1, 4): struct.pack('>QQ', int(ip[:64], 2), int(ip[64:], 2)))
ip += prefix[i:i + 4] + ':'
return ip.rstrip(':')
def ipFromPrefix(re6stnet, prefix, prefix_len):
prefix = bin(int(prefix))[2:].rjust(prefix_len, '0')
ip_t = (re6stnet + prefix).ljust(127, '0').ljust(128, '1')
return ipFromBin(ip_t), prefix
def networkFromCa(ca_path): def networkFromCa(ca_path):
# Get network prefix from ca.crt # Get network prefix from ca.crt
...@@ -48,15 +42,14 @@ def networkFromCa(ca_path): ...@@ -48,15 +42,14 @@ def networkFromCa(ca_path):
ca = crypto.load_certificate(crypto.FILETYPE_PEM, f.read()) ca = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
return bin(ca.get_serial_number())[3:] return bin(ca.get_serial_number())[3:]
def ipFromCert(network, cert_path): def ipFromCert(network, cert_path):
# Get ip from cert.crt # Get ip from cert.crt
with open(cert_path, 'r') as f: with open(cert_path, 'r') as f:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read()) cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
subject = cert.get_subject() subject = cert.get_subject()
prefix, prefix_len = subject.CN.split('/') prefix, prefix_len = subject.CN.split('/')
return ipFromPrefix(network, prefix, int(prefix_len)) prefix = bin(int(prefix))[2:].rjust(int(prefix_len), '0')
return ipFromBin(network + prefix, '1'), prefix
def address_str(address): def address_str(address):
return ';'.join(map(','.join, address)) return ';'.join(map(','.join, address))
...@@ -68,7 +61,5 @@ def address_list(address_list): ...@@ -68,7 +61,5 @@ def address_list(address_list):
def binFromSubnet(subnet): def binFromSubnet(subnet):
prefix, subnet_size = subnet.split('/') p, l = subnet.split('/')
binary = bin(int(prefix))[2:] return bin(int(p))[2:].rjust(int(l), '0')
binary = ('0' * (int(subnet_size) - len(binary))) + binary
return binary
#!/usr/bin/env python #!/usr/bin/env python
import atexit, os, sys, select, socket, time import atexit, os, sys, select, time
import argparse, signal, subprocess, sqlite3, logging, traceback import argparse, signal, subprocess, sqlite3, logging, traceback
from re6st import plib, utils, db, tunnel from re6st import plib, utils, db, tunnel
...@@ -27,13 +27,11 @@ def getConfig(): ...@@ -27,13 +27,11 @@ def getConfig():
_('--registry', required=True, _('--registry', required=True,
help="HTTP URL of the discovery peer server," help="HTTP URL of the discovery peer server,"
" with public host (default port: 80)") " with public host (default port: 80)")
_('--peers-db-refresh', default=43200, type=int,
help='the time (seconds) to wait before refreshing the peers db')
_('-l', '--log', default='/var/log', _('-l', '--log', default='/var/log',
help='Path to re6stnet logs directory') help='Path to re6stnet logs directory')
_('-s', '--state', default='/var/lib/re6stnet', _('-s', '--state', default='/var/lib/re6stnet',
help='Path to re6stnet state directory') help='Path to re6stnet state directory')
_('-v', '--verbose', default=0, type=int, _('-v', '--verbose', default=1, type=int,
help='Log level of re6st itself') help='Log level of re6st itself')
_('-i', '--interface', action='append', dest='iface_list', default=[], _('-i', '--interface', action='append', dest='iface_list', default=[],
help='Extra interface for LAN discovery') help='Extra interface for LAN discovery')
...@@ -83,7 +81,8 @@ def main(): ...@@ -83,7 +81,8 @@ def main():
db_path = os.path.join(config.state, 'peers.db') db_path = os.path.join(config.state, 'peers.db')
# Set logging # Set logging
utils.setupLog(config.verbose) utils.setupLog(config.verbose,
filename=os.path.join(config.log, 're6stnet.log'))
logging.trace("Configuration :\n%s" % config) logging.trace("Configuration :\n%s" % config)
...@@ -124,13 +123,11 @@ def main(): ...@@ -124,13 +123,11 @@ def main():
if address: if address:
ip_changed = None ip_changed = None
peer_db = db.PeerManager(db_path, config.registry, config.key, peer_db = db.PeerDB(db_path, config.registry, config.key, prefix)
config.peers_db_refresh, address, internal_ip, prefix,
ip_changed, 200)
tunnel_manager = tunnel.TunnelManager(write_pipe, peer_db, openvpn_args, tunnel_manager = tunnel.TunnelManager(write_pipe, peer_db, openvpn_args,
config.hello, config.tunnel_refresh, config.connection_count, config.hello, config.tunnel_refresh, config.connection_count,
config.iface_list, network, prefix, 2, config.encrypt) config.iface_list, network, prefix, address, ip_changed,
peer_db.tunnel_manager = tunnel_manager config.encrypt)
# Launch routing protocol. WARNING : you have to be root to start babeld # Launch routing protocol. WARNING : you have to be root to start babeld
server_tunnels = {} server_tunnels = {}
...@@ -156,24 +153,24 @@ def main(): ...@@ -156,24 +153,24 @@ def main():
config.connection_count, config.dh, write_pipe, port, config.connection_count, config.dh, write_pipe, port,
proto, config.hello, config.encrypt, *openvpn_args)) proto, config.hello, config.encrypt, *openvpn_args))
while True: while True:
nextUpdate = min(tunnel_manager.next_refresh, peer_db.next_refresh) next = tunnel_manager.next_refresh
if forwarder: if forwarder:
nextUpdate = min(nextUpdate, forwarder.next_refresh) next = min(next, forwarder.next_refresh)
nextUpdate = max(0, nextUpdate - time.time()) r = [read_pipe, tunnel_manager.sock]
select_list = [read_pipe] r = select.select(r, [], [], max(0, next - time.time()))[0]
if peer_db.socket_file: if read_pipe in r:
select_list.append(peer_db.socket_file) tunnel_manager.handleTunnelEvent(read_pipe.readline())
ready, tmp1, tmp2 = select.select(select_list, [], [], nextUpdate) if tunnel_manager.sock in r:
if read_pipe in ready: tunnel_manager.handlePeerEvent()
peer_db.handle_message(read_pipe.readline()) t = time.time()
if time.time() >= peer_db.next_refresh: if t >= tunnel_manager.next_refresh:
peer_db.refresh()
if time.time() >= tunnel_manager.next_refresh:
tunnel_manager.refresh() tunnel_manager.refresh()
if forwarder and time.time() > forwarder.next_refresh: if forwarder and t >= forwarder.next_refresh:
forwarder.refresh() forwarder.refresh()
if peer_db.socket_file in ready: except Exception:
peer_db.readSocket() f = traceback.format_exception(*sys.exc_info())
logging.error('%s%s', f.pop(), ''.join(f))
raise
finally: finally:
router.terminate() router.terminate()
for p in server_process: for p in server_process:
...@@ -186,7 +183,6 @@ def main(): ...@@ -186,7 +183,6 @@ def main():
except: except:
pass pass
except sqlite3.Error: except sqlite3.Error:
traceback.print_exc()
os.rename(db_path, db_path + '.bak') os.rename(db_path, db_path + '.bak')
try: try:
sys.exitfunc() sys.exitfunc()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment