Commit 66f2c73b authored by Tom Niget's avatar Tom Niget

even more python2 to python3

parent 1b0048c3
......@@ -14,9 +14,8 @@ REGISTRY2_SERIAL = '0x120010db80043'
CA_DAYS = 1000
# Quick check to avoid wasting time if there is an error.
with open(os.devnull, "wb") as f:
for x in 're6stnet', 're6st-conf', 're6st-registry':
subprocess.check_call(('./py', x, '--help'), stdout=f)
for x in 're6stnet', 're6st-conf', 're6st-registry':
subprocess.check_call(('./py', x, '--help'), stdout=subprocess.DEVNULL)
#
# Underlying network:
#
......@@ -205,10 +204,9 @@ for m in machine6, machine7, machine8:
# Test connectivity first. Run process, hide output and check
# return code
null = file(os.devnull, "r+")
for ip in '10.1.1.2', '10.1.1.3', '10.2.1.2', '10.2.1.3':
if machine1.Popen(('ping', '-c1', ip), stdout=null).wait():
print('Failed to ping %s' % ip)
if machine1.Popen(('ping', '-c1', ip), stdout=subprocess.DEVNULL).wait():
print(('Failed to ping %s' % ip))
break
else:
print("Connectivity IPv4 OK!")
......@@ -225,7 +223,7 @@ def new_network(registry, reg_addr, serial, ca):
" -subj /CN=re6st.example.com/emailAddress=re6st@example.com"
" -set_serial %s -days %u"
% (registry.name, ca, serial, CA_DAYS), shell=True)
with open(ca) as f:
with open(ca, "rb") as f:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
fingerprint = "sha256:" + hashlib.sha256(
crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)).hexdigest()
......@@ -368,12 +366,12 @@ def node_by_ll(addr):
if a.startswith('10.42.'):
assert not p % 8
_ll[socket.inet_ntoa(socket.inet_aton(
a)[:p/8].ljust(4, '\0'))] = n, t
a)[:p//8].ljust(4, b'\0'))] = n, t
elif a.startswith('2001:db8:'):
assert not p % 8
a = socket.inet_ntop(socket.AF_INET6,
socket.inet_pton(socket.AF_INET6,
a)[:p/8].ljust(16, '\0'))
a)[:p//8].ljust(16, b'\0'))
elif not a.startswith('fe80::'):
continue
_ll[a] = n, t
......@@ -424,7 +422,7 @@ def route_svg(ipv4, z = 4, default = type('', (), {'short': None})):
gv.append('}\n')
return subprocess.Popen(('neato', '-Tsvg'),
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
).communicate('\n'.join(gv))[0]
).communicate('\n'.join(gv).encode("utf-8"))[0].decode("utf-8")
if args.port:
import http.server, socketserver
......@@ -450,34 +448,34 @@ if args.port:
if page < 2:
body = route_svg(page)
else:
body = registry.Popen(('python', '-c', r"""if 1:
body = registry.Popen(('python3', '-c', r"""if 1:
import math, json
from re6st.registry import RegistryClient
g = json.loads(RegistryClient(
'http://localhost/').topology())
r = set(g.pop('', ()))
a = set()
for v in g.itervalues():
for v in g.values():
a.update(v)
g.update(dict.fromkeys(a.difference(g), ()))
print 'digraph {'
print('digraph {')
a = 2 * math.pi / len(g)
z = 4
m2 = '%u/80' % (2 << 64)
title = lambda n: '2|80' if n == m2 else n
g = sorted((title(k), k in r, v) for k, v in g.iteritems())
g = sorted((title(k), k in r, v) for k, v in g.items())
for i, (n, r, v) in enumerate(g):
print '"%s"[pos="%s,%s!"%s];' % (title(n),
print('"%s"[pos="%s,%s!"%s];' % (title(n),
z * math.cos(a * i), z * math.sin(a * i),
'' if r else ', style=dashed')
'' if r else ', style=dashed'))
for v in v:
print '"%s" -> "%s";' % (n, title(v))
print '}'
"""), stdout=subprocess.PIPE, cwd="..").communicate()[0]
print('"%s" -> "%s";' % (n, title(v)))
print('}')
"""), stdout=subprocess.PIPE, cwd="..").communicate()[0].decode("utf-8")
if body:
body = subprocess.Popen(('neato', '-Tsvg'),
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
).communicate(body)[0]
).communicate(body.encode("utf-8"))[0].decode("utf-8")
if not body:
self.send_error(500)
return
......@@ -508,7 +506,8 @@ if args.port:
for i, x in enumerate(self.pages)),
body[body.find('<svg'):])
self.send_response(200)
self.send_header('Content-Length', len(body))
body = body.encode("utf-8")
self.send_header('Content-Length', str(len(body)))
self.send_header('Content-type', mt + '; charset=utf-8')
self.end_headers()
self.wfile.write(body)
......
......@@ -18,10 +18,10 @@
import re
import os
from new import function
from nemu.iproute import backticks, get_if_data, route, \
get_addr_data, get_all_route_data, interface
from nemu.interface import Switch, Interface
from types import FunctionType
def _get_all_route_data():
ipdata = backticks([IP_PATH, "-o", "route", "list"]) # "table", "all"
......@@ -65,7 +65,7 @@ def __init__(self, *args, **kw):
self.name = self.name.split('@',1)[0]
interface.__init__ = __init__
get_addr_data.orig = function(get_addr_data.__code__,
get_addr_data.orig = FunctionType(get_addr_data.__code__,
get_addr_data.__globals__)
def _get_addr_data():
byidx, bynam = get_addr_data.orig()
......
......@@ -5,7 +5,7 @@ from . import utils, version, x509
class Cache(object):
def __init__(self, db_path, registry, cert, db_size=200):
def __init__(self, db_path, registry, cert: x509.Cert, db_size=200):
self._prefix = cert.prefix
self._db_size = db_size
self._decrypt = cert.decrypt
......@@ -89,8 +89,10 @@ class Cache(object):
logging.info("Getting new network parameters from registry...")
try:
# TODO: When possible, the registry should be queried via the re6st.
network_config = self._registry.getNetworkConfig(self._prefix)
logging.debug('config %r' % network_config) # todo
x = json.loads(zlib.decompress(
self._registry.getNetworkConfig(self._prefix)))
network_config))
base64_list = x.pop('', ())
config = {}
for k, v in x.items():
......
#!/usr/bin/python2
#!/usr/bin/env python3
import argparse, atexit, binascii, errno, hashlib
import os, subprocess, sqlite3, sys, time
from OpenSSL import crypto
......@@ -13,7 +13,8 @@ def create(path, text=None, mode=0o666):
finally:
os.close(fd)
def loadCert(pem):
def loadCert(pem: bytes):
assert pem
return crypto.load_certificate(crypto.FILETYPE_PEM, pem)
def main():
......@@ -140,7 +141,7 @@ def main():
req.set_pubkey(pkey)
req.sign(pkey, 'sha512')
req = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
req = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req).decode("ascii")
# First make sure we can open certificate file for writing,
# to avoid using our token for nothing.
......
#!/usr/bin/python2
#!/usr/bin/env python3
import atexit, errno, logging, os, shutil, signal
import socket, struct, subprocess, sys
from collections import deque
......
#!/usr/bin/python2
#!/usr/bin/env python3
import http.client, logging, os, socket, sys
from http.server import BaseHTTPRequestHandler
from socketserver import ThreadingTCPServer
......@@ -29,13 +29,13 @@ class RequestHandler(BaseHTTPRequestHandler):
path = self.path
query = {}
else:
query = dict(parse_qsl(query, keep_blank_values=1,
strict_parsing=1))
query = dict(parse_qsl(query, keep_blank_values=True,
strict_parsing=True))
_, path = path.split('/')
if not _:
return self.server.handle_request(self, path, query)
except Exception:
logging.info(self.requestline, exc_info=1)
logging.info(self.requestline, exc_info=True)
self.send_error(http.client.BAD_REQUEST)
def log_error(*args):
......
......@@ -34,13 +34,13 @@ class Array(object):
def __init__(self, item):
self._item = item
def encode(self, buffer, value):
def encode(self, buffer: bytes, value: list):
buffer += uint16.pack(len(value))
encode = self._item.encode
for value in value:
encode(buffer, value)
def decode(self, buffer, offset=0):
def decode(self, buffer: bytes, offset=0) -> tuple[int, list]:
r = []
o = offset + 2
decode = self._item.decode
......@@ -52,13 +52,13 @@ class Array(object):
class String(object):
@staticmethod
def encode(buffer, value):
buffer += value + b'\x00'
def encode(buffer: bytes, value: str):
buffer += value.encode("utf-8") + b'\x00'
@staticmethod
def decode(buffer, offset=0):
def decode(buffer: bytes, offset=0) -> tuple[int, str]:
i = buffer.index(0, offset)
return i + 1, buffer[offset:i]
return i + 1, buffer[offset:i].decode("utf-8")
class Buffer(object):
......@@ -195,7 +195,7 @@ class Babel(object):
logging.debug("Can't connect to %r (%r)", self.socket_path, e)
return e
s.send(b'\x01')
s.setblocking(0)
s.setblocking(False)
del self.select
self.socket = s
return self.select(*args)
......
#!/usr/bin/python -S
#!/usr/bin/env -S python3 -S
import os, sys
script_type = os.environ['script_type']
......
#!/usr/bin/python -S
#!/usr/bin/env -S python3 -S
import os, sys
script_type = os.environ['script_type']
......@@ -7,7 +7,7 @@ external_ip = os.getenv('trusted_ip') or os.environ['trusted_ip6']
# Write into pipe connect/disconnect events
fd = int(sys.argv[1])
os.write(fd, repr((script_type, (os.environ['common_name'], os.environ['dev'],
int(os.environ['tls_serial_0']), external_ip))))
int(os.environ['tls_serial_0']), external_ip))).encode("utf-8"))
if script_type == 'client-connect':
if os.read(fd, 1) == b'\x00':
......
import binascii
import logging, errno, os
from typing import Optional
from . import utils
here = os.path.realpath(os.path.dirname(__file__))
ovpn_server = os.path.join(here, 'ovpn-server')
ovpn_client = os.path.join(here, 'ovpn-client')
ovpn_log = None
ovpn_log: Optional[str] = None
def openvpn(iface, encrypt, *args, **kw):
args = ['openvpn',
......@@ -80,9 +83,9 @@ def router(ip, ip4, rt6, hello_interval, log_path, state_path, pidfile,
'-C', 'redistribute local deny',
'-C', 'redistribute ip %s/%s eq %s' % (ip, n, n)]
if hmac_sign:
def key(cmd, id, value):
def key(cmd, id: str, value):
cmd += '-C', ('key type blake2s128 id %s value %s' %
(id, value.encode('hex')))
(id, binascii.hexlify(value)))
key(cmd, 'sign', hmac_sign)
default += ' key sign'
if hmac_accept is not None:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment