Commit 9dc45792 authored by Jérome Perrin's avatar Jérome Perrin Committed by Léo-Paul Géneau

software/erp5/test: refactor to use managed resources

parent 1be22409
......@@ -51,6 +51,7 @@ setup(name=name,
'backports.lzma',
'cryptography',
'pyOpenSSL',
'typing; python_version<"3"',
],
test_suite='test',
)
from . import ERP5InstanceTestCase
from . import setUpModule
from slapos.testing.utils import findFreeTCPPort
from BaseHTTPServer import HTTPServer
from BaseHTTPServer import BaseHTTPRequestHandler
import OpenSSL.SSL
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.x509.oid import NameOID
import hashlib
import json
import multiprocessing
import logging
import os
import requests
import re
import shutil
import subprocess
import tempfile
import time
from BaseHTTPServer import BaseHTTPRequestHandler
from typing import Dict
import mock
import OpenSSL.SSL
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from slapos.testing.testcase import ManagedResource
from slapos.testing.utils import (CrontabMixin, ManagedHTTPServer,
findFreeTCPPort)
from . import ERP5InstanceTestCase, setUpModule
setUpModule # pyflakes
class TestHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = {
'Path': self.path,
'Incoming Headers': self.headers.dict
}
response = json.dumps(response, indent=2)
self.end_headers()
self.wfile.write(response)
class TestFrontendXForwardedFor(ERP5InstanceTestCase):
__partition_reference__ = 'xff'
http_server_process = None
frontend_caucase_dir = None
frontend_caucased_process = None
backend_caucase_dir = None
backend_caucased_process = None
class EchoHTTPServer(ManagedHTTPServer):
"""An HTTP Server responding with the request path and incoming headers,
encoded in json.
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': self.headers.dict
},
indent=2,
)
self.end_headers()
self.wfile.write(response)
@classmethod
def getInstanceSoftwareType(cls):
return 'balancer'
log_message = logging.getLogger(__name__ + '.HeaderEchoHandler').info
@classmethod
def setUpClass(cls):
# start a dummy web server echoing headers.
http_server_port = findFreeTCPPort(cls._ipv4_address)
server = HTTPServer(
(cls._ipv4_address, http_server_port),
TestHandler)
cls.http_server_process = multiprocessing.Process(
target=server.serve_forever, name='HTTPServer')
cls.http_server_process.start()
cls.http_server_netloc = '%s:%s' % (cls._ipv4_address, http_server_port)
# start a caucased and generate a valid client certificate.
cls.computer_partition_root_path = os.path.abspath(os.curdir)
cls.frontend_caucase_dir = tempfile.mkdtemp()
frontend_caucased_dir = os.path.join(cls.frontend_caucase_dir, 'caucased')
os.mkdir(frontend_caucased_dir)
frontend_user_dir = os.path.join(cls.frontend_caucase_dir, 'user')
os.mkdir(frontend_user_dir)
frontend_service_dir = os.path.join(cls.frontend_caucase_dir, 'service')
os.mkdir(frontend_service_dir)
frontend_caucased_netloc = '%s:%s' % (cls._ipv4_address, findFreeTCPPort(cls._ipv4_address))
cls.frontend_caucased_url = 'http://' + frontend_caucased_netloc
cls.user_certificate = frontend_user_key = os.path.join(frontend_user_dir, 'client.key.pem')
frontend_user_csr = os.path.join(frontend_user_dir, 'client.csr.pem')
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
with open(frontend_user_key, 'wb') as f:
f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u'user'),
])).sign(key, hashes.SHA256(), default_backend())
with open(frontend_user_csr, 'wb') as f:
f.write(csr.public_bytes(serialization.Encoding.PEM))
class CaucaseService(ManagedResource):
"""A caucase service.
"""
url = None # type: str
directory = None # type: str
_caucased_process = None # type: subprocess.Popen
cls.software_release_root_path = os.path.join(
cls.slap._software_root,
hashlib.md5(cls.getSoftwareURL()).hexdigest(),
def open(self):
# type: () -> None
# start a caucased and server certificate.
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
caucased_path = os.path.join(cls.software_release_root_path, 'bin', 'caucased')
caucase_path = os.path.join(cls.software_release_root_path, 'bin', 'caucase')
cls.frontend_caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(frontend_caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(frontend_caucased_dir, 'server.key.pem'),
'--netloc', frontend_caucased_netloc,
'--service-auto-approve-count', '1',
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
caucased_path = os.path.join(software_release_root_path, 'bin', 'caucased')
self.directory = tempfile.mkdtemp()
caucased_dir = os.path.join(self.directory, 'caucased')
os.mkdir(caucased_dir)
os.mkdir(os.path.join(caucased_dir, 'user'))
os.mkdir(os.path.join(caucased_dir, 'service'))
backend_caucased_netloc = '%s:%s' % (self._cls._ipv4_address, findFreeTCPPort(self._cls._ipv4_address))
self.url = 'http://' + backend_caucased_netloc
self._caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(30):
try:
if requests.get(cls.frontend_caucased_url).status_code == 200:
if requests.get(self.url).status_code == 200:
break
except Exception:
pass
......@@ -118,173 +94,184 @@ class TestFrontendXForwardedFor(ERP5InstanceTestCase):
else:
raise RuntimeError('caucased failed to start.')
cau_args = [
caucase_path,
'--ca-url', cls.frontend_caucased_url,
'--ca-crt', os.path.join(frontend_user_dir, 'service-ca-crt.pem'),
'--crl', os.path.join(frontend_user_dir, 'service.crl'),
'--user-ca-crt', os.path.join(frontend_user_dir, 'user-ca-crt.pem'),
'--user-crl', os.path.join(frontend_user_dir, 'user.crl'),
]
def close(self):
# type: () -> None
self._caucased_process.terminate()
self._caucased_process.wait()
shutil.rmtree(self.directory)
cas_args = [
caucase_path,
'--ca-url', cls.frontend_caucased_url,
'--ca-crt', os.path.join(frontend_service_dir, 'service-ca-crt.pem'),
'--crl', os.path.join(frontend_service_dir, 'service.crl'),
'--user-ca-crt', os.path.join(frontend_service_dir, 'user-ca-crt.pem'),
'--user-crl', os.path.join(frontend_service_dir, 'user.crl'),
]
caucase_process = subprocess.Popen(
cau_args + [
'--mode', 'user',
'--send-csr', frontend_user_csr,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
result = caucase_process.communicate()
csr_id = result[0].split()[0]
class BalancerTestCase(ERP5InstanceTestCase):
subprocess.check_call(
cau_args + [
'--mode', 'user',
'--get-crt', csr_id, frontend_user_key,
],
@classmethod
def getInstanceSoftwareType(cls):
return 'balancer'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
return {
'tcpv4-port': 8000,
'computer-memory-percent-threshold': 100,
# XXX what is this ? should probably not be needed here
'name': cls.__name__,
'monitor-passwd': 'secret',
'apachedex-configuration': '--erp5-base +erp5 .*/VirtualHostRoot/erp5(/|\\?|$) --base +other / --skip-user-agent Zabbix --error-detail --js-embed --quiet',
'apachedex-promise-threshold': 100,
'haproxy-server-check-path': '/',
'zope-family-dict': {
'default': ['dummy_http_server'],
},
'dummy_http_server': [[cls.getManagedResource("backend_web_server", EchoHTTPServer).netloc, 1, False]],
'backend-path-dict': {
'default': '',
},
'ssl-authentication-dict': {},
'ssl': {
'caucase-url': cls.getManagedResource("caucase", CaucaseService).url,
}
}
@classmethod
def getInstanceParameterDict(cls):
# type: () -> Dict
return {'_': json.dumps(cls._getInstanceParameterDict())}
def setUp(self):
self.default_balancer_url = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])['default']
class CaucaseClientCertificate(ManagedResource):
"""A client certificate issued by a caucase services.
"""
ca_crt_file = None # type: str
crl_file = None # type: str
csr_file = None # type: str
cert_file = None # type: str
key_file = None # type: str
def open(self):
# type: () -> None
self.tmpdir = tempfile.mkdtemp()
self.ca_crt_file = os.path.join(self.tmpdir, 'ca-crt.pem')
self.crl_file = os.path.join(self.tmpdir, 'ca-crl.pem')
self.csr_file = os.path.join(self.tmpdir, 'csr.pem')
self.cert_file = os.path.join(self.tmpdir, 'crt.pem')
self.key_file = os.path.join(self.tmpdir, 'key.pem')
def close(self):
# type: () -> None
shutil.rmtree(self.tmpdir)
def request(self, common_name, caucase):
# type: (str, CaucaseService) -> None
"""Generate certificate and request signature to the caucase service.
This overwrite any previously requested certificate for this instance.
"""
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
caucase_path = os.path.join(software_release_root_path, 'bin', 'caucase')
cls.client_certificate = frontend_service_key = os.path.join(frontend_service_dir, 'crt.pem')
frontend_service_csr = os.path.join(frontend_service_dir, 'csr.pem')
cas_args = [
caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
]
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
with open(frontend_service_key, 'wb') as f:
f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u'service'),
])).sign(key, hashes.SHA256(), default_backend())
with open(frontend_service_csr, 'wb') as f:
with open(self.key_file, 'wb') as f:
f.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(
NameOID.COMMON_NAME,
common_name,
),
])).sign(
key,
hashes.SHA256(),
default_backend(),
)
with open(self.csr_file, 'wb') as f:
f.write(csr.public_bytes(serialization.Encoding.PEM))
caucase_process = subprocess.Popen(
csr_id = subprocess.check_output(
cas_args + [
'--send-csr', frontend_service_csr,
'--send-csr', self.csr_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
result = caucase_process.communicate()
csr_id = result[0].split()[0]
).split()[0]
assert csr_id
for _ in range(30):
if not subprocess.call(
cas_args + [
'--get-crt', csr_id, frontend_service_key,
'--get-crt', csr_id, self.cert_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) == 0:
break
else:
time.sleep(1)
else:
raise RuntimeError('getting service certificate failed.')
with open(self.cert_file) as f:
assert 'BEGIN CERTIFICATE' in f.read()
# start a caucased and server certificate.
cls.backend_caucase_dir = tempfile.mkdtemp()
backend_caucased_dir = os.path.join(cls.backend_caucase_dir, 'caucased')
os.mkdir(backend_caucased_dir)
backend_caucased_netloc = '%s:%s' % (cls._ipv4_address, findFreeTCPPort(cls._ipv4_address))
cls.backend_caucased_url = 'http://' + backend_caucased_netloc
cls.backend_caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(backend_caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(backend_caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(10):
try:
if requests.get(cls.backend_caucased_url).status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError('caucased failed to start.')
super(TestFrontendXForwardedFor, cls).setUpClass()
class TestFrontendXForwardedFor(BalancerTestCase):
__partition_reference__ = 'xff'
@classmethod
def getInstanceParameterDict(cls):
return {
'_': json.dumps({
'tcpv4-port': 3306,
'computer-memory-percent-threshold': 100,
# XXX what is this ? should probably not be needed here
'name': cls.__name__,
'monitor-passwd': 'secret',
'apachedex-configuration': '',
'apachedex-promise-threshold': 100,
'haproxy-server-check-path': '/',
'zope-family-dict': {
'default': ['dummy_http_server'],
'default-auth': ['dummy_http_server'],
},
'dummy_http_server': [[cls.http_server_netloc, 1, False]],
'backend-path-dict': {
'default': '/',
'default-auth': '/',
},
'ssl-authentication-dict': {
'default': False,
'default-auth': True,
},
'ssl': {
'caucase-url': cls.backend_caucased_url,
'frontend-caucase-url-list': [cls.frontend_caucased_url],
},
})
}
def _getInstanceParameterDict(cls):
# type: () -> Dict
frontend_caucase = cls.getManagedResource('frontend_caucase', CaucaseService)
certificate = cls.getManagedResource('client_certificate', CaucaseClientCertificate)
certificate.request(u'shared frontend', frontend_caucase)
@classmethod
def _cleanup(cls, snapshot_name):
if cls.http_server_process:
cls.http_server_process.terminate()
if cls.frontend_caucased_process:
cls.frontend_caucased_process.terminate()
if cls.frontend_caucase_dir:
shutil.rmtree(cls.frontend_caucase_dir)
if cls.backend_caucased_process:
cls.backend_caucased_process.terminate()
if cls.backend_caucase_dir:
shutil.rmtree(cls.backend_caucase_dir)
super(TestFrontendXForwardedFor, cls)._cleanup(snapshot_name)
parameter_dict = super(TestFrontendXForwardedFor, cls)._getInstanceParameterDict()
# add another "-auth" backend, that will have ssl-authentication enabled
parameter_dict['zope-family-dict']['default-auth'] = ['dummy_http_server']
parameter_dict['backend-path-dict']['default-auth'] = '/'
parameter_dict['ssl-authentication-dict'] = {
'default': False,
'default-auth': True,
}
parameter_dict['ssl']['frontend-caucase-url-list'] = [frontend_caucase.url]
return parameter_dict
def test_x_forwarded_for_added_when_verified_connection(self):
# type: () -> None
client_certificate = self.getManagedResource('client_certificate', CaucaseClientCertificate)
for backend in ('default', 'default-auth'):
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])[backend]
result = requests.get(
balancer_url,
headers={'X-Forwarded-For': '1.2.3.4'},
cert=self.client_certificate,
cert=(client_certificate.cert_file, client_certificate.key_file),
verify=False,
).json()
self.assertEqual(result['Incoming Headers'].get('x-forwarded-for').split(', ')[0], '1.2.3.4')
def test_x_forwarded_for_stripped_when_not_verified_connection(self):
# type: () -> None
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default']
result = requests.get(
balancer_url,
......
......@@ -316,3 +316,4 @@ funcsigs = 1.0.2
mysqlclient = 1.3.12
pexpect = 4.8.0
ptyprocess = 0.6.0
typing = 3.7.4.3
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment