Commit cafa2664 authored by Jérome Perrin's avatar Jérome Perrin Committed by Arnaud Fontaine

certificate_authority: py3

parent 6441ea23
...@@ -60,11 +60,9 @@ def binary_search(binary): ...@@ -60,11 +60,9 @@ def binary_search(binary):
class CertificateAuthorityBusy(Exception): class CertificateAuthorityBusy(Exception):
"""Exception raised when certificate authority is busy""" """Exception raised when certificate authority is busy"""
pass
class CertificateAuthorityDamaged(Exception): class CertificateAuthorityDamaged(Exception):
"""Exception raised when certificate authority is damaged""" """Exception raised when certificate authority is damaged"""
pass
class CertificateAuthorityTool(BaseTool): class CertificateAuthorityTool(BaseTool):
"""CertificateAuthorityTool """CertificateAuthorityTool
...@@ -103,7 +101,8 @@ class CertificateAuthorityTool(BaseTool): ...@@ -103,7 +101,8 @@ class CertificateAuthorityTool(BaseTool):
Raises CertificateAuthorityBusy""" Raises CertificateAuthorityBusy"""
if os.path.exists(self.lock): if os.path.exists(self.lock):
raise CertificateAuthorityBusy raise CertificateAuthorityBusy
open(self.lock, 'w').write('locked') with open(self.lock, 'w') as f:
f.write('locked')
def _unlockCertificateAuthority(self): def _unlockCertificateAuthority(self):
"""Checks lock and locks Certificate Authority tool""" """Checks lock and locks Certificate Authority tool"""
...@@ -192,7 +191,8 @@ class CertificateAuthorityTool(BaseTool): ...@@ -192,7 +191,8 @@ class CertificateAuthorityTool(BaseTool):
self._checkCertificateAuthority() self._checkCertificateAuthority()
self._lockCertificateAuthority() self._lockCertificateAuthority()
index = open(self.index).read().splitlines() with open(self.index) as f:
index = f.read().splitlines()
valid_line_list = [q for q in index if q.startswith('V') and valid_line_list = [q for q in index if q.startswith('V') and
('CN=%s/' % common_name in q)] ('CN=%s/' % common_name in q)]
if len(valid_line_list) >= 1: if len(valid_line_list) >= 1:
...@@ -201,7 +201,8 @@ class CertificateAuthorityTool(BaseTool): ...@@ -201,7 +201,8 @@ class CertificateAuthorityTool(BaseTool):
'please revoke it before request a new one..' % common_name) 'please revoke it before request a new one..' % common_name)
try: try:
new_id = open(self.serial, 'r').read().strip().lower() with open(self.serial, 'r') as f:
new_id = f.read().strip().lower()
key = os.path.join(self.certificate_authority_path, 'private', key = os.path.join(self.certificate_authority_path, 'private',
new_id+'.key') new_id+'.key')
csr = os.path.join(self.certificate_authority_path, new_id + '.csr') csr = os.path.join(self.certificate_authority_path, new_id + '.csr')
...@@ -211,14 +212,18 @@ class CertificateAuthorityTool(BaseTool): ...@@ -211,14 +212,18 @@ class CertificateAuthorityTool(BaseTool):
os.close(os.open(key, os.O_CREAT | os.O_EXCL, 0o600)) os.close(os.open(key, os.O_CREAT | os.O_EXCL, 0o600))
popenCommunicate([self.openssl_binary, 'req', '-utf8', '-nodes', '-config', popenCommunicate([self.openssl_binary, 'req', '-utf8', '-nodes', '-config',
self.openssl_config, '-new', '-keyout', key, '-out', csr, '-days', self.openssl_config, '-new', '-keyout', key, '-out', csr, '-days',
'3650'], '%s\n' % common_name, stdin=subprocess.PIPE) '3650'], ('%s\n' % common_name).encode(), stdin=subprocess.PIPE)
popenCommunicate([self.openssl_binary, 'ca', '-utf8', '-days', '3650', popenCommunicate([self.openssl_binary, 'ca', '-utf8', '-days', '3650',
'-batch', '-config', self.openssl_config, '-out', cert, '-infiles', '-batch', '-config', self.openssl_config, '-out', cert, '-infiles',
csr]) csr])
os.unlink(csr) os.unlink(csr)
with open(key) as f:
key = f.read()
with open(cert) as f:
cert = f.read()
return dict( return dict(
key=open(key).read(), key=key,
certificate=open(cert).read(), certificate=cert,
id=new_id, id=new_id,
common_name=common_name) common_name=common_name)
except Exception: except Exception:
...@@ -242,7 +247,8 @@ class CertificateAuthorityTool(BaseTool): ...@@ -242,7 +247,8 @@ class CertificateAuthorityTool(BaseTool):
self._checkCertificateAuthority() self._checkCertificateAuthority()
self._lockCertificateAuthority() self._lockCertificateAuthority()
try: try:
new_id = open(self.crl, 'r').read().strip().lower() with open(self.crl, 'r') as f:
new_id = f.read().strip().lower()
crl_path = os.path.join(self.certificate_authority_path, 'crl') crl_path = os.path.join(self.certificate_authority_path, 'crl')
crl = os.path.join(crl_path, new_id + '.crl') crl = os.path.join(crl_path, new_id + '.crl')
cert = os.path.join(self.certificate_authority_path, 'certs', cert = os.path.join(self.certificate_authority_path, 'certs',
...@@ -256,11 +262,13 @@ class CertificateAuthorityTool(BaseTool): ...@@ -256,11 +262,13 @@ class CertificateAuthorityTool(BaseTool):
popenCommunicate([self.openssl_binary, 'ca', '-utf8', '-config', popenCommunicate([self.openssl_binary, 'ca', '-utf8', '-config',
self.openssl_config, '-gencrl', '-out', crl]) self.openssl_config, '-gencrl', '-out', crl])
alias = os.path.join(crl_path, popenCommunicate([self.openssl_binary, alias = os.path.join(crl_path, popenCommunicate([self.openssl_binary,
'crl', '-noout', '-hash', '-in', crl]).strip() + '.r') 'crl', '-noout', '-hash', '-in', crl]).strip().decode() + '.r')
alias += str(len(glob.glob(alias + '*'))) alias += str(len(glob.glob(alias + '*')))
created.append(alias) created.append(alias)
os.symlink(os.path.basename(crl), alias) os.symlink(os.path.basename(crl), alias)
return dict(crl=open(crl).read()) with open(crl) as f:
crl = f.read()
return dict(crl=crl)
except Exception: except Exception:
e = sys.exc_info() e = sys.exc_info()
try: try:
...@@ -278,7 +286,8 @@ class CertificateAuthorityTool(BaseTool): ...@@ -278,7 +286,8 @@ class CertificateAuthorityTool(BaseTool):
self._unlockCertificateAuthority() self._unlockCertificateAuthority()
def _getValidSerial(self, common_name): def _getValidSerial(self, common_name):
index = open(self.index).read().splitlines() with open(self.index) as f:
index = f.read().splitlines()
valid_line_list = [q for q in index if q.startswith('V') and valid_line_list = [q for q in index if q.startswith('V') and
('CN=%s/' % common_name in q)] ('CN=%s/' % common_name in q)]
if len(valid_line_list) < 1: if len(valid_line_list) < 1:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment