Commit 54751169 authored by Rafael Monnerat's avatar Rafael Monnerat

erp5_certificate_authority: General update from review

parent 8652a6a6
......@@ -49,18 +49,18 @@ class CaucaseConnector(XMLObject):
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
def _getConnection(self, mode="service", **kw):
def _getServiceConnection(self, **kw):
# XXX Call checkConsistency
if self.getUrlString() is None:
raise ValueError("Caucase url must be defined")
return CaucaseClient(ca_url="%s/cas" % self.getUrlString(), **kw)
if mode == "service":
ca_url = self.getUrlString() + '/cas'
elif mode == "user":
ca_url = self.getUrlString() + '/cau'
else:
raise ValueError("Unknown mode, please use service or user only")
return CaucaseClient(ca_url=ca_url, **kw)
def _getUserConnection(self, **kw):
# XXX Call checkConsistency
if self.getUrlString() is None:
raise ValueError("Caucase url must be defined")
return CaucaseClient(ca_url="%s/cau" % self.getUrlString(), **kw)
def _getAuthenticatedConnection(self):
if self.getUserCertificate() is None:
......@@ -75,13 +75,28 @@ class CaucaseConnector(XMLObject):
user_key_file.write("\n")
user_key_file.write(self.getUserCertificate())
user_key_file.flush()
return self._getServiceConnection(user_key=user_key_file.name)
def getCertificateSigningRequestTemplate(self, common_name):
key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend())
name_attribute_list = self._getSubjectNameAttributeList()
name_attribute_list.append(
x509.NameAttribute(NameOID.COMMON_NAME,
# The cryptography library only accept Unicode.
common_name.decode('UTF-8')))
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(
name_attribute_list
)).sign(key, hashes.SHA256(), default_backend())
return self._getConnection(user_key=user_key_file.name)
return csr.public_bytes(serialization.Encoding.PEM).decode()
security.declareProtected(Permissions.ManageUsers, 'bootstrapCaucaseConfiguration')
def bootstrapCaucaseConfiguration(self):
if self.getUserCertificate() is None:
caucase_connection = self._getConnection(mode="user")
caucase_connection = self._getUserConnection()
if not self.hasUserCertificateRequestReference():
key, csr = self._createCertificateRequest()
csr_id = caucase_connection.createCertificateSigningRequest(csr)
......@@ -135,7 +150,7 @@ class CaucaseConnector(XMLObject):
public_exponent=65537, key_size=2048, backend=default_backend())
key_pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
......@@ -148,10 +163,10 @@ class CaucaseConnector(XMLObject):
return key_pem.decode(), csr.public_bytes(serialization.Encoding.PEM).decode()
def getCACertificate(self):
return self._getConnection().getCACertificate()
return self._getServiceConnection().getCACertificate()
def createCertificateSigningRequest(self, csr):
return self._getConnection().createCertificateSigningRequest(csr)
return self._getServiceConnection().createCertificateSigningRequest(csr)
security.declareProtected(Permissions.ManageUsers, 'createCertificate')
def createCertificate(self, csr_id, template_csr=""):
......@@ -165,6 +180,6 @@ class CaucaseConnector(XMLObject):
def revokeCertificate(self, crt_pem, key_pem=None):
if key_pem is None:
return self._getAuthenticatedConnection().revokeCertificate(crt_pem)
return self._getConnection().revokeCertificate(crt_pem, key_pem)
return self._getServiceConnection().revokeCertificate(crt_pem, key_pem)
InitializeClass(CaucaseConnector)
\ No newline at end of file
......@@ -29,33 +29,9 @@
from AccessControl import ClassSecurityInfo
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
class CertificateLoginMixin:
security = ClassSecurityInfo()
def _getCertificateSigningRequestTemplate(self):
key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend())
name_attribute_list = self._getCaucaseConnector()._getSubjectNameAttributeList()
name_attribute_list.append(
x509.NameAttribute(NameOID.COMMON_NAME,
# The cryptography library only accept Unicode.
self.getReference().decode('UTF-8')))
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(
name_attribute_list
)).sign(key, hashes.SHA256(), default_backend())
return csr.public_bytes(serialization.Encoding.PEM).decode()
def _getCaucaseConnector(self):
portal = self.getPortalObject()
connector_list = portal.portal_catalog.unrestrictedSearchResults(
......@@ -94,7 +70,7 @@ class CertificateLoginMixin:
), _id, self.getParentValue().getReference("")
)
self.setReference(reference)
template_csr = self._getCertificateSigningRequestTemplate()
template_csr = caucase_connector.getCertificateSigningRequestTemplate(reference)
csr_id = caucase_connector.createCertificateSigningRequest(csr)
caucase_connector.createCertificate(csr_id, template_csr=template_csr)
......@@ -104,7 +80,7 @@ class CertificateLoginMixin:
return {
"certificate" : crt_pem,
"id" : self.getSourceReference(),
"common_name" : self.getReference()
"common_name" : reference
}
security.declarePublic('getCertificate')
......
......@@ -54,17 +54,15 @@ class TestCertificateAuthorityCaucaseConnector(ERP5TypeCaucaseTestCase):
def getBusinessTemplateList(self):
return ('erp5_base', 'erp5_web_service', 'erp5_certificate_authority')
def test_getConnection_no_url(self):
def test_getServiceConnection_no_url(self):
connector_no_url_string = self.portal.portal_web_services.newContent(
portal_type="Caucase Connector"
)
self.assertRaises(ValueError, connector_no_url_string._getConnection)
self.assertRaises(ValueError, connector_no_url_string._getServiceConnection)
def test_getConnection(self):
self.assertNotEqual(None, self.caucase_connector._getConnection())
self.assertNotEqual(None, self.caucase_connector._getConnection(mode="service"))
self.assertNotEqual(None, self.caucase_connector._getConnection(mode="user"))
self.assertRaises(ValueError, self.caucase_connector._getConnection, "unknownmode")
self.assertNotEqual(None, self.caucase_connector._getServiceConnection())
self.assertNotEqual(None, self.caucase_connector._getUserConnection())
def test_getAuthenticatedConnection_no_url(self):
connector_no_url_string = self.portal.portal_web_services.newContent(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment