Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
U
url-checker
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Analytics
Analytics
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Commits
Open sidebar
Romain Courteaud
url-checker
Commits
f8287237
Commit
f8287237
authored
Oct 18, 2019
by
Romain Courteaud
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Check http status
parent
8c8fef11
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
129 additions
and
183 deletions
+129
-183
urlchecker_bot.py
urlchecker_bot.py
+18
-5
urlchecker_db.py
urlchecker_db.py
+8
-0
urlchecker_http.py
urlchecker_http.py
+103
-178
No files found.
urlchecker_bot.py
View file @
f8287237
...
...
@@ -4,7 +4,7 @@ from urlchecker_configuration import createConfiguration, logConfiguration
from
urlchecker_platform
import
logPlatform
from
urlchecker_status
import
logStatus
from
urlchecker_dns
import
getResolverDict
,
expandDomainList
,
getServerIpDict
from
urlchecker_http
import
getUrlHostname
from
urlchecker_http
import
getUrlHostname
,
checkHttpStatus
from
urlchecker_network
import
isTcpPortOpen
...
...
@@ -48,6 +48,7 @@ class WebBot:
print
(
domain_list
)
# Get the list of server to check
# XXX Check DNS expiration
server_ip_dict
=
getServerIpDict
(
self
.
_db
,
status_id
,
resolver_dict
,
domain_list
,
"A"
)
...
...
@@ -55,14 +56,26 @@ class WebBot:
# Check TCP port for the list of IP found
# XXX For now, check http/https only
server_ip_list
=
[
x
for
x
in
server_ip_dict
.
keys
()]
url_dict
=
{}
for
server_ip
in
server_ip_list
:
isTcpPortOpen
(
self
.
_db
,
server_ip
,
80
,
status_id
)
isTcpPortOpen
(
self
.
_db
,
server_ip
,
443
,
status_id
)
print
(
server_ip_dict
)
# XXX Check SSL certificate expiration
for
port
,
protocol
in
[(
80
,
"http"
),
(
443
,
"https"
)]:
if
isTcpPortOpen
(
self
.
_db
,
server_ip
,
port
,
status_id
):
for
hostname
in
server_ip_dict
[
server_ip
]:
url
=
"%s://%s"
%
(
protocol
,
hostname
)
if
url
not
in
url_dict
:
url_dict
[
url
]
=
[]
url_dict
[
url
].
append
(
server_ip
)
# XXX
If https ok, check SSL certificate
# XXX
put back orignal url list
# Check HTTP Status
for
url
in
url_dict
:
for
ip
in
url_dict
[
url
]:
checkHttpStatus
(
self
.
_db
,
status_id
,
url
,
ip
,
__version__
)
# XXX Check location header and check new url recursively
# XXX Parse HTML, fetch found link, css, js, image
# XXX Check HTTP Cache
def
stop
(
self
):
self
.
_running
=
False
...
...
urlchecker_db.py
View file @
f8287237
...
...
@@ -57,11 +57,18 @@ class LogDB:
rdtype
=
peewee
.
TextField
()
response
=
peewee
.
TextField
()
class
HttpCodeChange
(
BaseModel
):
status
=
peewee
.
ForeignKeyField
(
Status
)
ip
=
peewee
.
TextField
(
index
=
True
)
url
=
peewee
.
TextField
(
index
=
True
)
status_code
=
peewee
.
IntegerField
()
self
.
Status
=
Status
self
.
ConfigurationChange
=
ConfigurationChange
self
.
PlatformChange
=
PlatformChange
self
.
NetworkChange
=
NetworkChange
self
.
DnsChange
=
DnsChange
self
.
HttpCodeChange
=
HttpCodeChange
def
createTables
(
self
):
# http://www.sqlite.org/pragma.html#pragma_user_version
...
...
@@ -73,6 +80,7 @@ class LogDB:
[
self
.
Status
,
self
.
ConfigurationChange
,
self
.
HttpCodeChange
,
self
.
NetworkChange
,
self
.
PlatformChange
,
self
.
DnsChange
,
...
...
urlchecker_http.py
View file @
f8287237
from
urllib.parse
import
urlparse
,
urlunsplit
import
requests
import
socke
t
from
urllib.parse
import
urlparse
,
urlunspli
t
import
sys
import
traceback
import
time
from
forcediphttpsadapter.adapters
import
ForcedIPHTTPSAdapter
import
dns.resolver
import
miniupnpc
import
platform
from
urlchecker_db
import
LogDB
import
configparser
import
os
__version__
=
"0.0.3"
PREFERRED_TYPE
=
"text/html"
TIMEOUT
=
2
CONFIG_SECTION
=
"URLCHECKER"
class
BotError
(
Exception
):
pass
def
getUrlHostname
(
url
):
return
urlparse
(
url
).
hostname
def
getUserAgent
(
self
,
version
=
"0"
):
return
"%s/%s (+%s)"
%
(
"URLCHECKER"
,
version
,
"https://lab.nexedi.com/romain/url-checker"
,
)
def
request
(
method
,
url
,
headers
=
None
,
stream
=
False
,
timeout
=
TIMEOUT
,
allow_redirects
=
False
,
verify
=
True
,
session
=
requests
,
version
=
None
,
**
kwargs
,
):
if
headers
is
None
:
headers
=
{}
if
"Accept"
not
in
headers
:
headers
[
"Accept"
]
=
"%s;q=0.9,*/*;q=0.8"
%
PREFERRED_TYPE
if
"User-Agent"
not
in
headers
:
# XXX user agent
headers
[
"User-Agent"
]
=
getUserAgent
(
version
)
kwargs
[
"stream"
]
=
stream
kwargs
[
"timeout"
]
=
timeout
kwargs
[
"allow_redirects"
]
=
allow_redirects
kwargs
[
"verify"
]
=
verify
args
=
[
method
,
url
]
kwargs
[
"headers"
]
=
headers
try
:
response
=
session
.
request
(
*
args
,
**
kwargs
)
except
requests
.
exceptions
.
SSLError
:
# XXX Enter into unknown host
response
=
requests
.
models
.
Response
()
response
.
status_code
=
526
except
requests
.
exceptions
.
ConnectionError
:
response
=
requests
.
models
.
Response
()
response
.
status_code
=
523
except
requests
.
exceptions
.
Timeout
:
response
=
requests
.
models
.
Response
()
response
.
status_code
=
524
except
requests
.
exceptions
.
TooManyRedirects
:
response
=
requests
.
models
.
Response
()
response
.
status_code
=
520
except
:
# XXX Put it in body
print
(
traceback
.
print_exception
(
*
(
sys
.
exc_info
())))
response
=
requests
.
models
.
Response
()
response
.
status_code
=
0
return
response
def
logHttpStatus
(
db
,
ip
,
url
,
code
,
status_id
):
with
db
.
_db
.
atomic
():
try
:
# Check previous parameter value
previous_entry
=
(
db
.
HttpCodeChange
.
select
()
.
where
(
db
.
HttpCodeChange
.
ip
==
ip
,
db
.
HttpCodeChange
.
url
==
url
)
.
order_by
(
db
.
HttpCodeChange
.
status
.
desc
())
.
get
()
)
except
db
.
HttpCodeChange
.
DoesNotExist
:
previous_entry
=
None
if
(
previous_entry
is
None
)
or
(
previous_entry
.
status_code
!=
code
):
previous_entry
=
db
.
HttpCodeChange
.
create
(
status
=
status_id
,
ip
=
ip
,
url
=
url
,
status_code
=
code
)
return
previous_entry
.
id
class
WebBot
:
def
__init__
(
self
):
self
.
config
=
configparser
.
ConfigParser
(
empty_lines_in_values
=
False
)
self
.
config
[
CONFIG_SECTION
]
=
{
"INTERVAL"
:
-
1
}
def
initDB
(
self
,
sqlite_path
):
self
.
_db
=
LogDB
(
sqlite_path
)
self
.
_db
.
createTables
()
def
checkHttpStatus
(
db
,
status_id
,
url
,
ip
,
bot_version
):
parsed_url
=
urlparse
(
url
)
hostname
=
parsed_url
.
hostname
def
getUserAgent
(
self
):
return
"%s/%s (+%s)"
%
(
"URLCHECKER"
,
__version__
,
"https://lab.nexedi.com/romain/url-checker"
,
session
=
requests
.
Session
()
# SNI Support
if
parsed_url
.
scheme
==
"https"
:
# Provide SNI support
base_url
=
urlunsplit
(
(
parsed_url
.
scheme
,
parsed_url
.
netloc
,
""
,
""
,
""
)
)
session
.
mount
(
base_url
,
ForcedIPHTTPSAdapter
(
dest_ip
=
ip
))
def
request
(
self
,
method
,
response
=
request
(
"GET"
,
url
,
headers
=
None
,
stream
=
False
,
timeout
=
TIMEOUT
,
allow_redirects
=
False
,
verify
=
True
,
session
=
requests
,
**
kwargs
,
):
if
headers
is
None
:
headers
=
{}
if
"Accept"
not
in
headers
:
headers
[
"Accept"
]
=
"%s;q=0.9,*/*;q=0.8"
%
PREFERRED_TYPE
if
"User-Agent"
not
in
headers
:
# XXX user agent
headers
[
"User-Agent"
]
=
self
.
getUserAgent
()
kwargs
[
"stream"
]
=
stream
kwargs
[
"timeout"
]
=
timeout
kwargs
[
"allow_redirects"
]
=
allow_redirects
kwargs
[
"verify"
]
=
verify
args
=
[
method
,
url
]
kwargs
[
"headers"
]
=
headers
try
:
response
=
session
.
request
(
*
args
,
**
kwargs
)
except
requests
.
exceptions
.
SSLError
:
# XXX Enter into unknown host
response
=
requests
.
models
.
Response
()
response
.
status_code
=
526
except
requests
.
exceptions
.
ConnectionError
:
response
=
requests
.
models
.
Response
()
response
.
status_code
=
523
except
requests
.
exceptions
.
Timeout
:
response
=
requests
.
models
.
Response
()
response
.
status_code
=
524
except
requests
.
exceptions
.
TooManyRedirects
:
response
=
requests
.
models
.
Response
()
response
.
status_code
=
520
except
:
# XXX Put it in body
print
(
traceback
.
print_exception
(
*
(
sys
.
exc_info
())))
response
=
requests
.
models
.
Response
()
response
.
status_code
=
0
return
response
def
check
(
self
,
url
):
parsed_url
=
urlparse
(
url
)
# response = self.request("GET", url)
# print(url, response.status_code)
# Get the list of available IPv4 frontend CDN
hostname
=
parsed_url
.
hostname
try
:
dns_info_list
=
socket
.
getaddrinfo
(
hostname
,
"http"
,
socket
.
AF_INET
)
except
socket
.
gaierror
:
dns_info_list
=
[]
ip_list
=
[
x
[
4
][
0
]
for
x
in
dns_info_list
]
for
ip
in
ip_list
:
session
=
requests
.
Session
()
# SNI Support
if
parsed_url
.
scheme
==
"https"
:
# Provide SNI support
base_url
=
urlunsplit
(
(
parsed_url
.
scheme
,
parsed_url
.
netloc
,
""
,
""
,
""
)
)
session
.
mount
(
base_url
,
ForcedIPHTTPSAdapter
(
dest_ip
=
ip
))
response
=
self
.
request
(
"GET"
,
url
,
headers
=
{
"Host"
:
hostname
},
session
=
session
)
self
.
_db
.
storeQuery
(
ip
,
url
,
response
.
status_code
)
def
iterateLoop
(
self
):
for
url
in
self
.
config
[
CONFIG_SECTION
][
"URL"
].
split
():
self
.
check
(
url
)
def
stop
(
self
):
print
(
"Bye bye"
)
print
(
time
.
strftime
(
"%Y-%m-%d %H:%M:%S"
))
self
.
_running
=
False
if
hasattr
(
self
,
"_db"
):
self
.
_db
.
close
()
def
run
(
self
):
print
(
time
.
strftime
(
"%Y-%m-%d %H:%M:%S"
))
self
.
initDB
(
self
.
config
[
CONFIG_SECTION
][
"SQLITE"
])
self
.
_db
.
storeEntry
(
platform
=
platform
.
platform
())
print
(
"Platform"
,
platform
.
platform
())
print
(
"Python"
,
platform
.
python_build
(),
platform
.
python_compiler
(),
platform
.
python_branch
(),
platform
.
python_implementation
(),
platform
.
python_revision
(),
platform
.
python_version
(),
)
print
(
"Hostname"
,
socket
.
gethostname
())
myresolver
=
dns
.
resolver
.
Resolver
()
print
(
"Resolvers"
,
myresolver
.
nameservers
)
u
=
miniupnpc
.
UPnP
()
u
.
discoverdelay
=
200
u
.
discover
()
try
:
u
.
selectigd
()
print
(
"external ip: {}"
.
format
(
u
.
externalipaddress
()))
except
Exception
:
pass
self
.
_running
=
True
try
:
while
self
.
_running
:
self
.
iterateLoop
()
interval
=
self
.
config
.
getint
(
CONFIG_SECTION
,
"INTERVAL"
)
if
interval
<
0
:
self
.
stop
()
else
:
time
.
sleep
(
interval
)
except
KeyboardInterrupt
:
self
.
stop
()
except
:
self
.
stop
()
print
(
"Oups, error"
)
raise
def
create_bot
(
envvar
=
"URLCHECKER_SETTINGS"
,
cfgfile
=
None
,
mapping
=
None
):
bot
=
WebBot
()
if
(
envvar
is
not
None
)
and
(
envvar
in
os
.
environ
):
bot
.
config
.
read
([
os
.
environ
.
get
(
envvar
)])
if
cfgfile
is
not
None
:
print
(
cfgfile
)
bot
.
config
.
read
([
cfgfile
])
if
mapping
is
not
None
:
bot
.
config
.
read_dict
({
CONFIG_SECTION
:
mapping
})
for
parameter
in
[
"URL"
,
"SQLITE"
]:
if
parameter
not
in
bot
.
config
[
CONFIG_SECTION
]:
raise
AttributeError
(
"Config %s not defined"
%
parameter
)
return
bot
headers
=
{
"Host"
:
hostname
},
session
=
session
,
version
=
bot_version
,
)
logHttpStatus
(
db
,
ip
,
url
,
response
.
status_code
,
status_id
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment