Commit 87ebda84 authored by Vincent Pelletier's avatar Vincent Pelletier

wsgi: Rework request routing.

Prepares for CORS headers auto-generation.
parent b95aa3ea
...@@ -98,6 +98,24 @@ class Application(object): ...@@ -98,6 +98,24 @@ class Application(object):
""" """
self._cau = cau self._cau = cau
self._cas = cas self._cas = cas
self._context_dict = {
'cau': cau,
'cas': cas,
}
self._routing_dict = {
'crl': {
'GET': self.getCRL,
},
'csr': {
'GET': self.getCSR,
'PUT': self.putCSR,
'DELETE': self.deleteCSR,
},
'crt': {
'GET': self.getCRT,
'PUT': self.putCRT,
},
}
def __call__(self, environ, start_response): def __call__(self, environ, start_response):
""" """
...@@ -107,22 +125,26 @@ class Application(object): ...@@ -107,22 +125,26 @@ class Application(object):
try: # Convert non-wsgi exceptions into WSGI exceptions try: # Convert non-wsgi exceptions into WSGI exceptions
path_item_list = [x for x in environ['PATH_INFO'].split('/') if x] path_item_list = [x for x in environ['PATH_INFO'].split('/') if x]
try: try:
context_id, method_id = path_item_list[:2] context_id, base_path = path_item_list[:2]
except ValueError: except ValueError:
raise NotFound raise NotFound
if context_id == 'cau':
context = self._cau
elif context_id == 'cas':
context = self._cas
else:
raise NotFound
if method_id.startswith('_'):
raise NotFound
try: try:
method = getattr(self, method_id) context = self._context_dict[context_id]
except AttributeError: method_dict = self._routing_dict[base_path]
except KeyError:
raise NotFound raise NotFound
status, header_list, result = method(context, environ, path_item_list[2:]) request_method = environ['REQUEST_METHOD']
if request_method == 'OPTIONS':
status = STATUS_NO_CONTENT
header_list = [
]
result = []
else:
try:
method = method_dict[request_method]
except KeyError:
raise BadMethod
status, header_list, result = method(context, environ, path_item_list[2:])
except ApplicationError: except ApplicationError:
raise raise
except exceptions.NotFound: except exceptions.NotFound:
...@@ -208,14 +230,12 @@ class Application(object): ...@@ -208,14 +230,12 @@ class Application(object):
raise BadRequest('Invalid json') raise BadRequest('Invalid json')
@staticmethod @staticmethod
def crl(context, environ, subpath): def getCRL(context, environ, subpath):
""" """
Handle /{context}/crl urls. Handle GET /{context}/crl .
""" """
if subpath: if subpath:
raise NotFound raise NotFound
if environ['REQUEST_METHOD'] != 'GET':
raise BadMethod
data = context.getCertificateRevocationList() data = context.getCertificateRevocationList()
return ( return (
STATUS_OK, STATUS_OK,
...@@ -226,145 +246,153 @@ class Application(object): ...@@ -226,145 +246,153 @@ class Application(object):
[data], [data],
) )
def csr(self, context, environ, subpath): def getCSR(self, context, environ, subpath):
""" """
Handle /{context}/csr urls. Handle GET /{context}/csr/{csr_id} and GET /{context}/csr.
""" """
http_method = environ['REQUEST_METHOD'] if subpath:
if http_method == 'GET':
if subpath:
try:
csr_id, = subpath
except ValueError:
raise NotFound
try:
csr_id = int(csr_id)
except ValueError:
raise BadRequest('Invalid integer')
data = context.getCertificateSigningRequest(csr_id)
content_type = 'application/pkcs10'
else:
self._authenticate(environ)
data = json.dumps(context.getCertificateRequestList())
content_type = 'application/json'
return (
STATUS_OK,
[
('Content-Type', content_type),
('Content-Length', str(len(data))),
],
[data],
)
elif http_method == 'PUT':
if subpath:
raise NotFound
csr_id = context.appendCertificateSigningRequest(self._read(environ))
return (
STATUS_CREATED,
[
('Location', str(csr_id)),
],
[],
)
elif http_method == 'DELETE':
try: try:
csr_id, = subpath csr_id, = subpath
except ValueError: except ValueError:
raise NotFound raise NotFound
self._authenticate(environ)
try: try:
context.deletePendingCertificateSigningRequest(csr_id) csr_id = int(csr_id)
except exceptions.NotFound: except ValueError:
raise NotFound raise BadRequest('Invalid integer')
return (STATUS_NO_CONTENT, [], []) data = context.getCertificateSigningRequest(csr_id)
content_type = 'application/pkcs10'
else:
self._authenticate(environ)
data = json.dumps(context.getCertificateRequestList())
content_type = 'application/json'
return (
STATUS_OK,
[
('Content-Type', content_type),
('Content-Length', str(len(data))),
],
[data],
)
def putCSR(self, context, environ, subpath):
"""
Handle PUT /{context}/csr .
"""
if subpath:
raise NotFound
csr_id = context.appendCertificateSigningRequest(self._read(environ))
return (
STATUS_CREATED,
[
('Location', str(csr_id)),
],
[],
)
def deleteCSR(self, context, environ, subpath):
"""
Handle DELETE /{context}/csr/{csr_id} .
"""
try:
csr_id, = subpath
except ValueError:
raise NotFound
self._authenticate(environ)
try:
context.deletePendingCertificateSigningRequest(csr_id)
except exceptions.NotFound:
raise NotFound
return (STATUS_NO_CONTENT, [], [])
def getCRT(self, context, environ, subpath):
"""
Handle GET /{context}/crt/{crt_id} urls.
"""
try:
crt_id, = subpath
except ValueError:
raise NotFound
if crt_id == 'ca.crt.pem':
data = context.getCACertificate()
content_type = 'application/pkix-cert'
elif crt_id == 'ca.crt.json':
data = json.dumps(context.getValidCACertificateChain())
content_type = 'application/json'
else: else:
raise BadMethod try:
crt_id = int(crt_id)
except ValueError:
raise BadRequest('Invalid integer')
data = context.getCertificate(crt_id)
content_type = 'application/pkix-cert'
return (
STATUS_OK,
[
('Content-Type', content_type),
('Content-Length', str(len(data))),
],
[data],
)
def crt(self, context, environ, subpath): def putCRT(self, context, environ, subpath):
""" """
Handle /{context}/crt urls. Handle PUT /{context}/crt/{crt_id} urls.
""" """
http_method = environ['REQUEST_METHOD']
try: try:
crt_id, = subpath crt_id, = subpath
except ValueError: except ValueError:
raise NotFound raise NotFound
if http_method == 'GET': if crt_id == 'renew':
if crt_id == 'ca.crt.pem': payload = utils.unwrap(
data = context.getCACertificate() self._readJSON(environ),
content_type = 'application/pkix-cert' lambda x: x['crt_pem'],
elif crt_id == 'ca.crt.json': context.digest_list,
data = json.dumps(context.getValidCACertificateChain()) )
content_type = 'application/json' data = context.renew(
else: crt_pem=payload['crt_pem'].encode('ascii'),
try: csr_pem=payload['renew_csr_pem'].encode('ascii'),
crt_id = int(crt_id) )
except ValueError:
raise BadRequest('Invalid integer')
data = context.getCertificate(crt_id)
content_type = 'application/pkix-cert'
return ( return (
STATUS_OK, STATUS_OK,
[ [
('Content-Type', content_type), ('Content-Type', 'application/pkix-cert'),
('Content-Length', str(len(data))), ('Content-Length', str(len(data))),
], ],
[data], [data],
) )
elif http_method == 'PUT': elif crt_id == 'revoke':
if crt_id == 'renew': data = self._readJSON(environ)
if data['digest'] is None:
self._authenticate(environ)
payload = utils.nullUnwrap(data)
if 'revoke_crt_pem' not in payload:
context.revokeSerial(payload['revoke_serial'])
return (STATUS_NO_CONTENT, [], [])
else:
payload = utils.unwrap( payload = utils.unwrap(
self._readJSON(environ), data,
lambda x: x['crt_pem'], lambda x: x['revoke_crt_pem'],
context.digest_list, context.digest_list,
) )
data = context.renew( context.revoke(
crt_pem=payload['crt_pem'].encode('ascii'), crt_pem=payload['revoke_crt_pem'].encode('ascii'),
csr_pem=payload['renew_csr_pem'].encode('ascii'), )
) return (STATUS_NO_CONTENT, [], [])
return (
STATUS_OK,
[
('Content-Type', 'application/pkix-cert'),
('Content-Length', str(len(data))),
],
[data],
)
elif crt_id == 'revoke':
data = self._readJSON(environ)
if data['digest'] is None:
self._authenticate(environ)
payload = utils.nullUnwrap(data)
if 'revoke_crt_pem' not in payload:
context.revokeSerial(payload['revoke_serial'])
return (STATUS_NO_CONTENT, [], [])
else:
payload = utils.unwrap(
data,
lambda x: x['revoke_crt_pem'],
context.digest_list,
)
context.revoke(
crt_pem=payload['revoke_crt_pem'].encode('ascii'),
)
return (STATUS_NO_CONTENT, [], [])
else:
try:
crt_id = int(crt_id)
except ValueError:
raise BadRequest('Invalid integer')
body = self._read(environ)
if not body:
template_csr = None
elif environ.get('CONTENT_TYPE') == 'application/pkcs10':
template_csr = utils.load_certificate_request(body)
else:
raise BadRequest('Bad Content-Type')
self._authenticate(environ)
context.createCertificate(
csr_id=crt_id,
template_csr=template_csr,
)
return (STATUS_NO_CONTENT, [], [])
else: else:
raise BadMethod try:
crt_id = int(crt_id)
except ValueError:
raise BadRequest('Invalid integer')
body = self._read(environ)
if not body:
template_csr = None
elif environ.get('CONTENT_TYPE') == 'application/pkcs10':
template_csr = utils.load_certificate_request(body)
else:
raise BadRequest('Bad Content-Type')
self._authenticate(environ)
context.createCertificate(
csr_id=crt_id,
template_csr=template_csr,
)
return (STATUS_NO_CONTENT, [], [])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment