Commit 74f2a5c6 authored by Romain Courteaud's avatar Romain Courteaud

First tests

parent 16f75e94
import unittest
from urlchecker_db import LogDB
import urlchecker_http
from urlchecker_http import getUrlHostname, getUserAgent, request, logHttpStatus, checkHttpStatus
from urlchecker_status import logStatus
import httpretty
import mock
import peewee
class UrlCheckerStatusTestCase(unittest.TestCase):
def setUp(self):
self.db = LogDB(":memory:")
self.db.createTables()
################################################
# getUrlHostname
################################################
def test_getUrlHostname(self):
result = getUrlHostname("http://example.org/foo?bar=1")
assert result == "example.org"
################################################
# getUserAgent
################################################
def test_getUserAgent_default(self):
result = getUserAgent()
assert result == "URLCHECKER/0 (+https://lab.nexedi.com/romain/url-checker)"
def test_getUserAgent_default(self):
result = getUserAgent(None)
assert result == "URLCHECKER/0 (+https://lab.nexedi.com/romain/url-checker)"
def test_getUserAgent_default(self):
result = getUserAgent("0.0.3")
assert result == "URLCHECKER/0.0.3 (+https://lab.nexedi.com/romain/url-checker)"
################################################
# request
################################################
def test_request_arguments(self):
url_to_proxy = 'http://example.org/'
with mock.patch("urlchecker_http.requests.request") as mock_request:
response = request(
url_to_proxy
)
assert mock_request.call_count == 1
mock_request.assert_called_with('GET', url_to_proxy, allow_redirects=False,
headers={'Accept': 'text/html;q=0.9,*/*;q=0.8', 'User-Agent': 'URLCHECKER/0 (+https://lab.nexedi.com/romain/url-checker)'},
stream=False, timeout=2, verify=True)
@httpretty.activate
def test_request_defaultHeaders(self):
url_to_proxy = 'http://example.org/'
httpretty.register_uri(
httpretty.GET,
url_to_proxy,
status=418
)
response = request(
url_to_proxy
)
last_request = httpretty.last_request()
assert len(last_request.headers) == 5, last_request.headers.keys()
assert last_request.headers["Accept"] == "text/html;q=0.9,*/*;q=0.8"
assert last_request.headers["Accept-Encoding"] == "gzip, deflate"
assert last_request.headers["Connection"] == "keep-alive"
assert last_request.headers["Host"] == "example.org"
assert last_request.headers["User-Agent"] == "URLCHECKER/0 (+https://lab.nexedi.com/romain/url-checker)"
assert len(last_request.body) == 0
assert response.status_code == 418
@httpretty.activate
def test_request_customHeaders(self):
url_to_proxy = 'http://example.org/'
httpretty.register_uri(
httpretty.GET,
url_to_proxy,
)
request(
url_to_proxy,
headers={
'foo': 'bar',
'User-Agent': 'foouseragent',
'Accept': 'fooaccept'
}
)
last_request = httpretty.last_request()
assert len(last_request.headers) == 6, last_request.headers.keys()
assert last_request.headers["Accept"] == "fooaccept"
assert last_request.headers["Accept-Encoding"] == "gzip, deflate"
assert last_request.headers["Connection"] == "keep-alive"
assert last_request.headers["foo"] == "bar"
assert last_request.headers["Host"] == "example.org"
assert last_request.headers["User-Agent"] == "foouseragent"
assert len(last_request.body) == 0
def test_request_connectionError(self):
url_to_proxy = 'http://example.org/'
httpretty.register_uri(
httpretty.GET,
url_to_proxy
)
with mock.patch("urlchecker_http.requests.request") as mock_request:
def sideEffect(*args, **kw):
raise urlchecker_http.requests.exceptions.ConnectionError()
mock_request.side_effect = sideEffect
response = request(
url_to_proxy
)
assert mock_request.call_count == 1
assert response.status_code == 523, response.status_code
def test_request_timeout(self):
url_to_proxy = 'http://example.org/'
httpretty.register_uri(
httpretty.GET,
url_to_proxy
)
with mock.patch("urlchecker_http.requests.request") as mock_request:
def sideEffect(*args, **kw):
raise urlchecker_http.requests.exceptions.Timeout()
mock_request.side_effect = sideEffect
response = request(
url_to_proxy
)
assert mock_request.call_count == 1
assert response.status_code == 524, response.status_code
def test_request_tooManyRedirect(self):
url_to_proxy = 'http://example.org/'
httpretty.register_uri(
httpretty.GET,
url_to_proxy
)
with mock.patch("urlchecker_http.requests.request") as mock_request:
def sideEffect(*args, **kw):
raise urlchecker_http.requests.exceptions.TooManyRedirects()
mock_request.side_effect = sideEffect
response = request(
url_to_proxy
)
assert mock_request.call_count == 1
assert response.status_code == 520, response.status_code
def test_request_sslError(self):
url_to_proxy = 'http://example.org/'
httpretty.register_uri(
httpretty.GET,
url_to_proxy
)
with mock.patch("urlchecker_http.requests.request") as mock_request:
def sideEffect(*args, **kw):
raise urlchecker_http.requests.exceptions.SSLError()
mock_request.side_effect = sideEffect
response = request(
url_to_proxy
)
assert mock_request.call_count == 1
assert response.status_code == 526, response.status_code
################################################
# logHttpStatus
################################################
def test_logHttpStatus_insertFirst(self):
ip = "127.0.0.1"
url = "http://example.org"
status_code = 200
status_id = logStatus(self.db, "foo")
result = logHttpStatus(self.db, ip, url, status_code, status_id)
assert self.db.HttpCodeChange.select().count() == 1
assert self.db.HttpCodeChange.get().ip == ip
assert self.db.HttpCodeChange.get().url == url
assert self.db.HttpCodeChange.get().status_code == status_code
assert self.db.HttpCodeChange.get().status_id == status_id
def test_logHttpStatus_insertOnlyOnePerStatusIdIPUrl(self):
ip = "127.0.0.1"
url = "http://example.org"
status_code = 200
status_id = logStatus(self.db, "foo")
result = logHttpStatus(self.db, ip, url, status_code, status_id)
try:
logHttpStatus(self.db, ip, url, status_code + 1, status_id)
except peewee.IntegrityError:
assert self.db.HttpCodeChange.select().count() == 1
assert self.db.HttpCodeChange.get().status == result
else:
raise NotImplementedError('Expected IntegrityError')
def test_logHttpStatus_skipIdenticalPreviousValues(self):
ip = "127.0.0.1"
url = "http://example.org"
status_code = 200
status_id = logStatus(self.db, "foo")
result = logHttpStatus(self.db, ip, url, status_code, status_id)
status_id_2 = logStatus(self.db, "foo")
result_2 = logHttpStatus(self.db, ip, url, status_code, status_id_2)
assert result_2 == result
assert self.db.HttpCodeChange.select().count() == 1
assert self.db.HttpCodeChange.get().ip == ip
assert self.db.HttpCodeChange.get().url == url
assert self.db.HttpCodeChange.get().status_code == status_code
assert self.db.HttpCodeChange.get().status_id == status_id
def test_logHttpStatus_insertWhenDifferentStatusCode(self):
ip = "127.0.0.1"
url = "http://example.org"
status_code = 200
status_code_2 = status_code + 1
status_id = logStatus(self.db, "foo")
result = logHttpStatus(self.db, ip, url, status_code, status_id)
status_id_2 = logStatus(self.db, "foo")
result_2 = logHttpStatus(self.db, ip, url, status_code_2, status_id_2)
assert result_2 != result
assert self.db.HttpCodeChange.select().count() == 2
assert self.db.HttpCodeChange.get(self.db.HttpCodeChange.status == status_id).ip == ip
assert self.db.HttpCodeChange.get(self.db.HttpCodeChange.status == status_id).url == url
assert self.db.HttpCodeChange.get(self.db.HttpCodeChange.status == status_id).status_code == status_code
assert self.db.HttpCodeChange.get(self.db.HttpCodeChange.status == status_id_2).ip == ip
assert self.db.HttpCodeChange.get(self.db.HttpCodeChange.status == status_id_2).url == url
assert self.db.HttpCodeChange.get(self.db.HttpCodeChange.status == status_id_2).status_code == status_code_2
def test_logHttpStatus_insertDifferentUrl(self):
ip = "127.0.0.1"
ip_2 = ip + "2"
url = "http://example.org"
url_2 = url + "2"
status_code = 200
status_id = logStatus(self.db, "foo")
result = logHttpStatus(self.db, ip, url, status_code, status_id)
result_2 = logHttpStatus(self.db, ip_2, url, status_code, status_id)
result_3 = logHttpStatus(self.db, ip, url_2, status_code, status_id)
result_4 = logHttpStatus(self.db, ip_2, url_2, status_code, status_id)
assert self.db.HttpCodeChange.select().count() == 4
################################################
# checkHttpStatus
################################################
@httpretty.activate
def test_checkHttpStatus_http(self):
ip = "127.0.0.1"
url = "http://example.org/foo?bar=1"
bot_version = 1
httpretty.register_uri(
httpretty.GET,
"http://127.0.0.1/foo?bar=1",
status=418
)
status_id = logStatus(self.db, "foo")
checkHttpStatus(self.db, status_id, url, ip, bot_version)
last_request = httpretty.last_request()
assert len(last_request.headers) == 5, last_request.headers.keys()
assert last_request.headers["Accept"] == "text/html;q=0.9,*/*;q=0.8"
assert last_request.headers["Accept-Encoding"] == "gzip, deflate"
assert last_request.headers["Connection"] == "keep-alive"
assert last_request.headers["Host"] == "example.org"
assert last_request.headers["User-Agent"] == "URLCHECKER/1 (+https://lab.nexedi.com/romain/url-checker)"
assert len(last_request.body) == 0
assert self.db.HttpCodeChange.select().count() == 1
assert self.db.HttpCodeChange.get().ip == ip
assert self.db.HttpCodeChange.get().url == url
assert self.db.HttpCodeChange.get().status_code == 418
assert self.db.HttpCodeChange.get().status_id == status_id
def test_checkHttpStatus_https(self):
ip = "127.0.0.1"
url = "https://example.org/foo?bar=1"
bot_version = 2
status_id = logStatus(self.db, "foo")
with mock.patch("urlchecker_http.request") as mock_request:
checkHttpStatus(self.db, status_id, url, ip, bot_version)
assert mock_request.call_count == 1
assert mock_request.call_args.args == ('https://example.org/foo?bar=1',)
assert len(mock_request.call_args.kwargs) == 3, mock_request.call_args.kwargs
assert mock_request.call_args.kwargs['headers'] == {'Host': 'example.org'}
assert mock_request.call_args.kwargs['session'] is not None
assert mock_request.call_args.kwargs['version'] == 2
assert self.db.HttpCodeChange.select().count() == 1
assert self.db.HttpCodeChange.get().ip == ip
assert self.db.HttpCodeChange.get().url == url
# XXX No idea how to mock SSL
assert self.db.HttpCodeChange.get().status_code == 1
assert self.db.HttpCodeChange.get().status_id == status_id
def test_checkHttpStatus_relativeUrl(self):
ip = "127.0.0.1"
url = "foo?bar=1"
bot_version = 1
status_id = logStatus(self.db, "foo")
try:
checkHttpStatus(self.db, status_id, url, ip, bot_version)
except NotImplementedError as err:
assert str(err) == 'Unhandled url: foo?bar=1'
else:
raise NotImplementedError('Expected NotImplementedError')
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(UrlCheckerStatusTestCase))
return suite
if __name__ == "__main__":
unittest.main(defaultTest="suite")
import unittest
from urlchecker_db import LogDB
from urlchecker_status import logStatus
class UrlCheckerStatusTestCase(unittest.TestCase):
def setUp(self):
self.db = LogDB(":memory:")
self.db.createTables()
def test_logStatus_insert(self):
result = logStatus(self.db, "foo")
assert self.db.Status.select().count() == 1
assert self.db.Status.get(self.db.Status.text == "foo").id == result
def test_logStatus_insertTwice(self):
result1 = logStatus(self.db, "foo")
result2 = logStatus(self.db, "foo")
assert self.db.Status.select().count() == 2
assert result1 < result2
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(UrlCheckerStatusTestCase))
return suite
if __name__ == "__main__":
unittest.main(defaultTest="suite")
...@@ -6,7 +6,7 @@ from playhouse.sqlite_ext import SqliteExtDatabase ...@@ -6,7 +6,7 @@ from playhouse.sqlite_ext import SqliteExtDatabase
class LogDB: class LogDB:
def __init__(self, sqlite_path): def __init__(self, sqlite_path):
self._db = SqliteExtDatabase( self._db = SqliteExtDatabase(
sqlite_path, pragmas=(("journal_mode", "WAL"),) sqlite_path, pragmas=(("journal_mode", "WAL"), ("foreign_keys", 1))
) )
self._db.connect() self._db.connect()
...@@ -62,6 +62,8 @@ class LogDB: ...@@ -62,6 +62,8 @@ class LogDB:
ip = peewee.TextField(index=True) ip = peewee.TextField(index=True)
url = peewee.TextField(index=True) url = peewee.TextField(index=True)
status_code = peewee.IntegerField() status_code = peewee.IntegerField()
class Meta:
primary_key = peewee.CompositeKey("status", "ip", "url")
self.Status = Status self.Status = Status
self.ConfigurationChange = ConfigurationChange self.ConfigurationChange = ConfigurationChange
......
...@@ -113,6 +113,6 @@ def getServerIpDict(db, status_id, resolver_dict, domain_list, rdtype): ...@@ -113,6 +113,6 @@ def getServerIpDict(db, status_id, resolver_dict, domain_list, rdtype):
if address not in server_ip_dict: if address not in server_ip_dict:
server_ip_dict[address] = [] server_ip_dict[address] = []
if domain_text not in server_ip_dict[address]: if domain_text not in server_ip_dict[address]:
# Do not duplicate the domain # Do not duplicate the domain
server_ip_dict[address].append(domain_text) server_ip_dict[address].append(domain_text)
return server_ip_dict return server_ip_dict
...@@ -13,7 +13,7 @@ def getUrlHostname(url): ...@@ -13,7 +13,7 @@ def getUrlHostname(url):
return urlparse(url).hostname return urlparse(url).hostname
def getUserAgent(self, version="0"): def getUserAgent(version):
return "%s/%s (+%s)" % ( return "%s/%s (+%s)" % (
"URLCHECKER", "URLCHECKER",
version, version,
...@@ -22,16 +22,10 @@ def getUserAgent(self, version="0"): ...@@ -22,16 +22,10 @@ def getUserAgent(self, version="0"):
def request( def request(
method,
url, url,
headers=None, headers=None,
stream=False,
timeout=TIMEOUT,
allow_redirects=False,
verify=True,
session=requests, session=requests,
version=None, version=0
**kwargs,
): ):
if headers is None: if headers is None:
...@@ -42,11 +36,12 @@ def request( ...@@ -42,11 +36,12 @@ def request(
# XXX user agent # XXX user agent
headers["User-Agent"] = getUserAgent(version) headers["User-Agent"] = getUserAgent(version)
kwargs["stream"] = stream kwargs = {}
kwargs["timeout"] = timeout kwargs["stream"] = False
kwargs["allow_redirects"] = allow_redirects kwargs["timeout"] = TIMEOUT
kwargs["verify"] = verify kwargs["allow_redirects"] = False
args = [method, url] kwargs["verify"] = True
args = ["GET", url]
kwargs["headers"] = headers kwargs["headers"] = headers
...@@ -89,33 +84,34 @@ def logHttpStatus(db, ip, url, code, status_id): ...@@ -89,33 +84,34 @@ def logHttpStatus(db, ip, url, code, status_id):
previous_entry = db.HttpCodeChange.create( previous_entry = db.HttpCodeChange.create(
status=status_id, ip=ip, url=url, status_code=code status=status_id, ip=ip, url=url, status_code=code
) )
return previous_entry.id return previous_entry.status
def checkHttpStatus(db, status_id, url, ip, bot_version): def checkHttpStatus(db, status_id, url, ip, bot_version):
parsed_url = urlparse(url) parsed_url = urlparse(url)
hostname = parsed_url.hostname hostname = parsed_url.hostname
request_kw = {}
session = requests.Session()
# SNI Support # SNI Support
if parsed_url.scheme == "https": if parsed_url.scheme == "https":
# Provide SNI support # Provide SNI support
base_url = urlunsplit( base_url = urlunsplit(
(parsed_url.scheme, parsed_url.netloc, "", "", "") (parsed_url.scheme, parsed_url.netloc, "", "", "")
) )
session = requests.Session()
session.mount(base_url, ForcedIPHTTPSAdapter(dest_ip=ip)) session.mount(base_url, ForcedIPHTTPSAdapter(dest_ip=ip))
request_kw['session'] = session
ip_url = url ip_url = url
elif parsed_url.scheme == "http": elif parsed_url.scheme == "http":
# Force IP location # Force IP location
parsed_url = parsed_url._replace(netloc=ip) parsed_url = parsed_url._replace(netloc=ip)
ip_url = parsed_url.geturl() ip_url = parsed_url.geturl()
else:
raise NotImplementedError('Unhandled url: %s' % url)
response = request( response = request(
"GET",
ip_url, ip_url,
headers={"Host": hostname}, headers={"Host": hostname},
session=session,
version=bot_version, version=bot_version,
**request_kw
) )
logHttpStatus(db, ip, url, response.status_code, status_id) logHttpStatus(db, ip, url, response.status_code, status_id)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment