all: Finalise python3 support.
Basically, wrap stdout and stderr whenever they do not have an encoding with an ascii-encoding writer, and write unicode to stdout & stderr. wsgi.errors is defined in the reference implementation as being a StringIO, so follow that. Stop using argparse.FileType to get rid of python3 "file not closed" errors. Also, fix setup access to CHANGES.txt . Also, fix 2to3 involvement. Also, replace test.captureStdout with extra tool arguments.
Showing
This diff is collapsed.
... | ... | @@ -25,7 +25,6 @@ Test suite |
""" | ||
# pylint: disable=too-many-lines, too-many-public-methods | ||
from __future__ import absolute_import | ||
import contextlib | ||
from Cookie import SimpleCookie | ||
import datetime | ||
import errno | ||
... | ... | @@ -305,25 +304,14 @@ def print_buffer_on_error(func): |
try: | ||
return func(self, *args, **kw) | ||
except Exception: # pragma: no cover | ||
sys.stdout.write(utils.toBytes(os.linesep)) | ||
sys.stdout.write(self.caucase_test_output.getvalue()) | ||
stdout = utils.toUnicodeWritableStream(sys.stdout) | ||
stdout.write(os.linesep) | ||
stdout.write( | ||
self.caucase_test_output.getvalue().decode('ascii', 'replace'), | ||
) | ||
raise | ||
return wrapper | ||
@contextlib.contextmanager | ||
def captureStdout(): | ||
""" | ||
Replace stdout with a BytesIO object for the duration of the context manager, | ||
and provide it to caller. | ||
""" | ||
orig_stdout = sys.stdout | ||
sys.stdout = stdout = BytesIO() | ||
try: | ||
yield stdout | ||
finally: | ||
sys.stdout = orig_stdout | ||
|
||
@unittest.skipIf(sys.version_info >= (3, ), 'Caucase currently supports python 2 only') | ||
class CaucaseTest(unittest.TestCase): | ||
""" | ||
Test a complete caucase setup: spawn a caucase-http server on CAUCASE_NETLOC | ||
... | ... | @@ -356,6 +344,9 @@ class CaucaseTest(unittest.TestCase): |
self._server_backup_path = os.path.join(server_dir, 'backup') | ||
self._server_cors_store = os.path.join(server_dir, 'cors.key') | ||
# pylint: enable=bad-whitespace | ||
# Using a BytesIO for caucased output here, because stdout/stderr do not | ||
# necessarily have a known encoding, for example when output is a pipe | ||
# (to a file, ...). caucased must deal with this. | ||
self.caucase_test_output = BytesIO() | ||
os.mkdir(self._server_backup_path) | ||
... | ... | @@ -497,7 +488,7 @@ class CaucaseTest(unittest.TestCase): |
) | ||
def _getClientCRL(self): | ||
with open(self._client_crl) as crl_pem_file: | ||
with open(self._client_crl, 'rb') as crl_pem_file: | ||
return x509.load_pem_x509_crl( | ||
crl_pem_file.read(), | ||
_cryptography_backend | ||
... | ... | @@ -612,20 +603,24 @@ class CaucaseTest(unittest.TestCase): |
Returns stdout. | ||
""" | ||
with captureStdout() as stdout: | ||
try: | ||
cli.main( | ||
argv=( | ||
'--ca-url', self._caucase_url, | ||
'--ca-crt', self._client_ca_crt, | ||
'--user-ca-crt', self._client_user_ca_crt, | ||
'--crl', self._client_crl, | ||
'--user-crl', self._client_user_crl, | ||
) + argv, | ||
) | ||
except SystemExit: | ||
pass | ||
return stdout.getvalue() | ||
# Using a BytesIO for caucased output here, because stdout/stderr do not | ||
# necessarily have a known encoding, for example when output is a pipe | ||
# (to a file, ...). caucase must deal with this. | ||
stdout = BytesIO() | ||
try: | ||
cli.main( | ||
argv=( | ||
'--ca-url', self._caucase_url, | ||
'--ca-crt', self._client_ca_crt, | ||
'--user-ca-crt', self._client_user_ca_crt, | ||
'--crl', self._client_crl, | ||
'--user-crl', self._client_user_crl, | ||
) + argv, | ||
stdout=stdout, | ||
) | ||
except SystemExit: | ||
pass | ||
return stdout.getvalue().decode('ascii') | ||
@staticmethod | ||
def _setCertificateRemainingLifeTime(key, crt, delta): | ||
... | ... | @@ -1676,7 +1671,10 @@ class CaucaseTest(unittest.TestCase): |
""" | ||
Non-standard shorthand for invoking the WSGI application. | ||
""" | ||
environ.setdefault('wsgi.errors', self.caucase_test_output) | ||
environ.setdefault( | ||
'wsgi.errors', | ||
utils.toUnicodeWritableStream(self.caucase_test_output), | ||
) | ||
environ.setdefault('wsgi.url_scheme', 'http') | ||
environ.setdefault('SERVER_NAME', server_name) | ||
environ.setdefault('SERVER_PORT', str(server_http_port)) | ||
... | ... | @@ -2294,29 +2292,31 @@ class CaucaseTest(unittest.TestCase): |
os.unlink(self._server_db) | ||
os.unlink(self._server_key) | ||
with captureStdout() as stdout: | ||
cli.key_id([ | ||
'--private-key', user_key_path, user2_key_path, user2_new_key_path, | ||
]) | ||
stdout = BytesIO() | ||
cli.key_id( | ||
['--private-key', user_key_path, user2_key_path, user2_new_key_path], | ||
stdout=stdout, | ||
) | ||
key_id_dict = dict( | ||
line.rsplit(' ', 1) | ||
line.decode('ascii').rsplit(' ', 1) | ||
for line in stdout.getvalue().splitlines() | ||
) | ||
key_id = key_id_dict.pop(user_key_path) | ||
key2_id = key_id_dict.pop(user2_key_path) | ||
new_key2_id = key_id_dict.pop(user2_new_key_path) | ||
self.assertFalse(key_id_dict) | ||
with captureStdout() as stdout: | ||
cli.key_id([ | ||
'--backup', backup_path, | ||
]) | ||
stdout = BytesIO() | ||
cli.key_id( | ||
['--backup', backup_path], | ||
stdout=stdout, | ||
) | ||
self.assertItemsEqual( | ||
[ | ||
backup_path, | ||
' ' + key_id, | ||
' ' + key2_id, | ||
], | ||
stdout.getvalue().splitlines(), | ||
stdout.getvalue().decode('ascii').splitlines(), | ||
) | ||
try: | ||
... | ... | @@ -2410,17 +2410,18 @@ class CaucaseTest(unittest.TestCase): |
if not backup_path_list: # pragma: no cover | ||
raise AssertionError('Backup file not created after 1 second') | ||
backup_path, = glob.glob(backup_glob) | ||
with captureStdout() as stdout: | ||
cli.key_id([ | ||
'--backup', backup_path, | ||
]) | ||
stdout = BytesIO() | ||
cli.key_id( | ||
['--backup', backup_path], | ||
stdout=stdout, | ||
) | ||
self.assertItemsEqual( | ||
[ | ||
backup_path, | ||
' ' + key_id, | ||
' ' + new_key2_id, | ||
], | ||
stdout.getvalue().splitlines(), | ||
stdout.getvalue().decode('ascii').splitlines(), | ||
) | ||
# Now, push a lot of data to exercise chunked checksum in backup & | ||
... | ... | @@ -2444,17 +2445,18 @@ class CaucaseTest(unittest.TestCase): |
if not backup_path_list: # pragma: no cover | ||
raise AssertionError('Backup file took too long to be created') | ||
backup_path, = glob.glob(backup_glob) | ||
with captureStdout() as stdout: | ||
cli.key_id([ | ||
'--backup', backup_path, | ||
]) | ||
stdout = BytesIO() | ||
cli.key_id( | ||
['--backup', backup_path], | ||
stdout=stdout, | ||
) | ||
self.assertItemsEqual( | ||
[ | ||
backup_path, | ||
' ' + key_id, | ||
' ' + new_key2_id, | ||
], | ||
stdout.getvalue().splitlines(), | ||
stdout.getvalue().decode('ascii').splitlines(), | ||
) | ||
self._stopServer() | ||
os.unlink(self._server_db) | ||
... | ... | @@ -2510,23 +2512,24 @@ class CaucaseTest(unittest.TestCase): |
self.assertTrue(os.path.exists(exported_ca), exported_ca) | ||
server_db2 = self._server_db + '2' | ||
self.assertFalse(os.path.exists(server_db2), server_db2) | ||
with captureStdout() as stdout: | ||
caucase.http.manage( | ||
argv=( | ||
'--db', server_db2, | ||
'--import-ca', exported_ca, | ||
'--import-crl', self._client_crl, | ||
# Twice, for code coverage... | ||
'--import-crl', self._client_crl, | ||
), | ||
) | ||
stdout = BytesIO() | ||
caucase.http.manage( | ||
argv=( | ||
'--db', server_db2, | ||
'--import-ca', exported_ca, | ||
'--import-crl', self._client_crl, | ||
# Twice, for code coverage... | ||
'--import-crl', self._client_crl, | ||
), | ||
stdout=stdout, | ||
) | ||
self.assertTrue(os.path.exists(server_db2), server_db2) | ||
self.assertEqual( | ||
[ | ||
'Imported 1 CA certificates', | ||
'Revoked 1 certificates (1 were already revoked)', | ||
], | ||
stdout.getvalue().splitlines(), | ||
stdout.getvalue().decode('ascii').splitlines(), | ||
) | ||
finally: | ||
caucase.http.getBytePass = getBytePass_orig | ||
... | ... | @@ -2729,7 +2732,7 @@ class CaucaseTest(unittest.TestCase): |
until_network_issue = UntilEvent(network_issue_event) | ||
# pylint: disable=protected-access | ||
cli.RetryingCaucaseClient._until = until_network_issue | ||
cli.RetryingCaucaseClient._log_file = self.caucase_test_output | ||
cli.RetryingCaucaseClient._log_file = StringIO() | ||
# pylint: enable=protected-access | ||
until_network_issue.action = ON_EVENT_EXPIRE | ||
original_HTTPConnection = cli.RetryingCaucaseClient.HTTPConnection | ||
... | ... |