Commit dc0e2feb authored by Rafael Monnerat's avatar Rafael Monnerat

erp5_certificate_authority: Sign the certificate with Certificate Login before send to caucase

parent 512c9549
......@@ -29,11 +29,57 @@
from AccessControl import ClassSecurityInfo
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
class CertificateLoginMixin:
security = ClassSecurityInfo()
def _getCertificate(self):
def _getCertificateSigningRequestTemplate(self):
key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend())
# Probably we should extend a bit more the attributes.
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
# The cryptography library only accept Unicode.
x509.NameAttribute(NameOID.COMMON_NAME, self.getReference().decode('UTF-8')),
])).sign(key, hashes.SHA256(), default_backend())
return csr.public_bytes(serialization.Encoding.PEM).decode()
def _getCaucaseConnector(self):
portal = self.getPortalObject()
connector_list = portal.portal_catalog.unrestrictedSearchResults(
portal_type="Caucase Connector",
reference="default",
validation_state="validated"
)
if len(connector_list) == 0:
raise ValueError("No caucase connector found!")
if len(connector_list) > 1:
raise ValueError("Too many caucase connector found!")
return connector_list[0]
def _getCertificate(self, csr=None):
caucase_connector = self._getCaucaseConnector()
portal = self.getPortalObject()
certificate_dict = {
"common_name" : self.getReference()
}
if self.getReference and self.getDestinationReference():
certificate_dict["id"] = self.getDestinationReference()
crt_pem = caucase_connector.getCertificate(self.getDestinationReference())
certificate_dict["certificate"] = crt_pem
# We should assert that reference is the CN of crt_pem
return certificate_dict
_id = self._generateRandomId()
reference = 'CERTLOGIN-%i-%s-%s' % (
portal.portal_ids.generateNewId(
......@@ -42,10 +88,29 @@ class CertificateLoginMixin:
), _id, self.getParentValue().getReference("")
)
self.setReference(reference)
certificate_dict = self.getPortalObject().portal_certificate_authority\
.getNewCertificate(self.getReference())
self.setDestinationReference(certificate_dict['id'])
return certificate_dict
template_csr = self._getCertificateSigningRequestTemplate()
csr_id = caucase_connector.createCertificateSigningRequest(csr)
caucase_connector.createCertificate(csr_id, template_csr=template_csr)
crt_pem = caucase_connector.getCertificate(csr_id)
self.setDestinationReference(csr_id)
return {
"certificate" : crt_pem,
"id" : self.getDestinationReference(),
"common_name" : self.getReference()
}
security.declarePublic('getCertificate')
def getCertificate(self, csr=None):
"""Returns new SSL certificate"""
if csr is None and self.getDestinationReference() is None:
key, csr = self._getCaucaseConnector()._createCertificateRequest()
certificate_dict = self._getCertificate(csr=csr)
certificate_dict["key"] = key
return certificate_dict
else:
return self._getCertificate(csr=csr)
def _revokeCertificate(self):
if self.getDestinationReference() is not None:
......@@ -63,14 +128,7 @@ class CertificateLoginMixin:
else:
raise ValueError("No certificate found to revoke!")
security.declarePublic('getCertificate')
def getCertificate(self):
"""Returns new SSL certificate"""
if self.getDestinationReference() is not None:
raise ValueError("Certificate was already issued, please revoke first.")
return self._getCertificate()
security.declarePublic('revokeCertificate')
def revokeCertificate(self):
"""Revokes existing certificate"""
self._revokeCertificate()
self._revokeCertificate()
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment