Commit a0466ad7 authored by Eric Zheng's avatar Eric Zheng Committed by Thomas Gambier

plugin/check_url_available: rewrite options

The check_url_available promise has been improved with the following
changes:

- The `http_code` parameter has been renamed `http-code` for consistency.
- The `check-secure` parameter has been removed. Users should specify
  `http-code = 401` instead.
- The `username` and `password` parameters have been added to optionally
  enable HTTP basic authentication.
- The logging output has been changed slightly.
parent 831b32a6
Pipeline #16707 passed with stage
in 0 seconds
"""
Some notable parameters:
promise-timeout:
Optional timeout (in seconds) for promise.
timeout:
Optional timeout (in seconds) for HTTP request.
verify, ca-cert-file, cert-file, key-file:
Optional SSL information. (See Python requests documentation.)
http-code:
(default 200) The expected response HTTP code.
ignore-code:
(default 0) If set to 1, ignore the response HTTP code.
username, password:
If supplied, enables basic HTTP authentication.
"""
from zope.interface import implementer from zope.interface import implementer
from slapos.grid.promise import interface from slapos.grid.promise import interface
from slapos.grid.promise.generic import GenericPromise from slapos.grid.promise.generic import GenericPromise
...@@ -14,7 +31,7 @@ class RunPromise(GenericPromise): ...@@ -14,7 +31,7 @@ class RunPromise(GenericPromise):
def sense(self): def sense(self):
""" """
Check if frontend URL is available Check if frontend URL is available.
""" """
url = self.getConfig('url') url = self.getConfig('url')
...@@ -22,12 +39,18 @@ class RunPromise(GenericPromise): ...@@ -22,12 +39,18 @@ class RunPromise(GenericPromise):
# and in the same time at least 1 second # and in the same time at least 1 second
default_timeout = max( default_timeout = max(
1, min(5, int(self.getConfig('promise-timeout', 20)) - 1)) 1, min(5, int(self.getConfig('promise-timeout', 20)) - 1))
timeout = int(self.getConfig('timeout', default_timeout)) expected_http_code = int(self.getConfig('http-code', 200))
expected_http_code = int(self.getConfig('http_code', '200'))
ca_cert_file = self.getConfig('ca-cert-file') ca_cert_file = self.getConfig('ca-cert-file')
cert_file = self.getConfig('cert-file') cert_file = self.getConfig('cert-file')
key_file = self.getConfig('key-file') key_file = self.getConfig('key-file')
verify = int(self.getConfig('verify', 0)) verify = int(self.getConfig('verify', 0))
username = self.getConfig('username')
password = self.getConfig('password')
if int(self.getConfig('ignore-code', 0)) == 1:
ignore_code = True
else:
ignore_code = False
if ca_cert_file: if ca_cert_file:
verify = ca_cert_file verify = ca_cert_file
...@@ -41,36 +64,54 @@ class RunPromise(GenericPromise): ...@@ -41,36 +64,54 @@ class RunPromise(GenericPromise):
else: else:
cert = None cert = None
if username and password:
credentials = (username, password)
request_type = "authenticated"
else:
credentials = None
request_type = "non-authenticated"
request_options = {
'allow_redirects': True,
'timeout': int(self.getConfig('timeout', default_timeout)),
'verify': verify,
'cert': cert,
'auth': credentials,
}
try: try:
result = requests.get( response = requests.get(url, **request_options)
url, verify=verify, allow_redirects=True, timeout=timeout, cert=cert)
except requests.exceptions.SSLError as e: except requests.exceptions.SSLError as e:
if 'certificate verify failed' in str(e): if 'certificate verify failed' in str(e):
self.logger.error( self.logger.error(
"ERROR SSL verify failed while accessing %r" % (url,)) "ERROR SSL verify failed while accessing %r", url)
else: else:
self.logger.error( self.logger.error(
"ERROR Unknown SSL error %r while accessing %r" % (e, url)) "ERROR Unknown SSL error %r while accessing %r", e, url)
return
except requests.ConnectionError as e: except requests.ConnectionError as e:
self.logger.error( self.logger.error(
"ERROR connection not possible while accessing %r" % (url, )) "ERROR connection not possible while accessing %r", url)
return
except Exception as e: except Exception as e:
self.logger.error("ERROR: %s" % (e,)) self.logger.error("ERROR: %s", e)
return
http_code = result.status_code
check_secure = int(self.getConfig('check-secure', 0))
ignore_code = int(self.getConfig('ignore-code', 0))
if http_code == 401 and check_secure == 1:
self.logger.info("%r is protected (returned %s)." % (url, http_code))
elif not ignore_code and http_code != expected_http_code:
self.logger.error("%r is not available (returned %s, expected %s)." % (
url, http_code, expected_http_code))
else: else:
self.logger.info("%r is available" % (url,)) # Log a sensible message, depending on the request/response
# parameters.
if ignore_code:
log = self.logger.info
result = "succeeded"
message = "return code ignored"
elif response.status_code == expected_http_code:
log = self.logger.info
result = "succeeded"
message = "returned expected code %d" % expected_http_code
else:
log = self.logger.error
result = "failed"
message = "returned %d, expected %d" % (response.status_code,
expected_http_code)
log("%s request to %r %s (%s)", request_type, url, result, message)
def anomaly(self): def anomaly(self):
return self._test(result_count=3, failure_amount=3) return self._test(result_count=3, failure_amount=3)
...@@ -37,6 +37,7 @@ from cryptography.hazmat.primitives import serialization ...@@ -37,6 +37,7 @@ from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID from cryptography.x509.oid import NameOID
from six.moves import BaseHTTPServer from six.moves import BaseHTTPServer
from base64 import b64encode
import datetime import datetime
import ipaddress import ipaddress
import json import json
...@@ -53,6 +54,11 @@ SLAPOS_TEST_IPV4 = os.environ.get('SLAPOS_TEST_IPV4', '127.0.0.1') ...@@ -53,6 +54,11 @@ SLAPOS_TEST_IPV4 = os.environ.get('SLAPOS_TEST_IPV4', '127.0.0.1')
SLAPOS_TEST_IPV4_PORT = 57965 SLAPOS_TEST_IPV4_PORT = 57965
HTTPS_ENDPOINT = "https://%s:%s/" % (SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV4_PORT) HTTPS_ENDPOINT = "https://%s:%s/" % (SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV4_PORT)
# Good and bad username/password for HTTP authentication tests.
TEST_GOOD_USERNAME = 'good username'
TEST_GOOD_PASSWORD = 'good password'
TEST_BAD_USERNAME = 'bad username'
TEST_BAD_PASSWORD = 'bad password'
def createKey(): def createKey():
key = rsa.generate_private_key( key = rsa.generate_private_key(
...@@ -134,7 +140,24 @@ class CertificateAuthority(object): ...@@ -134,7 +140,24 @@ class CertificateAuthority(object):
class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler): class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_GET(self): def do_GET(self):
"""
Respond to a GET request. You can configure the response as follows:
- Specify the response code in the URL, e.g. /200
- Optionally, use an underscore to specify a timeout (in seconds)
to wait before responding, e.g. /200_5
- Optionally, use an exclamation mark to require HTTP basic
authentication, using the credentials TEST_GOOD_USERNAME and
TEST_GOOD_PASSWORD defined at the top of this file, e.g. /!200
or /!200_5.
"""
path = self.path.split('/')[-1] path = self.path.split('/')[-1]
if path[0] == '!':
require_auth = True
path = path[1:]
else:
require_auth = False
if '_' in path: if '_' in path:
response, timeout = path.split('_') response, timeout = path.split('_')
response = int(response) response = int(response)
...@@ -143,7 +166,20 @@ class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler): ...@@ -143,7 +166,20 @@ class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
timeout = 0 timeout = 0
response = int(path) response = int(path)
key = b64encode(('%s:%s' % (TEST_GOOD_USERNAME,
TEST_GOOD_PASSWORD)).encode()).decode()
try:
authorization = self.headers['Authorization']
except KeyError:
authorization = None
time.sleep(timeout) time.sleep(timeout)
if require_auth and authorization != 'Basic ' + key:
self.send_response(401)
self.send_header('WWW-Authenticate', 'Basic realm="test"')
self.end_headers()
self.wfile.write('bad credentials\n'.encode())
else:
self.send_response(response) self.send_response(response)
self.send_header("Content-type", "application/json") self.send_header("Content-type", "application/json")
...@@ -218,48 +254,28 @@ class CheckUrlAvailableMixin(TestPromisePluginMixin): ...@@ -218,48 +254,28 @@ class CheckUrlAvailableMixin(TestPromisePluginMixin):
TestPromisePluginMixin.setUp(self) TestPromisePluginMixin.setUp(self)
self.promise_name = "check-url-available.py" self.promise_name = "check-url-available.py"
self.base_content = """from slapos.promise.plugin.check_url_available import RunPromise self.success_template = \
("non-authenticated request to %r succeeded "
"(returned expected code %d)")
self.authenticated_success_template = \
("authenticated request to %r succeeded "
"(returned expected code %d)")
self.ignored_success_template = \
("non-authenticated request to %r succeeded "
"(return code ignored)")
self.authenticated_ignored_success_template = \
("authenticated request to %r succeeded "
"(return code ignored)")
def make_content(self, option_dict):
content = """from slapos.promise.plugin.check_url_available import RunPromise
extra_config_dict = { extra_config_dict = {
'url': '%(url)s',
'timeout': %(timeout)s,
'check-secure': %(check_secure)s,
'ignore-code': %(ignore_code)s,
}
""" """
for option in option_dict.items():
content += "\n '%s': %r," % option
self.base_content_verify = """from slapos.promise.plugin.check_url_available import RunPromise return content + "\n}"
extra_config_dict = {
'url': '%(url)s',
'timeout': %(timeout)s,
'check-secure': %(check_secure)s,
'ignore-code': %(ignore_code)s,
'verify': %(verify)s,
}
"""
self.base_content_ca_cert = """from slapos.promise.plugin.check_url_available import RunPromise
extra_config_dict = {
'url': '%(url)s',
'timeout': %(timeout)s,
'check-secure': %(check_secure)s,
'ignore-code': %(ignore_code)s,
'ca-cert-file': %(ca_cert_file)r,
}
"""
self.base_content_http_code = """from slapos.promise.plugin.check_url_available import RunPromise
extra_config_dict = {
'url': '%(url)s',
'timeout': %(timeout)s,
'check-secure': %(check_secure)s,
'ignore-code': %(ignore_code)s,
'http_code': %(http_code)s
}
"""
def tearDown(self): def tearDown(self):
TestPromisePluginMixin.tearDown(self) TestPromisePluginMixin.tearDown(self)
...@@ -268,12 +284,11 @@ extra_config_dict = { ...@@ -268,12 +284,11 @@ extra_config_dict = {
class TestCheckUrlAvailable(CheckUrlAvailableMixin): class TestCheckUrlAvailable(CheckUrlAvailableMixin):
def test_check_url_bad(self): def test_check_url_bad(self):
content = self.base_content % { content = self.make_content({
'url': 'https://', 'url': 'https://',
'timeout': 10, 'timeout': 10,
'check_secure': 0, 'ignore-code': 0,
'ignore_code': 0, })
}
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
with self.assertRaises(PromiseError): with self.assertRaises(PromiseError):
...@@ -287,12 +302,11 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -287,12 +302,11 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
) )
def test_check_url_malformed(self): def test_check_url_malformed(self):
content = self.base_content % { content = self.make_content({
'url': '', 'url': '',
'timeout': 10, 'timeout': 10,
'check_secure': 0, 'ignore-code': 0,
'ignore_code': 0, })
}
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
with self.assertRaises(PromiseError): with self.assertRaises(PromiseError):
...@@ -305,12 +319,11 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -305,12 +319,11 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
) )
def test_check_url_site_off(self): def test_check_url_site_off(self):
content = self.base_content % { content = self.make_content({
'url': 'https://localhost:56789/site', 'url': 'https://localhost:56789/site',
'timeout': 10, 'timeout': 10,
'check_secure': 0, 'ignore-code': 0,
'ignore_code': 0, })
}
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
with self.assertRaises(PromiseError): with self.assertRaises(PromiseError):
...@@ -325,12 +338,11 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -325,12 +338,11 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
def test_check_200(self): def test_check_200(self):
url = HTTPS_ENDPOINT + '200' url = HTTPS_ENDPOINT + '200'
content = self.base_content % { content = self.make_content({
'url': url, 'url': url,
'timeout': 10, 'timeout': 10,
'check_secure': 0, 'ignore-code': 0,
'ignore_code': 0, })
}
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
self.launcher.run() self.launcher.run()
...@@ -338,18 +350,17 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -338,18 +350,17 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
self.assertEqual(result['result']['failed'], False) self.assertEqual(result['result']['failed'], False)
self.assertEqual( self.assertEqual(
result['result']['message'], result['result']['message'],
"%r is available" % (url,) self.success_template % (url, 200)
) )
def test_check_200_verify(self): def test_check_200_verify(self):
url = HTTPS_ENDPOINT + '200' url = HTTPS_ENDPOINT + '200'
content = self.base_content_verify % { content = self.make_content({
'url': url, 'url': url,
'timeout': 10, 'timeout': 10,
'check_secure': 0, 'ignore-code': 0,
'ignore_code': 0,
'verify': 1, 'verify': 1,
} })
try: try:
old = os.environ.get('REQUESTS_CA_BUNDLE') old = os.environ.get('REQUESTS_CA_BUNDLE')
# simulate system provided CA bundle # simulate system provided CA bundle
...@@ -368,18 +379,17 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -368,18 +379,17 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
self.assertEqual(result['result']['failed'], False) self.assertEqual(result['result']['failed'], False)
self.assertEqual( self.assertEqual(
result['result']['message'], result['result']['message'],
"%r is available" % (url,) self.success_template % (url, 200)
) )
def test_check_200_verify_fail(self): def test_check_200_verify_fail(self):
url = HTTPS_ENDPOINT + '200' url = HTTPS_ENDPOINT + '200'
content = self.base_content_verify % { content = self.make_content({
'url': url, 'url': url,
'timeout': 10, 'timeout': 10,
'check_secure': 0, 'ignore-code': 0,
'ignore_code': 0,
'verify': 1, 'verify': 1,
} })
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
with self.assertRaises(PromiseError): with self.assertRaises(PromiseError):
...@@ -393,13 +403,12 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -393,13 +403,12 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
def test_check_200_verify_own(self): def test_check_200_verify_own(self):
url = HTTPS_ENDPOINT + '200' url = HTTPS_ENDPOINT + '200'
content = self.base_content_ca_cert % { content = self.make_content({
'url': url, 'url': url,
'timeout': 10, 'timeout': 10,
'check_secure': 0, 'ignore-code': 0,
'ignore_code': 0, 'ca-cert-file': self.test_server_ca_certificate_file.name
'ca_cert_file': self.test_server_ca_certificate_file.name })
}
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
self.launcher.run() self.launcher.run()
...@@ -407,17 +416,16 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -407,17 +416,16 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
self.assertEqual(result['result']['failed'], False) self.assertEqual(result['result']['failed'], False)
self.assertEqual( self.assertEqual(
result['result']['message'], result['result']['message'],
"%r is available" % (url,) self.success_template % (url, 200)
) )
def test_check_401(self): def test_check_401(self):
url = HTTPS_ENDPOINT + '401' url = HTTPS_ENDPOINT + '401'
content = self.base_content % { content = self.make_content({
'url': url, 'url': url,
'timeout': 10, 'timeout': 10,
'check_secure': 0, 'ignore-code': 0,
'ignore_code': 0, })
}
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
with self.assertRaises(PromiseError): with self.assertRaises(PromiseError):
...@@ -426,17 +434,17 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -426,17 +434,17 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
self.assertEqual(result['result']['failed'], True) self.assertEqual(result['result']['failed'], True)
self.assertEqual( self.assertEqual(
result['result']['message'], result['result']['message'],
"%r is not available (returned 401, expected 200)." % (url,) ("non-authenticated request to %r failed "
"(returned 401, expected 200)") % (url,)
) )
def test_check_401_ignore_code(self): def test_check_401_ignore_code(self):
url = HTTPS_ENDPOINT + '401' url = HTTPS_ENDPOINT + '401'
content = self.base_content % { content = self.make_content({
'url': url, 'url': url,
'timeout': 10, 'timeout': 10,
'check_secure': 0, 'ignore-code': 1,
'ignore_code': 1, })
}
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
self.launcher.run() self.launcher.run()
...@@ -444,17 +452,17 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -444,17 +452,17 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
self.assertEqual(result['result']['failed'], False) self.assertEqual(result['result']['failed'], False)
self.assertEqual( self.assertEqual(
result['result']['message'], result['result']['message'],
"%r is available" % (url,) self.ignored_success_template % (url,)
) )
def test_check_401_check_secure(self): def test_check_512_http_code(self):
url = HTTPS_ENDPOINT + '401' url = HTTPS_ENDPOINT + '512'
content = self.base_content % { content = self.make_content({
'url': url, 'url': url,
'timeout': 10, 'timeout': 10,
'check_secure': 1, 'ignore-code': 0,
'ignore_code': 0, 'http-code': 512,
} })
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
self.launcher.run() self.launcher.run()
...@@ -462,18 +470,57 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -462,18 +470,57 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
self.assertEqual(result['result']['failed'], False) self.assertEqual(result['result']['failed'], False)
self.assertEqual( self.assertEqual(
result['result']['message'], result['result']['message'],
"%r is protected (returned 401)." % (url,) self.success_template % (url, 512)
) )
def test_check_512_http_code(self): # Test bad HTTP code.
url = HTTPS_ENDPOINT + '512' def test_check_bad_http_code(self):
content = self.base_content_http_code % { url = HTTPS_ENDPOINT + '412'
content = self.make_content({
'url': url, 'url': url,
'timeout': 10, 'timeout': 10,
'check_secure': 0, 'ignore-code': 0,
'ignore_code': 0, 'http-code': 732,
'http_code': 512, })
} self.writePromise(self.promise_name, content)
self.configureLauncher()
with self.assertRaises(PromiseError):
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
self.assertEqual(
result['result']['message'],
("non-authenticated request to %r failed "
"(returned 412, expected 732)") % (url,)
)
# Test normal authentication success.
def test_check_authenticate_success(self):
url = HTTPS_ENDPOINT + '!200'
content = self.make_content({
'url': url,
'username': TEST_GOOD_USERNAME,
'password': TEST_GOOD_PASSWORD,
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(
result['result']['message'],
self.authenticated_success_template % (url, 200)
)
# Test that supplying a username/password still succeeds when the
# server doesn't require them.
def test_check_authenticate_no_password_needed(self):
url = HTTPS_ENDPOINT + '200'
content = self.make_content({
'url': url,
'username': TEST_GOOD_USERNAME,
'password': TEST_GOOD_PASSWORD,
})
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
self.launcher.run() self.launcher.run()
...@@ -481,19 +528,94 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin): ...@@ -481,19 +528,94 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
self.assertEqual(result['result']['failed'], False) self.assertEqual(result['result']['failed'], False)
self.assertEqual( self.assertEqual(
result['result']['message'], result['result']['message'],
"%r is available" % (url,) self.authenticated_success_template % (url, 200)
) )
# Test authentication failure due to bad password.
def test_check_authenticate_bad_password(self):
url = HTTPS_ENDPOINT + '!200'
content = self.make_content({
'url': url,
'username': TEST_BAD_USERNAME,
'password': TEST_BAD_PASSWORD,
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
with self.assertRaises(PromiseError):
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
self.assertEqual(
result['result']['message'],
("authenticated request to %r failed "
"(returned 401, expected 200)") % (url,)
)
# Test authentication failure due to no password being given to a
# protected server.
def test_check_authenticate_no_password_given(self):
url = HTTPS_ENDPOINT + '!200'
content = self.make_content({
'url': url,
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
with self.assertRaises(PromiseError):
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
self.assertEqual(
result['result']['message'],
("non-authenticated request to %r failed "
"(returned 401, expected 200)") % (url,)
)
# Test that authentication and HTTP code can be used together.
def test_check_authenticate_http_code(self):
url = HTTPS_ENDPOINT + '!412'
content = self.make_content({
'url': url,
'username': TEST_GOOD_USERNAME,
'password': TEST_GOOD_PASSWORD,
'http-code': 412
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(
result['result']['message'],
self.authenticated_success_template % (url, 412)
)
# Test that authentication and igore-code can be used together.
def test_check_authenticate_ignore_code(self):
url = HTTPS_ENDPOINT + '!404'
content = self.make_content({
'url': url,
'username': TEST_GOOD_USERNAME,
'password': TEST_GOOD_PASSWORD,
'ignore-code': 1
})
self.writePromise(self.promise_name, content)
self.configureLauncher()
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(
result['result']['message'],
self.authenticated_ignored_success_template % (url,)
)
class TestCheckUrlAvailableTimeout(CheckUrlAvailableMixin): class TestCheckUrlAvailableTimeout(CheckUrlAvailableMixin):
def test_check_200_timeout(self): def test_check_200_timeout(self):
url = HTTPS_ENDPOINT + '200_5' url = HTTPS_ENDPOINT + '200_5'
content = self.base_content % { content = self.make_content({
'url': url, 'url': url,
'timeout': 1, 'timeout': 1,
'check_secure': 0, 'ignore-code': 0,
'ignore_code': 0, })
}
self.writePromise(self.promise_name, content) self.writePromise(self.promise_name, content)
self.configureLauncher() self.configureLauncher()
with self.assertRaises(PromiseError): with self.assertRaises(PromiseError):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment