Commit d14ab1cb authored by Vincent Pelletier's avatar Vincent Pelletier

test: Test cli.rerequest and cli.updater.

parent c5634e69
...@@ -142,6 +142,67 @@ def readApp(environ, start_response): ...@@ -142,6 +142,67 @@ def readApp(environ, start_response):
start_response('200 OK', []) start_response('200 OK', [])
return [environ['wsgi.input'].read()] return [environ['wsgi.input'].read()]
ON_EVENT_RAISE = object()
ON_EVENT_EXPIRE = object()
class UntilEvent(object):
"""
Like utils.until, but with a threading.Event replacing
KeyboardInterrupt as the early interruption source.
"""
def __init__(self, event):
"""
event (Event)
Event to wait upon. Set and cleared outside this class.
"""
self._event = event
self._action = ON_EVENT_RAISE
self._wait_event = threading.Event()
self._wait_event.clear()
@property
def action(self):
"""
Retrieve current action. Not very useful.
"""
return self._action
@action.setter
def action(self, value):
"""
Change the action which will happen on next event wakeup.
"""
if value not in (ON_EVENT_RAISE, ON_EVENT_EXPIRE):
raise ValueError
self._action = value
def wait(self, timeout=10, clear=False):
"""
Wait for event to be waited upon at least once.
timeout (float)
Maximum number of seconds to wait for.
clear (bool)
Whether to reset this flag once reached.
"""
result = self._wait_event.wait(timeout)
if result and clear:
self._wait_event.clear()
return result
def __call__(self, deadline):
"""
Sleep until deadline is reached or self._event is set.
Raises utils.SleepInterrupt if self._event is set.
"""
now = datetime.datetime.utcnow()
while now < deadline:
self._wait_event.set()
assert self._event.wait()
if self._action is ON_EVENT_EXPIRE:
now = deadline
else:
raise utils.SleepInterrupt
return now
class CaucaseTest(unittest.TestCase): class CaucaseTest(unittest.TestCase):
""" """
Test a complete caucase setup: spawn a caucase-http server on CAUCASE_NETLOC Test a complete caucase setup: spawn a caucase-http server on CAUCASE_NETLOC
...@@ -224,18 +285,6 @@ class CaucaseTest(unittest.TestCase): ...@@ -224,18 +285,6 @@ class CaucaseTest(unittest.TestCase):
return 1 return 1
return 0 return 0
def _serverUntil(self, deadline):
"""
Sleep until deadline is reached or self._server_event is set.
Raises utils.SleepInterrupt if self._server_event is set.
"""
now = datetime.datetime.utcnow()
while now < deadline:
if self._server_event.wait((deadline - now).total_seconds()):
raise utils.SleepInterrupt
now = datetime.datetime.utcnow()
return now
def _startServer(self, *argv): def _startServer(self, *argv):
""" """
Start caucased server Start caucased server
...@@ -251,7 +300,7 @@ class CaucaseTest(unittest.TestCase): ...@@ -251,7 +300,7 @@ class CaucaseTest(unittest.TestCase):
#'--threshold', '31', #'--threshold', '31',
#'--key-len', '2048', #'--key-len', '2048',
) + argv, ) + argv,
'until': self._serverUntil, 'until': UntilEvent(self._server_event),
} }
) )
server.daemon = True server.daemon = True
...@@ -1754,5 +1803,88 @@ class CaucaseTest(unittest.TestCase): ...@@ -1754,5 +1803,88 @@ class CaucaseTest(unittest.TestCase):
'123456789abcd\r\nef0', '123456789abcd\r\nef0',
) )
def testUpdater(self):
"""
Test updater basic functionality.
Scenario is:
- a CSR is generated somewhere, corresponding private key can be discarded
- CSR is sent to a remote machine, where deployment is happening and
a service needs a certificate
- on the machine where deployment happens, a new CSR + private key pair
is produced, using transmitted CSR as a template, with
"caucase-rerequest"
- the new CSR is submitted to caucase using "caucase-updater"
"""
basename = self._getBaseName()
key_path = self._createPrivateKey(basename)
csr_path = self._createBasicCSR(basename, key_path)
re_key_path = key_path + '2'
re_csr_path = csr_path + '2'
re_crt_path = re_csr_path + '.crt'
self.assertFalse(os.path.exists(re_key_path))
self.assertFalse(os.path.exists(re_csr_path))
cli.rerequest(
argv=(
'--csr', re_csr_path,
'--key', re_key_path,
'--template', csr_path,
),
)
self.assertTrue(os.path.exists(re_key_path))
self.assertTrue(os.path.exists(re_csr_path))
user_key_path = self._createFirstUser()
self.assertEqual(
[],
self._runClient(
'--user-key', user_key_path,
'--list-csr',
).splitlines()[2:-1],
)
self.assertFalse(os.path.exists(re_crt_path))
updater_event = threading.Event()
until_updater = UntilEvent(updater_event)
updater_thread = threading.Thread(
target=cli.updater,
kwargs={
'argv': (
'--ca-url', self._caucase_url,
'--cas-ca', self._client_ca_crt,
'--key', re_key_path,
'--csr', re_csr_path,
'--crt', re_crt_path,
'--ca', self._client_ca_crt,
'--crl', self._client_crl,
),
'until': until_updater,
},
)
updater_thread.daemon = True
updater_thread.start()
try:
self.assertTrue(until_updater.wait(clear=True))
# CSR must have been submitted
csr_line_list = self._runClient(
'--user-key', user_key_path,
'--list-csr',
).splitlines()[2:-1]
self.assertEqual(1, len(csr_line_list), csr_line_list)
# Sign it
self._runClient(
'--user-key', user_key_path,
'--sign-csr', csr_line_list[0].split(None, 1)[0],
)
# Wake updater
until_updater.action = ON_EVENT_EXPIRE
updater_event.set()
# Wait for it to call "until()" again
self.assertTrue(until_updater.wait())
# It must have retrieved the certificate now.
self.assertTrue(os.path.exists(re_crt_path))
finally:
until_updater.action = ON_EVENT_RAISE
updater_event.set()
updater_thread.join(2)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment