Commit 575563a4 authored by Xavier Thompson's avatar Xavier Thompson

slapformat: WIP: Move Interface class down

parent 99e28608
...@@ -288,184 +288,6 @@ class Computer(object): ...@@ -288,184 +288,6 @@ class Computer(object):
pass pass
class Interface(object):
ipv4_interface : str
ipv6_interface : str
ipv4_network : ipaddress.IPv4Network
ipv6_network : ipaddress.IPv6Network
tap_ipv4_network : ipaddress.IPv4Network
conf : FormatConfig
def __init__(self, conf):
self.conf = conf
self.ipv4_interface = conf.interface_name
self.ipv6_interface = conf.ipv6_interface or conf.interface_name
self.ipv4_network = self.getIPv4Network(conf.ipv4_local_network)
self.ipv6_network = self.getIPv6Network()
self.tap_ipv4_network = self.getTapIPv4Network(conf)
def getIPv4Network(self, cidr):
if cidr:
# XXX allow ipv4_local_network to be None ?
return ipaddress.IPv4Network(cidr, strict=False)
def getPartitionIPv4Addr(self, index):
network = self.ipv4_network
return ipaddress.IPv4Interface((network[index + 2], network.prefixlen))
def getTapIPv4Network(self, conf):
tap_gateway_interface = conf.tap_gateway_interface
if conf.create_tap and tap_gateway_interface:
try:
addresses = netifaces.ifaddresses(tap_gateway_interface)[AF_INET]
except ValueError:
self.conf.abort(
"Interface %s (intended as tap gateway interface) does not exist",
tap_gateway_interface
)
except KeyError:
self.conf.abort(
"%s must have at least one IPv4 address assigned",
tap_gateway_interface
)
result = None
for a in addresses:
address = a['addr'].split('%')[0]
netmask = a['netmask'].split('/')[-1]
ip = ipaddress.IPv4Interface('%s/%s' % (address, netmask))
network = ip.network
if not result or network.prefixlen < result.prefixlen:
result = network
return result
def getTapIPv4Gateway(self, _index):
network = self.tap_ipv4_network
if network:
return ipaddress.IPv4Interface((network[1], network.prefixlen))
def getTapIPv4Range(self, index):
network = self.tap_ipv4_network
if network:
prefixlen = conf.tap_ipv4_prefix
i = index + 2 if prefixlen == 32 else index + 1
k = 1 if prefixlen < 31 else 0
addr = network[(i << (32 - prefixlen)) + k]
return ipaddress.IPv4Interface((addr, prefixlen))
def getIPv6Network(self):
try:
addresses = netifaces.ifaddresses(self.ipv6_interface)[AF_INET6]
except KeyError:
self.conf.abort(
"%s must have at least one IPv6 address assigned",
self.ipv6_interface
)
result = None
for a in addresses:
address = a['addr'].split('%')[0]
netmask = a['netmask'].split('/')[-1]
ip = ipaddress.IPv6Interface('%s/%s' % (address, netmask))
network = ip.network
if network.is_global:
if not result or network.prefixlen < result.prefixlen:
result = network
return result
def getIPv6LinkLocalAddress(self, interface):
try:
addresses = netifaces.ifaddresses(interface)[AF_INET6]
except KeyError:
addresses = ()
for a in addresses:
address = a['addr'].split('%')[0]
netmask = a['netmask'].split('/')[-1]
ip = ipaddress.IPv6Interface('%s/%s' % (address, netmask))
if ip.is_link_local:
return ip
self.conf.abort(
"No link-local IPv6 address found on %s",
interface
)
def getComputerIPv6Addr(self):
network = self.ipv6_network
return ipaddress.IPv6Interface((network[1], network.prefixlen))
def getPartitionIPv6Addr(self, index):
network = self.ipv6_network
return ipaddress.IPv6Interface((network[index + 2], network.prefixlen))
def getPartitionIPv6Range(self, index):
network = self.ipv6_network
prefixlen = network.prefixlen + 16
if prefixlen > 128: # XXX move this check elsewhere
self.conf.abort("IPv6 network %s is too small for IPv6 ranges", network)
bits = 128 - network.prefixlen
addr = network[(1 << (bits - 2)) + (index << (128 - prefixlen))]
return ipaddress.IPv6Network((addr, prefixlen))
def getTapIPv6Range(self, index):
network = self.ipv6_network
prefixlen = network.prefixlen + 16
if prefixlen > 128: # XXX move this check elsewhere
self.conf.abort("IPv6 network %s is too small for IPv6 ranges", network)
bits = 128 - network.prefixlen
addr = network[(2 << (bits - 2)) + (index << (128 - prefixlen)) + 1]
return ipaddress.IPv6Interface((addr, prefixlen))
def enableIPv6NonLocalBind(self):
call(['sysctl', 'net.ipv6.ip_nonlocal_bind=1'])
network = str(self.ipv6_network)
_, result = call(['ip', '-6', 'route', 'show', 'table', 'local', network])
if not 'dev lo' in result:
call(['ip', '-6', 'route', 'add', 'local', network, 'dev', 'lo'])
def addAddress(self, ip, reason, interface=None):
interface = interface or getattr(self, 'ipv%d_interface' % ip.version)
af = {4: AF_INET, 6: AF_INET6}[ip.version]
address = str(ip.ip)
netmask = str(ip.network.netmask)
for iface in netifaces.interfaces():
if iface != interface:
ifaddresses = netifaces.ifaddresses(iface).get(af, ())
if address in (q['addr'].split('%')[0] for q in ifaddresses):
self.conf.abort(
"Cannot add address %s (%s) to %s because it already exists on %s",
ip, reason, interface, iface
)
ifaddresses = netifaces.ifaddresses(interface).get(af, ())
for q in ifaddresses:
if address == q['addr'].split('%')[0]:
if netmask not in q['netmask']:
self.conf.abort(
"Address %s (%s) is already on %s but with another netmask %s",
ip, reason, interface, q['netmask']
)
break
else:
call(['ip', 'addr', 'add', str(ip), 'dev', interface])
def checkAddress(self, ip, reason, interface=None):
interface = interface or getattr(self, 'ipv%d_interface' % ip.version)
flag = str(-ip.version)
_, result = call(['ip', flag, 'addr', 'show', interface])
cidr = str(ip)
for line in result.splitlines():
if cidr in line:
for state in ('tentative', 'dadfailed'):
if state in line:
call(['ip', 'addr', 'del', cidr, 'dev', interface])
self.conf.abort(
"Address %s (%s) on %s was in state %s so was removed: %r",
cidr, reason, interface, state, line
)
return
self.conf.abort(
"Address %s (%s) is unexpectedly not present on %s",
cidr, reason, interface
)
class Partition(object): class Partition(object):
reference : str reference : str
index: int index: int
...@@ -698,6 +520,184 @@ class Tap(object): ...@@ -698,6 +520,184 @@ class Tap(object):
call(['ip', '-6', 'route', 'add', network, 'dev', tap, 'via', cidr]) call(['ip', '-6', 'route', 'add', network, 'dev', tap, 'via', cidr])
class Interface(object):
ipv4_interface : str
ipv6_interface : str
ipv4_network : ipaddress.IPv4Network
ipv6_network : ipaddress.IPv6Network
tap_ipv4_network : ipaddress.IPv4Network
conf : FormatConfig
def __init__(self, conf):
self.conf = conf
self.ipv4_interface = conf.interface_name
self.ipv6_interface = conf.ipv6_interface or conf.interface_name
self.ipv4_network = self.getIPv4Network(conf.ipv4_local_network)
self.ipv6_network = self.getIPv6Network()
self.tap_ipv4_network = self.getTapIPv4Network(conf)
def getIPv4Network(self, cidr):
if cidr:
# XXX allow ipv4_local_network to be None ?
return ipaddress.IPv4Network(cidr, strict=False)
def getPartitionIPv4Addr(self, index):
network = self.ipv4_network
return ipaddress.IPv4Interface((network[index + 2], network.prefixlen))
def getTapIPv4Network(self, conf):
tap_gateway_interface = conf.tap_gateway_interface
if conf.create_tap and tap_gateway_interface:
try:
addresses = netifaces.ifaddresses(tap_gateway_interface)[AF_INET]
except ValueError:
self.conf.abort(
"Interface %s (intended as tap gateway interface) does not exist",
tap_gateway_interface
)
except KeyError:
self.conf.abort(
"%s must have at least one IPv4 address assigned",
tap_gateway_interface
)
result = None
for a in addresses:
address = a['addr'].split('%')[0]
netmask = a['netmask'].split('/')[-1]
ip = ipaddress.IPv4Interface('%s/%s' % (address, netmask))
network = ip.network
if not result or network.prefixlen < result.prefixlen:
result = network
return result
def getTapIPv4Gateway(self, _index):
network = self.tap_ipv4_network
if network:
return ipaddress.IPv4Interface((network[1], network.prefixlen))
def getTapIPv4Range(self, index):
network = self.tap_ipv4_network
if network:
prefixlen = conf.tap_ipv4_prefix
i = index + 2 if prefixlen == 32 else index + 1
k = 1 if prefixlen < 31 else 0
addr = network[(i << (32 - prefixlen)) + k]
return ipaddress.IPv4Interface((addr, prefixlen))
def getIPv6Network(self):
try:
addresses = netifaces.ifaddresses(self.ipv6_interface)[AF_INET6]
except KeyError:
self.conf.abort(
"%s must have at least one IPv6 address assigned",
self.ipv6_interface
)
result = None
for a in addresses:
address = a['addr'].split('%')[0]
netmask = a['netmask'].split('/')[-1]
ip = ipaddress.IPv6Interface('%s/%s' % (address, netmask))
network = ip.network
if network.is_global:
if not result or network.prefixlen < result.prefixlen:
result = network
return result
def getIPv6LinkLocalAddress(self, interface):
try:
addresses = netifaces.ifaddresses(interface)[AF_INET6]
except KeyError:
addresses = ()
for a in addresses:
address = a['addr'].split('%')[0]
netmask = a['netmask'].split('/')[-1]
ip = ipaddress.IPv6Interface('%s/%s' % (address, netmask))
if ip.is_link_local:
return ip
self.conf.abort(
"No link-local IPv6 address found on %s",
interface
)
def getComputerIPv6Addr(self):
network = self.ipv6_network
return ipaddress.IPv6Interface((network[1], network.prefixlen))
def getPartitionIPv6Addr(self, index):
network = self.ipv6_network
return ipaddress.IPv6Interface((network[index + 2], network.prefixlen))
def getPartitionIPv6Range(self, index):
network = self.ipv6_network
prefixlen = network.prefixlen + 16
if prefixlen > 128: # XXX move this check elsewhere
self.conf.abort("IPv6 network %s is too small for IPv6 ranges", network)
bits = 128 - network.prefixlen
addr = network[(1 << (bits - 2)) + (index << (128 - prefixlen))]
return ipaddress.IPv6Network((addr, prefixlen))
def getTapIPv6Range(self, index):
network = self.ipv6_network
prefixlen = network.prefixlen + 16
if prefixlen > 128: # XXX move this check elsewhere
self.conf.abort("IPv6 network %s is too small for IPv6 ranges", network)
bits = 128 - network.prefixlen
addr = network[(2 << (bits - 2)) + (index << (128 - prefixlen)) + 1]
return ipaddress.IPv6Interface((addr, prefixlen))
def enableIPv6NonLocalBind(self):
call(['sysctl', 'net.ipv6.ip_nonlocal_bind=1'])
network = str(self.ipv6_network)
_, result = call(['ip', '-6', 'route', 'show', 'table', 'local', network])
if not 'dev lo' in result:
call(['ip', '-6', 'route', 'add', 'local', network, 'dev', 'lo'])
def addAddress(self, ip, reason, interface=None):
interface = interface or getattr(self, 'ipv%d_interface' % ip.version)
af = {4: AF_INET, 6: AF_INET6}[ip.version]
address = str(ip.ip)
netmask = str(ip.network.netmask)
for iface in netifaces.interfaces():
if iface != interface:
ifaddresses = netifaces.ifaddresses(iface).get(af, ())
if address in (q['addr'].split('%')[0] for q in ifaddresses):
self.conf.abort(
"Cannot add address %s (%s) to %s because it already exists on %s",
ip, reason, interface, iface
)
ifaddresses = netifaces.ifaddresses(interface).get(af, ())
for q in ifaddresses:
if address == q['addr'].split('%')[0]:
if netmask not in q['netmask']:
self.conf.abort(
"Address %s (%s) is already on %s but with another netmask %s",
ip, reason, interface, q['netmask']
)
break
else:
call(['ip', 'addr', 'add', str(ip), 'dev', interface])
def checkAddress(self, ip, reason, interface=None):
interface = interface or getattr(self, 'ipv%d_interface' % ip.version)
flag = str(-ip.version)
_, result = call(['ip', flag, 'addr', 'show', interface])
cidr = str(ip)
for line in result.splitlines():
if cidr in line:
for state in ('tentative', 'dadfailed'):
if state in line:
call(['ip', 'addr', 'del', cidr, 'dev', interface])
self.conf.abort(
"Address %s (%s) on %s was in state %s so was removed: %r",
cidr, reason, interface, state, line
)
return
self.conf.abort(
"Address %s (%s) is unexpectedly not present on %s",
cidr, reason, interface
)
# Utilities # Utilities
def call(args, throw=True): def call(args, throw=True):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment