Commit 1f2862fb authored by Alain Takoudjou's avatar Alain Takoudjou

Add certificate Authority web tool

parent 8ba67db8
...@@ -48,6 +48,8 @@ setup(name=name, ...@@ -48,6 +48,8 @@ setup(name=name,
'erp5.util', 'erp5.util',
'PyRSS2Gen', 'PyRSS2Gen',
'dnspython', 'dnspython',
'pyOpenSSL', # manage ssl certificates
'requests', # http requests
] + additional_install_requires, ] + additional_install_requires,
extras_require = { extras_require = {
'lampconfigure': ["mysqlclient"], #needed for MySQL Database access 'lampconfigure': ["mysqlclient"], #needed for MySQL Database access
...@@ -66,6 +68,9 @@ setup(name=name, ...@@ -66,6 +68,9 @@ setup(name=name,
'agent = slapos.agent.agent:main', 'agent = slapos.agent.agent:main',
'check-web-page-http-cache-hit = slapos.promise.check_web_page_http_cache_hit:main', 'check-web-page-http-cache-hit = slapos.promise.check_web_page_http_cache_hit:main',
'check-feed-as-promise = slapos.checkfeedaspromise:main', 'check-feed-as-promise = slapos.checkfeedaspromise:main',
'certificate_authority = slapos.certificate_authority.run:start',
'ca-web-request = slapos.certificate_authority.request:requestCertificateWeb',
'ca-sign = slapos.certificate_authority.sign:main',
'clouddestroy = slapos.cloudmgr.destroy:main', 'clouddestroy = slapos.cloudmgr.destroy:main',
'cloudgetprivatekey = slapos.cloudmgr.getprivatekey:main', 'cloudgetprivatekey = slapos.cloudmgr.getprivatekey:main',
'cloudgetpubliciplist = slapos.cloudmgr.getpubliciplist:main', 'cloudgetpubliciplist = slapos.cloudmgr.getpubliciplist:main',
......
# -*- coding: utf-8 -*-
import os
import sys
import subprocess
import time
import ConfigParser
import uuid
import ssl
import logging
import cgi, errno
import urlparse
from OpenSSL import crypto, SSL
import requests
import traceback
def popenCommunicate(command_list, input=None):
subprocess_kw = dict(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if input is not None:
subprocess_kw.update(stdin=subprocess.PIPE)
popen = subprocess.Popen(command_list, **subprocess_kw)
result = popen.communicate(input)[0]
if popen.returncode is None:
popen.kill()
if popen.returncode != 0:
raise ValueError('Issue during calling %r, result was:\n%s' % (
command_list, result))
return result
class CertificateBase(object):
def __init__(self):
pass
def validateCertAndKey(self, cert_file, key_file):
ctx = SSL.Context(SSL.TLSv1_METHOD)
ctx.use_privatekey(self.freadPkey(key_file))
ctx.use_certificate(self.freadX509(cert_file))
try:
ctx.check_privatekey()
except SSL.Error:
return False
else:
return True
def verifyCertificateChain(self, cert_pem, trusted_cert_list):
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
# Create and fill a X509Sore with trusted certs
store = crypto.X509Store()
for trusted_cert in trusted_cert_list:
store.add_cert(crypto.load_certificate(crypto.FILETYPE_PEM, trusted_cert))
store_ctx = crypto.X509StoreContext(store, cert)
# Returns None if certificate can be validated
result = store_ctx.verify_certificate()
if result is None:
return True
else:
return False
def generatePrivatekey(self, output_file, size=2048):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, size)
try:
key_fd = os.open(output_file, os.O_CREAT|os.O_WRONLY|os.O_EXCL)
except OSError, e:
if e.errno != errno.EEXIST:
raise
else:
os.write(key_fd, crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
os.close(key_fd)
os.chmod(output_file, 0640)
def generateCertificateRequest(self, key_file, output_file, cn,
country, state, locality='', email='', organization='',
organization_unit='', digest="sha1"):
with open(key_file) as fkey:
key = crypto.load_privatekey(crypto.FILETYPE_PEM, fkey.read())
req = crypto.X509Req()
subject = req.get_subject()
subject.CN = cn
subject.C = country
subject.ST = state
subject.L = locality
subject.O = organization
subject.OU = organization_unit
subject.emailAddress = email
req.set_pubkey(key)
req.sign(key, digest)
with open(output_file, 'w') as req_file:
req_file.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, req))
os.chmod(output_file, 0644)
def checkCertificateValidity(self, ca_cert_file, cert_file, key_file=None):
with open(ca_cert_file) as f_ca:
ca_cert = f_ca.read()
with open(cert_file) as f_cert:
cert = f_cert.read()
# XXX Considering only one trusted certificate here
if not self.verifyCertificateChain(cert, [ca_cert]):
return False
if key_file:
#with open(key_file) as f_key:
# key = f_key.read()
return self.validateCertAndKey(cert_file, key_file)
return True
def readCertificateRequest(self, csr):
req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
return req
def freadCertificateRequest(self, csr_file):
with open(csr_file) as f_csr:
return self.readCertificateRequest(f_csr.read())
def freadX509(self, cert_file):
with open(cert_file) as f_cert:
return self.readX509(f_cert.read())
def readX509(self, cert):
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
return x509
def readPkey(self, key):
return crypto.load_privatekey(crypto.FILETYPE_PEM, key)
def freadPkey(self, key_file):
with open(key_file) as f_key:
return self.readPkey(f_key.read())
def getSubject(self, cert):
subject = cert.get_subject()
subject_dict = dict(subject.get_components())
return subject_dict
class CertificateAuthority(CertificateBase):
def __init__(self, openssl_binary,
openssl_configuration=None, certificate='cacert.pem',
key='cakey.pem', ca_directory=None):
self.key = key
self.certificate = certificate
self.openssl_binary = openssl_binary
self.openssl_configuration = openssl_configuration
self.ca_directory = ca_directory
if self.ca_directory is None:
self.ca_directory = os.getcwd()
for file in ['crlnumber', 'serial']:
if not os.path.exists(os.path.join(self.ca_directory, file)):
with open(os.path.join(self.ca_directory, file), 'w') as f:
f.write('01')
if not os.path.exists(os.path.join(self.ca_directory, 'index.txt')):
with open(os.path.join(self.ca_directory, 'index.txt'), 'w') as f:
f.write('')
def checkAuthority(self):
if self.openssl_configuration is None:
raise ValueError('Openssl configuration file not found!')
file_list = [ self.key, self.certificate ]
ca_ready = True
for f in file_list:
if not os.path.exists(f):
ca_ready = False
break
if ca_ready:
return
for f in file_list:
if os.path.exists(f):
os.unlink(f)
try:
# no CA, let us create new one
popenCommunicate([self.openssl_binary, 'req', '-nodes', '-config',
self.openssl_configuration, '-new', '-x509', '-extensions',
'v3_ca', '-keyout', self.key, '-out', self.certificate,
'-days', '10950'], 'Certificate Authority %s\n' % uuid.uuid1())
except:
try:
for f in file_list:
if os.path.exists(f):
os.unlink(f)
except:
# do not raise during cleanup
pass
raise
def signCertificateRequest(self, csr_file, output_file):
if self.openssl_configuration is None:
raise ValueError('Openssl configuration file not found!')
popenCommunicate([self.openssl_binary, 'ca', '-batch', '-config',
self.openssl_configuration, '-out', output_file,
'-infiles', csr_file])
class CertificateAuthorityRequest(CertificateBase):
def __init__(self, key, certificate, cacertificate, ca_url,
max_retry=10, logger=None):
self.key = key
self.certificate = certificate
self.cacertificate = cacertificate
self.ca_url = ca_url
self.logger = logger
self.max_retry = max_retry
if self.logger is None:
self.logger = logging.getLogger('Certificate Request')
self.logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.generatePrivatekey(self.key)
def _request(self, method, url, data={}):
try:
req = getattr(requests, method)
if data:
return req(url, data=data)
else:
return req(url)
except requests.ConnectionError, e:
self.logger.error("Got ConnectionError while sending request to CA. Url is %s\n%s" % (
url, str(e)))
return None
def getCACertificateWeb(self):
# If cert file exists exist
if os.path.exists(self.cacertificate) and os.stat(self.cacertificate).st_size > 0:
return
ca_cert_url = '%s/get/cacert.pem' % self.ca_url
self.logger.info("getting CA certificate file %s" % ca_cert_url)
response = None
while not response or response.status_code != 200:
response = self._request('get', ca_cert_url)
if response is not None:
try:
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, response.text)
except crypto.Error, e:
traceback.print_exc()
response = None
else:
break
# sleep a bit then try again until ca cert is ready
time.sleep(10)
with open(self.cacertificate, 'w') as f:
f.write(response.text)
def signCertificateWeb(self, csr_file):
if os.path.exists(self.certificate) and os.stat(self.certificate).st_size > 0:
return
with open(csr_file) as fcsr:
csr = fcsr.read()
#req = self.freadCertificateRequest(csr_file)
#common_name = self.getCN(req)
data = {'csr': csr}
retry = 0
sleep_time = 10
request_url = '%s/request' % self.ca_url
# Save Cert in tmp to check later
cert_temp = '%s.tmp' % self.certificate
csr_key_file = '%s.key' % csr_file
csr_key = ""
self.logger.info("Request signed certificate from CA...")
if os.path.exists(csr_key_file):
with open(csr_key_file) as fkey:
csr_key = fkey.read()
if csr_key:
self.logger.info("Csr was already sent to CA, key is: %s" % csr_key)
else:
response = self._request('post', request_url, data=data)
while (not response or response.status_code != 200) and retry < self.max_retry:
self.logger.error("%s: Failed to send CSR. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % sleep_time)
time.sleep(sleep_time)
retry += 1
response = self._request('post', request_url, data=data)
if response.status_code != 200:
raise Exception("ERROR: failed to post CSR after % retry. Exiting..." % max_retry)
self.logger.info("CSR succefully sent.")
self.logger.debug("Server reponse with csr key is %s" % response.text)
csr_key = response.text
with open(csr_key_file, 'w') as fkey:
fkey.write(response.text)
self.logger.info("Waiting for signed certificate...")
reply_url = '%s/get/%s.cert.pem' % (self.ca_url, csr_key)
response = self._request('get', reply_url)
while not response or response.status_code != 200:
time.sleep(sleep_time)
response = self._request('get', reply_url)
with open(cert_temp, 'w') as cf:
cf.write(response.text)
os.chmod(cert_temp, 0640)
self.logger.info("Validating signed certificate...")
if self.checkCertificateValidity(self.cacertificate, cert_temp, self.key):
os.rename(cert_temp, self.certificate)
else:
raise Exception("Error: Certificate validation failed. " \
"This signed certificate should be revoked!")
self.logger.info("Certificate correctly saved at %s." % self.certificate)
# -*- coding: utf-8 -*-
# vim: set et sts=2:
import ConfigParser
import datetime
import flask
import logging
import logging.handlers
import os
import argparse
import traceback
from slapos.certificate_authority.certificate_authority import CertificateAuthorityRequest
def parseArguments():
"""
Parse arguments for monitor instance.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--cert_file',
default='cert.pem',
help='Path for Certificate file')
parser.add_argument('--ca_cert_file',
default='cacert.pem',
help='Path for CA Cert file.')
parser.add_argument('--key_file',
default='key.pem',
help='Path of key file')
parser.add_argument('--csr_file',
default='csr.pem',
help='Path where to store csr file.')
parser.add_argument('--ca_url',
help='Certificate Authority URL')
parser.add_argument('--cn',
help='Common name to use in certificate request')
parser.add_argument('--country',
default='XX',
help='Country Name')
parser.add_argument('--state',
default="(State,)",
help='State Name')
parser.add_argument('--locality',
default='City',
help='City or Locality Name')
parser.add_argument('--email',
default='xx@example.com',
help='The email to use')
parser.add_argument('--organization',
default='Company',
help='The Organisation Name')
parser.add_argument('--organization_unit',
default='Company Unit',
help='The Organisation Unit Name')
return parser.parse_args()
def requestCertificateWeb():
config = parseArguments()
ca = CertificateAuthorityRequest(config.key_file, config.cert_file,
config.ca_cert_file, config.ca_url)
ca.getCACertificateWeb()
if os.path.exists(config.cert_file):
# XXX- should check later if the certificate is expired
return
# When renew certificate, another verification can be done
if not os.path.exists(config.csr_file):
ca.generateCertificateRequest(config.key_file, config.csr_file,
cn=config.cn, country=config.country, state=config.state,
locality=config.locality, email=config.email,
organization=config.organization,
organization_unit=config.organization_unit, digest="sha1")
ca.signCertificateWeb(config.csr_file)
# -*- coding: utf-8 -*-
# vim: set et sts=2:
# pylint: disable-msg=W0311,C0301,C0103,C0111,R0904,R0903
import ConfigParser
import datetime
import flask
import logging
import logging.handlers
import os, errno
import argparse
import traceback
from slapos.certificate_authority.web import app
from slapos.certificate_authority.certificate_authority import CertificateAuthority
def parseArguments():
"""
Parse arguments for monitor instance.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--ca_dir',
help='Certificate authority base directory')
parser.add_argument('--openssl_bin',
default='openssl',
help='Path of openssl binary file')
parser.add_argument('--config_file',
help='Openssl configuration file. Default: $ca_dir/openssl.cnf')
parser.add_argument('--certs_dir',
help='Directory where to put generated certificates. Default: $ca_dir/certs')
parser.add_argument('--key_dir',
help='Directory where to put CA keys. Default: $ca_dir/private')
parser.add_argument('--crl_dir',
help='Directory for revoked certificate list. Default: $ca_dir/crl')
parser.add_argument('--cert_file',
help='Path for Certificate file. Defaul: $ca_dir/cacert.pem')
parser.add_argument('--key_file',
help='Path of key file. Default: $key_dir/cakey.pem')
parser.add_argument('--host',
default=[],
help='Host or IP of ca server.')
parser.add_argument('--port',
default='9086',
help='Port for ca server.')
parser.add_argument('--debug',
action="store_true", dest="debug",
help='Enable debug mode.')
parser.add_argument('--log_file',
help='Path for log output')
parser.add_argument('--trusted_host',
default=[],
action='append', dest='trusted_host_list',
help='IP Allowed to call signcert method.')
return parser.parse_args()
def getLogger(debug=False, log_file=None):
logger = logging.getLogger("CertificateAuthority")
logger.setLevel(logging.INFO)
if not log_file:
logger.addHandler(logging.StreamHandler())
else:
handler = logging.FileHandler(log_file)
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
logger.info('Configured logging to file %r' % log_file)
if debug:
logger.setLevel(logging.DEBUG)
return logger
def getConfig(self, key):
if key in self.keys():
temp_dict = dict()
temp_dict.update(self)
return temp_dict[key]
else:
raise KeyError
def start():
"""
start certificate authority service
"""
flask.config.Config.__getattr__ = getConfig
options = parseArguments()
if not options.ca_dir:
options.ca_dir = os.getcwd()
else:
options.ca_dir = os.path.abspath(options.ca_dir)
if not options.config_file:
options.config_file = os.path.join(options.ca_dir, 'openssl.cnf')
if not options.certs_dir:
options.certs_dir = os.path.join(options.ca_dir, 'certs')
if not options.key_dir:
options.key_dir = os.path.join(options.ca_dir, 'private')
if not options.crl_dir:
options.crl_dir = os.path.join(options.ca_dir, 'crl')
if not options.cert_file:
options.cert_file = os.path.join(options.ca_dir, 'cacert.pem')
if not options.key_file:
options.key_file = os.path.join(options.ca_dir, 'private', 'cakey.pem')
logger = getLogger(options.debug, options.log_file)
ca = CertificateAuthority(options.openssl_bin,
openssl_configuration=options.config_file, certificate=options.cert_file,
key=options.key_file, ca_directory=options.ca_dir)
#config = Config()
app.config.update(
ca_dir=options.ca_dir,
trusted_host_list=options.trusted_host_list,
csr_dir=os.path.join(options.ca_dir, 'csr'),
req_dir=os.path.join(options.ca_dir, 'req'),
cert_dir=os.path.join(options.ca_dir, 'certs'),
newcert_dir=os.path.join(options.ca_dir, 'newcerts'),
crl_dir=os.path.join(options.ca_dir, 'crl'),
key_dir=os.path.join(options.ca_dir, 'private'),
ca=ca,
)
for key in ['csr', 'req', 'cert', 'crl', 'key', 'newcert']:
try:
path = app.config['%s_dir' % key]
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST and os.path.isdir(path):
continue
else:
raise
# Generate certificate Authority cert and key
ca.checkAuthority()
app.logger.addHandler(logger)
app.logger.info("Certificate Authority server started on http://%s:%s" % (
options.host, options.port))
app.run(
debug=options.debug,
host=options.host,
port=int(options.port)
)
# -*- coding: utf-8 -*-
# vim: set et sts=2:
import logging
import os
import argparse
import requests
import sys
import prettytable
import glob
from slapos.certificate_authority.certificate_authority import CertificateBase
def parseArguments():
"""
Parse arguments for monitor instance.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--ca_dir',
help='Path for Certificate Authority folder')
parser.add_argument('--key',
action='append', dest='key_list', default=[],
help='Key of the certificate Request to sign')
parser.add_argument('--ca_url',
help='Certificate Authority URL')
parser.add_argument('--list',
default=False, action="store_true",
help='List certificates to sign')
parser.add_argument('--sign',
default=False, action="store_true",
help='Request sign certificate')
return parser
def main():
parser = parseArguments()
options = parser.parse_args()
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
stream=sys.stdout ,level=logging.DEBUG)
if not options.ca_dir:
options.ca_dir = os.getcwd()
else:
options.ca_dir = os.path.abspath(options.ca_dir)
options.req = os.path.join(options.ca_dir, 'req')
if options.list:
exit(listCertificateRequest(options))
elif options.sign and len(options.key_list) > 0:
exit(requestSigncertificate(options))
parser.print_help()
exit(1)
def listCertificateRequest(options):
pt = prettytable.PrettyTable(['CN', 'Country', 'City', 'email', 'key'])
req_directory = os.path.join(options.ca_dir, 'req')
x509 = CertificateBase()
file_list = filter(os.path.isfile,
glob.glob("%s/*.csr.pem" % req_directory)
)
for file in file_list:
# XXX file name is key.csr.pem
key = os.path.basename(file).split('.')[0]
req = x509.freadCertificateRequest(file)
subject = x509.getSubject(req)
pt.add_row([subject['CN'], subject['C'], subject['ST'],
subject['emailAddress'], key])
for line in pt.get_string(border=True, padding_width=0).split('\n'):
#logging.info(line)
print line
return 0
def requestSigncertificate(options):
req_directory = os.path.join(options.ca_dir, 'req')
code = 0
x509 = CertificateBase()
for key in options.key_list:
csr_file = os.path.join(req_directory, '%s.csr.pem' % key)
if not os.path.exists(csr_file):
logging.error("Cannot get csr on CA from specified key: %s" % key)
return -2
req = x509.freadCertificateRequest(csr_file)
cn = x509.getSubject(req)['CN']
data = {'key': key}
logging.info("Signing %s..." % cn)
response = requests.post('%s/signcert' % options.ca_url, data=data)
if response.status_code != 200:
code = -1*response.status_code
logging.error("ERROR %s: Failed to sign certifice from %s.\n%s" % (
response.status_code, cn, response.text))
else:
logging.info("%s is signed, server responded with: %s" % (cn,
response.text))
return code
body {
min-height: 2000px;
padding-top: 70px;
}
\ No newline at end of file
<!doctype html>
<title>Flaskr</title>
<link rel=stylesheet type=text/css href="{{ url_for('static', filename='style.css') }}">
<div class=page>
<h1>Certificate Authority Public</h1>
<ul>
{% for filename in filename_list -%}
<a href="/get/{{ filename }}">{{ filename }}</a>
{% endfor -%}
</ul>
</div>
\ No newline at end of file
<!doctype html>
<title>Certificate Authority Web</title>
<!-- Latest compiled and minified CSS -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
<!-- Optional theme -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap-theme.min.css" integrity="sha384-rHyoN1iRsVXV4nD0JutlnGaslCJuC7uwjduW9SVrLvRYooPp2bWYgmgJQIXwl/Sp" crossorigin="anonymous">
<!-- Latest compiled and minified JavaScript -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/js/bootstrap.min.js" integrity="sha384-Tc5IQib027qvyjSMfHjOMaLkfuWVxZxUPnCJA7l2mCWNIpG9mGCD8wGNIcPD7Txa" crossorigin="anonymous"></script>
<link rel=stylesheet type=text/css href="{{ url_for('static', filename='style.css') }}">
<div class=page>
<nav class="navbar navbar-default navbar-fixed-top"></nav>
<div class="container">
</div> <!-- /container -->
{% block body %}{% endblock %}
</div>
\ No newline at end of file
# -*- coding: utf-8 -*-
import os
import sys
import traceback
from OpenSSL import crypto
import uuid
from flask import (Flask, request, redirect, url_for, render_template,
jsonify, session, abort, send_file)
app = Flask(__name__)
@app.errorhandler(403)
def error403(res):
return "403: Forbidden. Your are not allowed to access %s" % res, 403
return response
@app.errorhandler(404)
def error404(msg=""):
return "404: Resource not found.\n%s\n" % msg, 404
@app.errorhandler(501)
def error501(msg=""):
return "501: Internal Error. %s\n" % msg, 501
@app.errorhandler(412)
def error412(msg):
return "412: Precondition Failed: %s.\n" % msg, 412
@app.errorhandler(400)
def error400():
return "400: Bad Request", 400
def writefile(path, content, mode=0640):
with open(path, 'w') as fd:
fd.write(content)
os.chmod(path, mode)
@app.route('/')
def index():
# page to list certificates, also connection link
cert_list = [os.path.basename(app.config.ca.certificate)]
cert_list.extend([x for x in os.listdir(app.config.cert_dir)])
return render_template("index.html", filename_list=cert_list)
@app.route('/get/<string:name>', methods=['GET'])
def getfile(name):
ca_name = os.path.basename(app.config.ca.certificate)
if name == ca_name:
return send_file(app.config.ca.certificate,
attachment_filename=ca_name,
as_attachment=True)
else:
cert_file = os.path.join(app.config.cert_dir, name)
if os.path.exists(cert_file) and os.path.isfile(cert_file):
filename = os.path.basename(cert_file)
return send_file(cert_file,
attachment_filename=filename,
as_attachment=True)
return abort(404, name)
@app.route('/request', methods=['POST'])
def do_request():
csr_content = request.form.get('csr', '').encode('utf-8')
if not csr_content:
return abort(400)
try:
req = app.config.ca.readCertificateRequest(csr_content)
# XXX - Maybe more check if the csr is valid with CA
except crypto.Error, e:
return abort(412, str(e))
cert_id = str(uuid.uuid1())
csr_keyfile = '%s+%s.csr.pem' % (cert_id, str(uuid.uuid4()))
  • Here cert_id is the id That I send to the user, he will download the certificate with something like: URL/cert_id.cert.pem, then I generate the csr with two UUID. When signing from command line, we need to know those two uuid, the csr is key.csr.pem with key=cert_id+uuid4(). So to call signcert method it required to have allowed host + know the full key. If another process doesn't have access to the CA partition, he won't be able to get the key (by listing the csr request directory). As for now there is not UI to sign certificate, I can use this to sign certificate from a process into the CA partition. I imagine monitor can use this to sign the certificate. If it run into CA partition, he will have access to csr keys.

Please register or sign in to reply
request_file = os.path.join(app.config.req_dir, csr_keyfile)
if os.path.exists(request_file):
# The request already exist
raise Exception("Certificate Signature Request file should be unique")
try:
writefile(request_file, csr_content)
except OSError, e:
raise
return cert_id
@app.route('/signcert', methods=['POST'])
def do_signcert():
"""
This method should be called by a list of host allowed, can be used to sign with command line
For security, it's good to only allow local ip (127.0.0.1)
  • Host-based security is worthless. Any other partition on the same machine as the ca can then emit certificates. This is not the right approach to securing this fuction.

  • I aggree there is security issue if another partition use the trusted host to call sign cert. I imagine this method will be improved at some point. I wanted to propose something that can allow to sign from script (console), so in order to improve security it's required to send a key as parameter. See on top my comment.

Please register or sign in to reply
"""
key = request.form.get('key', '').encode('utf-8')
if not key:
return abort(400)
remote_client = request.remote_addr
x_forwarded_for_list = request.headers.getlist("X-Forwarded-For")
if remote_client not in app.config.trusted_host_list or \
(x_forwarded_for_list and x_forwarded_for_list[0] not in app.config.trusted_host_list):
return abort(403) # Forbidden
req_file = os.path.join(app.config.req_dir, '%s.csr.pem' % key)
if os.path.exists(req_file):
cert_id = key.split('+')[0]
output = os.path.join(app.config.cert_dir, '%s.cert.pem' % cert_id)
try:
if os.path.exists(output):
# This should not happend normally
return abort(412, "The certificate already exists.")
app.config.ca.signCertificateRequest(req_file, output)
os.rename(req_file, os.path.join(app.config.csr_dir, '%s.csr.pem' % cert_id))
except Exception, e:
if os.path.exists(output):
try:
os.unlink(output)
except:
pass
traceback.print_exc()
return abort(501, str(e))
else:
return abort(403)
return "Certificate is signed"
@app.route('/renewcert/<string:serial>', methods=['POST'])
def renewcert(serial):
  • Renewing is like requesting a new certificate, except it can be automated easily: requester must sign his renewal request with his old private key (to prove it is the legitimate requester) and some piece of information about the old certificate (I don't know if serial is enough, as CA should not have to keep all issued certs in its memory, and it must check that the client is requesting the same kind of certificate - so maybe client must send the whole old certificate), then submit a new signature request.

    Server, if request signature matches the old certificate, and the old certificate is signed by the correct CA key, and the old certificate is not revoked, and the new certificate has the same attributes (except validity period, at least), then CA can automatically issue a new certificate.

  • I will improve the client request method to check at every run if the certificate is expired, then automatically request renew and backup the expired certificate. But, may be in some cases to restart to process which was using the expired certificate, does this should be managed by this request script ? Do you have any idea ?

Please register or sign in to reply
"""
this method is used to renew expired certificate.
"""
return abort(200, "Done. The method is not implemented yet!")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment