Commit 85e44d9a authored by Michael Tremer

importer: Change download behaviour

The downloader used to open a connection to the web server hosting our
content, decompress the data on the fly (if necessary) and parse it on
the fly so that it could easily be fed into the database.

Some web servers do not seem to be patient enough to keep the connection
open when processing takes a little longer than usual, which caused the
import to fail.

This patch changes the behaviour so that we download all content first,
store it locally, and only then start processing it.

Fixes: #12852
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
Cc: Peter Müller <peter.mueller@ipfire.org>
parent aa23e03d
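In short: instead of streaming the response through a context manager, the new retrieve() buffers the entire payload in a spooled temporary file and only then hands it to the parser. A condensed, standalone sketch of that idea, assembled from the hunks below (the free function and its proxy parameter are a simplification; in the patch this logic lives on the Downloader class):

import gzip
import tempfile
import urllib.request

def retrieve(url, data=None, proxy=None):
	# Buffer the whole download; kept in memory up to 100 MiB, spilled to disk beyond that
	t = tempfile.SpooledTemporaryFile(max_size=100 * 1024 * 1024)

	req = urllib.request.Request(url, data=data)
	if proxy:
		req.set_proxy(proxy, "http")

	# Read the complete payload before any parsing starts
	with urllib.request.urlopen(req) as res:
		content_type = res.headers.get("Content-Type")

		while True:
			buf = res.read(65536)
			if not buf:
				break
			t.write(buf)

	# Rewind so the caller reads from the beginning
	t.seek(0)

	# Transparently decompress gzipped responses
	if content_type in ("application/x-gzip", "application/gzip"):
		return gzip.GzipFile(fileobj=t, mode="rb")

	return t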
......@@ -19,6 +19,7 @@
import gzip
import logging
import tempfile
import urllib.request
# Initialise logging
......@@ -106,75 +107,76 @@ class Downloader(object):
log.info("Using proxy %s" % url)
self.proxy = url
def request(self, url, data=None, return_blocks=False):
def retrieve(self, url, data=None):
"""
This method will fetch the content at the given URL
and will return a file-object to a temporary file.
If the content was compressed, it will be decompressed on the fly.
"""
# Open a temporary file to buffer the downloaded content
t = tempfile.SpooledTemporaryFile(max_size=100 * 1024 * 1024)
# Create a new request
req = urllib.request.Request(url, data=data)
# Configure proxy
if self.proxy:
req.set_proxy(self.proxy, "http")
return DownloaderContext(self, req, return_blocks=return_blocks)
class DownloaderContext(object):
def __init__(self, downloader, request, return_blocks=False):
self.downloader = downloader
self.request = request
# Should we return one block or a single line?
self.return_blocks = return_blocks
# Save the response object
self.response = None
def __enter__(self):
log.info("Retrieving %s..." % self.request.full_url)
log.info("Retrieving %s..." % req.full_url)
# Send request
self.response = urllib.request.urlopen(self.request)
res = urllib.request.urlopen(req)
# Log the response headers
log.debug("Response Headers:")
for header in self.headers:
log.debug(" %s: %s" % (header, self.get_header(header)))
for header in res.headers:
log.debug(" %s: %s" % (header, res.headers[header]))
return self
# Write the payload to the temporary file
with res as f:
while True:
buf = f.read(65536)
if not buf:
break
def __exit__(self, type, value, traceback):
pass
t.write(buf)
def __iter__(self):
"""
Makes the object iterable by going through each block
"""
if self.return_blocks:
return iterate_over_blocks(self.body)
# Rewind the temporary file
t.seek(0)
return iterate_over_lines(self.body)
# Fetch the content type
content_type = res.headers.get("Content-Type")
@property
def headers(self):
if self.response:
return self.response.headers
# Decompress any gzipped response on the fly
if content_type in ("application/x-gzip", "application/gzip"):
t = gzip.GzipFile(fileobj=t, mode="rb")
def get_header(self, name):
if self.headers:
return self.headers.get(name)
# Return the temporary file handle
return t
@property
def body(self):
def request_blocks(self, url, data=None):
"""
Returns a file-like object with the decoded content
of the response.
This method will fetch the data from the URL and return an
iterator for each block in the data.
"""
content_type = self.get_header("Content-Type")
# Download the data first
t = self.retrieve(url, data=data)
# Decompress any gzipped response on the fly
if content_type in ("application/x-gzip", "application/gzip"):
return gzip.GzipFile(fileobj=self.response, mode="rb")
# Then, split it into blocks
return iterate_over_blocks(t)
def request_lines(self, url, data=None):
"""
This method will fetch the data from the URL and return an
iterator for each line in the data.
"""
# Download the data first
t = self.retrieve(url, data=data)
# Return the response by default
return self.response
# Then, split it into lines
return iterate_over_lines(t)
def read_blocks(f):
......
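For callers, the change means the with-statement around downloader.request() disappears: the CLI simply iterates over request_blocks() or request_lines(), and the download has already completed by the time the first block or line is yielded. A rough usage sketch mirroring the hunks that follow (single_url, source_key, validcountries and the _parse_* helpers are the CLI's own names, shown only for context):

downloader = location.importer.Downloader()

# Block-oriented sources, e.g. the WHOIS dumps
for block in downloader.request_blocks(single_url):
	self._parse_block(block, source_key, validcountries)

# Line-oriented sources, e.g. the extended delegation statistics
for line in downloader.request_lines(single_url):
	self._parse_line(line, source_key, validcountries)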
......@@ -435,9 +435,8 @@ class CLI(object):
for source_key in location.importer.WHOIS_SOURCES:
for single_url in location.importer.WHOIS_SOURCES[source_key]:
with downloader.request(single_url, return_blocks=True) as f:
for block in f:
self._parse_block(block, source_key, validcountries)
for block in downloader.request_blocks(single_url):
self._parse_block(block, source_key, validcountries)
# Process all parsed networks from every RIR we happen to have access to,
# insert the largest network chunks into the networks table immediately...
......@@ -518,9 +517,8 @@ class CLI(object):
for single_url in location.importer.EXTENDED_SOURCES[source_key]:
with self.db.transaction():
# Download data
with downloader.request(single_url) as f:
for line in f:
self._parse_line(line, source_key, validcountries)
for line in downloader.request_lines(single_url):
self._parse_line(line, source_key, validcountries)
# Download and import (technical) AS names from ARIN
self._import_as_names_from_arin()
......@@ -871,50 +869,46 @@ class CLI(object):
# technical, not intended for human consumption, as description fields in
# organisation handles for other RIRs are - however, this is what we have got,
# and in some cases, it might be still better than nothing)
with downloader.request("https://ftp.arin.net/info/asn.txt", return_blocks=False) as f:
for line in f:
# Convert binary line to string...
line = str(line)
# ... valid lines start with a space, followed by the number of the Autonomous System ...
if not line.startswith(" "):
continue
for line in downloader.request_lines("https://ftp.arin.net/info/asn.txt"):
# Valid lines start with a space, followed by the number of the Autonomous System ...
if not line.startswith(" "):
continue
# Split line and check if there is a valid ASN in it...
asn, name = line.split()[0:2]
# Split line and check if there is a valid ASN in it...
asn, name = line.split()[0:2]
try:
asn = int(asn)
except ValueError:
log.debug("Skipping ARIN AS names line not containing an integer for ASN")
continue
try:
asn = int(asn)
except ValueError:
log.debug("Skipping ARIN AS names line not containing an integer for ASN")
continue
# Filter invalid ASNs...
if not self._check_parsed_asn(asn):
continue
# Filter invalid ASNs...
if not self._check_parsed_asn(asn):
continue
# Skip any AS name that appears to be a placeholder for a different RIR or entity...
if re.match(r"^(ASN-BLK|)(AFCONC|AFRINIC|APNIC|ASNBLK|LACNIC|RIPE|IANA)(?:\d?$|\-)", name):
continue
# Skip any AS name that appears to be a placeholder for a different RIR or entity...
if re.match(r"^(ASN-BLK|)(AFCONC|AFRINIC|APNIC|ASNBLK|LACNIC|RIPE|IANA)(?:\d?$|\-)", name):
continue
# Bail out in case the AS name contains anything we do not expect here...
if re.search(r"[^a-zA-Z0-9-_]", name):
log.debug("Skipping ARIN AS name for %s containing invalid characters: %s" % \
(asn, name))
# Bail out in case the AS name contains anything we do not expect here...
if re.search(r"[^a-zA-Z0-9-_]", name):
log.debug("Skipping ARIN AS name for %s containing invalid characters: %s" % \
(asn, name))
# Things look good here, run INSERT statement and skip this one if we already have
# a (better?) name for this Autonomous System...
self.db.execute("""
	INSERT INTO autnums(
		number,
		name,
		source
	) VALUES (%s, %s, %s)
	ON CONFLICT (number) DO NOTHING""",
	asn,
	name,
	"ARIN",
)
def handle_update_announcements(self, ns):
server = ns.server[0]
......@@ -1262,8 +1256,11 @@ class CLI(object):
downloader = location.importer.Downloader()
try:
with downloader.request("https://ip-ranges.amazonaws.com/ip-ranges.json", return_blocks=False) as f:
aws_ip_dump = json.load(f.body)
# Fetch IP ranges
f = downloader.retrieve("https://ip-ranges.amazonaws.com/ip-ranges.json")
# Parse downloaded file
aws_ip_dump = json.load(f)
except Exception as e:
log.error("unable to preprocess Amazon AWS IP ranges: %s" % e)
return
......@@ -1386,12 +1383,11 @@ class CLI(object):
]
for url in ip_urls:
try:
with downloader.request(url, return_blocks=False) as f:
fcontent = f.body.readlines()
except Exception as e:
log.error("Unable to download Spamhaus DROP URL %s: %s" % (url, e))
return
# Fetch IP list
f = downloader.retrieve(url)
# Split into lines
fcontent = f.readlines()
# Conduct a very basic sanity check to rule out CDN issues causing bogus DROP
# downloads.
......@@ -1408,7 +1404,6 @@ class CLI(object):
# the override table in case they are valid...
with self.db.transaction():
for sline in fcontent:
# The response is assumed to be encoded in UTF-8...
sline = sline.decode("utf-8")
......@@ -1443,18 +1438,13 @@ class CLI(object):
)
for url in asn_urls:
try:
with downloader.request(url, return_blocks=False) as f:
fcontent = f.body.readlines()
except Exception as e:
log.error("Unable to download Spamhaus DROP URL %s: %s" % (url, e))
return
# Fetch URL
f = downloader.retrieve(url)
# Iterate through every line, filter comments and add remaining ASNs to
# the override table in case they are valid...
with self.db.transaction():
for sline in fcontent:
for sline in f.readlines():
# The response is assumed to be encoded in UTF-8...
sline = sline.decode("utf-8")
......