Commit 3ce28dea authored by Michael Tremer

importer: Import each source individually

Previously, when importing data failed for one source, nothing was
imported at all.

This patch changes that behaviour so that we will import all data one
after the other but continue if there was a problem with one source.
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
parent c5092d41
......@@ -96,6 +96,9 @@ EXTENDED_SOURCES = {
# ],
}
# List all sources
SOURCES = set(WHOIS_SOURCES|EXTENDED_SOURCES)
class Downloader(object):
def __init__(self):
self.proxy = None
......
......@@ -26,6 +26,7 @@ import re
import socket
import sys
import telnetlib
import urllib.error
# Load our location module
import location
......@@ -403,125 +404,137 @@ class CLI(object):
def handle_update_whois(self, ns):
	"""
	Download and import WHOIS and extended data from all RIR sources.

	Each source is processed in its own database transaction so that a
	failure to download one source does not prevent importing the others;
	on failure, that source's previously imported data is rolled back and
	the next source is tried.

	Returns a non-zero exit code if at least one source failed.
	"""
	downloader = location.importer.Downloader()

	# Did we run successfully?
	error = False

	# Fetch all valid country codes to check parsed networks against
	validcountries = self.countries

	# Iterate over all potential sources
	for source in location.importer.SOURCES:
		with self.db.transaction():
			# Create some temporary tables to store parsed data
			self.db.execute("""
				CREATE TEMPORARY TABLE _autnums(number integer NOT NULL,
					organization text NOT NULL, source text NOT NULL) ON COMMIT DROP;
				CREATE UNIQUE INDEX _autnums_number ON _autnums(number);

				CREATE TEMPORARY TABLE _organizations(handle text NOT NULL,
					name text NOT NULL, source text NOT NULL) ON COMMIT DROP;
				CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle);

				CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text NOT NULL,
					original_countries text[] NOT NULL, source text NOT NULL)
					ON COMMIT DROP;
				CREATE INDEX _rirdata_search ON _rirdata
					USING BTREE(family(network), masklen(network));
				CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network);
			""")

			# Remove all previously imported content from this source only,
			# so data from other sources survives a partial update
			self.db.execute("DELETE FROM networks WHERE source = %s", source)

			try:
				# Fetch WHOIS sources
				for url in location.importer.WHOIS_SOURCES.get(source, []):
					for block in downloader.request_blocks(url):
						self._parse_block(block, source, validcountries)

				# Fetch extended sources
				for url in location.importer.EXTENDED_SOURCES.get(source, []):
					for line in downloader.request_lines(url):
						self._parse_line(line, source, validcountries)
			except urllib.error.URLError as e:
				# FIX: the log call was missing its closing parenthesis,
				# which is a syntax error
				log.error("Could not retrieve data from %s: %s" % (source, e))
				error = True

				# Continue with the next source
				continue

			# Process all parsed networks from every RIR we happen to have access to,
			# insert the largest network chunks into the networks table immediately...
			families = self.db.query("SELECT DISTINCT family(network) AS family FROM _rirdata \
				ORDER BY family(network)")

			for family in (row.family for row in families):
				smallest = self.db.get("SELECT MIN(masklen(network)) AS prefix FROM _rirdata \
					WHERE family(network) = %s", family)

				self.db.execute("INSERT INTO networks(network, country, original_countries, source) \
					SELECT network, country, original_countries, source FROM _rirdata \
					WHERE masklen(network) = %s AND family(network) = %s", smallest.prefix, family)

				# ... determine any other prefixes for this network family, ...
				prefixes = self.db.query("SELECT DISTINCT masklen(network) AS prefix FROM _rirdata \
					WHERE family(network) = %s ORDER BY masklen(network) ASC OFFSET 1", family)

				# ... and insert networks with this prefix in case they provide additional
				# information (i. e. subnet of a larger chunk with a different country)
				for prefix in (row.prefix for row in prefixes):
					self.db.execute("""
						WITH candidates AS (
							SELECT
								_rirdata.network,
								_rirdata.country,
								_rirdata.original_countries,
								_rirdata.source
							FROM
								_rirdata
							WHERE
								family(_rirdata.network) = %s
							AND
								masklen(_rirdata.network) = %s
						),
						filtered AS (
							SELECT
								DISTINCT ON (c.network)
								c.network,
								c.country,
								c.original_countries,
								c.source,
								masklen(networks.network),
								networks.country AS parent_country
							FROM
								candidates c
							LEFT JOIN
								networks
							ON
								c.network << networks.network
							ORDER BY
								c.network,
								masklen(networks.network) DESC NULLS LAST
						)
						INSERT INTO
							networks(network, country, original_countries, source)
						SELECT
							network,
							country,
							original_countries,
							source
						FROM
							filtered
						WHERE
							parent_country IS NULL
						OR
							country <> parent_country
						ON CONFLICT DO NOTHING""",
						family, prefix,
					)

			self.db.execute("""
				INSERT INTO autnums(number, name, source)
				SELECT _autnums.number, _organizations.name, _organizations.source FROM _autnums
					JOIN _organizations ON _autnums.organization = _organizations.handle
				ON CONFLICT (number) DO UPDATE SET name = excluded.name;
			""")

	# Download and import (technical) AS names from ARIN
	with self.db.transaction():
		self._import_as_names_from_arin()

	# Return a non-zero exit code for errors
	return 1 if error else 0
def _check_parsed_network(self, network):
"""
Assistive function to detect and subsequently sort out parsed
......@@ -1501,6 +1514,14 @@ class CLI(object):
# Default to None
return None
@property
def countries(self):
	"""
	Return the list of all valid country codes from the database.

	Queries the `countries` table ordered by country code; used to
	validate the country codes of parsed networks.
	"""
	# Fetch all valid country codes to check parsed networks against
	rows = self.db.query("SELECT * FROM countries ORDER BY country_code")

	# Return all countries
	return [row.country_code for row in rows]
def handle_import_countries(self, ns):
with self.db.transaction():
# Drop all data that we have
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment