Commit 580b215b authored by Rafael Monnerat's avatar Rafael Monnerat

erp5_certificate_authority: General update from review

parent 3654e63b
...@@ -49,18 +49,18 @@ class CaucaseConnector(XMLObject): ...@@ -49,18 +49,18 @@ class CaucaseConnector(XMLObject):
security = ClassSecurityInfo() security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation) security.declareObjectProtected(Permissions.AccessContentsInformation)
def _getConnection(self, mode="service", **kw):
def _getServiceConnection(self, **kw):
# XXX Call checkConsistency
if self.getUrlString() is None: if self.getUrlString() is None:
raise ValueError("Caucase url must be defined") raise ValueError("Caucase url must be defined")
return CaucaseClient(ca_url="%s/cas" % self.getUrlString(), **kw)
if mode == "service": def _getUserConnection(self, **kw):
ca_url = self.getUrlString() + '/cas' # XXX Call checkConsistency
elif mode == "user": if self.getUrlString() is None:
ca_url = self.getUrlString() + '/cau' raise ValueError("Caucase url must be defined")
else: return CaucaseClient(ca_url="%s/cau" % self.getUrlString(), **kw)
raise ValueError("Unknown mode, please use service or user only")
return CaucaseClient(ca_url=ca_url, **kw)
def _getAuthenticatedConnection(self): def _getAuthenticatedConnection(self):
if self.getUserCertificate() is None: if self.getUserCertificate() is None:
...@@ -75,13 +75,28 @@ class CaucaseConnector(XMLObject): ...@@ -75,13 +75,28 @@ class CaucaseConnector(XMLObject):
user_key_file.write("\n") user_key_file.write("\n")
user_key_file.write(self.getUserCertificate()) user_key_file.write(self.getUserCertificate())
user_key_file.flush() user_key_file.flush()
return self._getServiceConnection(user_key=user_key_file.name)
def getCertificateSigningRequestTemplate(self, common_name):
key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend())
name_attribute_list = self._getSubjectNameAttributeList()
name_attribute_list.append(
x509.NameAttribute(NameOID.COMMON_NAME,
# The cryptography library only accept Unicode.
common_name.decode('UTF-8')))
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(
name_attribute_list
)).sign(key, hashes.SHA256(), default_backend())
return self._getConnection(user_key=user_key_file.name) return csr.public_bytes(serialization.Encoding.PEM).decode()
security.declareProtected(Permissions.ManageUsers, 'bootstrapCaucaseConfiguration') security.declareProtected(Permissions.ManageUsers, 'bootstrapCaucaseConfiguration')
def bootstrapCaucaseConfiguration(self): def bootstrapCaucaseConfiguration(self):
if self.getUserCertificate() is None: if self.getUserCertificate() is None:
caucase_connection = self._getConnection(mode="user") caucase_connection = self._getUserConnection()
if not self.hasUserCertificateRequestReference(): if not self.hasUserCertificateRequestReference():
key, csr = self._createCertificateRequest() key, csr = self._createCertificateRequest()
csr_id = caucase_connection.createCertificateSigningRequest(csr) csr_id = caucase_connection.createCertificateSigningRequest(csr)
...@@ -135,7 +150,7 @@ class CaucaseConnector(XMLObject): ...@@ -135,7 +150,7 @@ class CaucaseConnector(XMLObject):
public_exponent=65537, key_size=2048, backend=default_backend()) public_exponent=65537, key_size=2048, backend=default_backend())
key_pem = key.private_bytes( key_pem = key.private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL, format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=serialization.NoEncryption()
) )
...@@ -148,10 +163,10 @@ class CaucaseConnector(XMLObject): ...@@ -148,10 +163,10 @@ class CaucaseConnector(XMLObject):
return key_pem.decode(), csr.public_bytes(serialization.Encoding.PEM).decode() return key_pem.decode(), csr.public_bytes(serialization.Encoding.PEM).decode()
def getCACertificate(self): def getCACertificate(self):
return self._getConnection().getCACertificate() return self._getServiceConnection().getCACertificate()
def createCertificateSigningRequest(self, csr): def createCertificateSigningRequest(self, csr):
return self._getConnection().createCertificateSigningRequest(csr) return self._getServiceConnection().createCertificateSigningRequest(csr)
security.declareProtected(Permissions.ManageUsers, 'createCertificate') security.declareProtected(Permissions.ManageUsers, 'createCertificate')
def createCertificate(self, csr_id, template_csr=""): def createCertificate(self, csr_id, template_csr=""):
...@@ -165,6 +180,6 @@ class CaucaseConnector(XMLObject): ...@@ -165,6 +180,6 @@ class CaucaseConnector(XMLObject):
def revokeCertificate(self, crt_pem, key_pem=None): def revokeCertificate(self, crt_pem, key_pem=None):
if key_pem is None: if key_pem is None:
return self._getAuthenticatedConnection().revokeCertificate(crt_pem) return self._getAuthenticatedConnection().revokeCertificate(crt_pem)
return self._getConnection().revokeCertificate(crt_pem, key_pem) return self._getServiceConnection().revokeCertificate(crt_pem, key_pem)
InitializeClass(CaucaseConnector) InitializeClass(CaucaseConnector)
\ No newline at end of file
...@@ -29,33 +29,9 @@ ...@@ -29,33 +29,9 @@
from AccessControl import ClassSecurityInfo from AccessControl import ClassSecurityInfo
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
class CertificateLoginMixin: class CertificateLoginMixin:
security = ClassSecurityInfo() security = ClassSecurityInfo()
def _getCertificateSigningRequestTemplate(self):
key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend())
name_attribute_list = self._getCaucaseConnector()._getSubjectNameAttributeList()
name_attribute_list.append(
x509.NameAttribute(NameOID.COMMON_NAME,
# The cryptography library only accept Unicode.
self.getReference().decode('UTF-8')))
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(
name_attribute_list
)).sign(key, hashes.SHA256(), default_backend())
return csr.public_bytes(serialization.Encoding.PEM).decode()
def _getCaucaseConnector(self): def _getCaucaseConnector(self):
portal = self.getPortalObject() portal = self.getPortalObject()
connector_list = portal.portal_catalog.unrestrictedSearchResults( connector_list = portal.portal_catalog.unrestrictedSearchResults(
...@@ -94,7 +70,7 @@ class CertificateLoginMixin: ...@@ -94,7 +70,7 @@ class CertificateLoginMixin:
), _id, self.getParentValue().getReference("") ), _id, self.getParentValue().getReference("")
) )
self.setReference(reference) self.setReference(reference)
template_csr = self._getCertificateSigningRequestTemplate() template_csr = caucase_connector.getCertificateSigningRequestTemplate(reference)
csr_id = caucase_connector.createCertificateSigningRequest(csr) csr_id = caucase_connector.createCertificateSigningRequest(csr)
caucase_connector.createCertificate(csr_id, template_csr=template_csr) caucase_connector.createCertificate(csr_id, template_csr=template_csr)
...@@ -104,7 +80,7 @@ class CertificateLoginMixin: ...@@ -104,7 +80,7 @@ class CertificateLoginMixin:
return { return {
"certificate" : crt_pem, "certificate" : crt_pem,
"id" : self.getSourceReference(), "id" : self.getSourceReference(),
"common_name" : self.getReference() "common_name" : reference
} }
security.declarePublic('getCertificate') security.declarePublic('getCertificate')
......
...@@ -54,17 +54,15 @@ class TestCertificateAuthorityCaucaseConnector(ERP5TypeCaucaseTestCase): ...@@ -54,17 +54,15 @@ class TestCertificateAuthorityCaucaseConnector(ERP5TypeCaucaseTestCase):
def getBusinessTemplateList(self): def getBusinessTemplateList(self):
return ('erp5_base', 'erp5_web_service', 'erp5_certificate_authority') return ('erp5_base', 'erp5_web_service', 'erp5_certificate_authority')
def test_getConnection_no_url(self): def test_getServiceConnection_no_url(self):
connector_no_url_string = self.portal.portal_web_services.newContent( connector_no_url_string = self.portal.portal_web_services.newContent(
portal_type="Caucase Connector" portal_type="Caucase Connector"
) )
self.assertRaises(ValueError, connector_no_url_string._getConnection) self.assertRaises(ValueError, connector_no_url_string._getServiceConnection)
def test_getConnection(self): def test_getConnection(self):
self.assertNotEqual(None, self.caucase_connector._getConnection()) self.assertNotEqual(None, self.caucase_connector._getServiceConnection())
self.assertNotEqual(None, self.caucase_connector._getConnection(mode="service")) self.assertNotEqual(None, self.caucase_connector._getUserConnection())
self.assertNotEqual(None, self.caucase_connector._getConnection(mode="user"))
self.assertRaises(ValueError, self.caucase_connector._getConnection, "unknownmode")
def test_getAuthenticatedConnection_no_url(self): def test_getAuthenticatedConnection_no_url(self):
connector_no_url_string = self.portal.portal_web_services.newContent( connector_no_url_string = self.portal.portal_web_services.newContent(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment