Commit d77fe07b authored by Rafael Monnerat's avatar Rafael Monnerat

erp5_certificate_authority: Verify if the returned certificated is from the expected CA

parent 5f5485e3
...@@ -32,6 +32,7 @@ from Products.ERP5Type.XMLObject import XMLObject ...@@ -32,6 +32,7 @@ from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type.Globals import InitializeClass from Products.ERP5Type.Globals import InitializeClass
from caucase.client import CaucaseClient, CaucaseHTTPError from caucase.client import CaucaseClient, CaucaseHTTPError
from Products.ERP5Type.Core.Workflow import ValidationFailed from Products.ERP5Type.Core.Workflow import ValidationFailed
from caucase.utils import load_ca_certificate, load_certificate
from six.moves import http_client from six.moves import http_client
...@@ -201,6 +202,17 @@ class CaucaseConnector(XMLObject): ...@@ -201,6 +202,17 @@ class CaucaseConnector(XMLObject):
ca_crt_file.seek(0) ca_crt_file.seek(0)
self.setCaCertificateChain(ca_crt_file.read()) self.setCaCertificateChain(ca_crt_file.read())
security.declareProtected(Permissions.ManageUsers, 'verifyCertificate')
def verifyCertificate(self, crt_pem):
if not self.getCaCertificateChain():
self.updateCACertificateChain()
# Here we are just checking if the certificate is valid, and if the
# certificate was issued from a ca we expect, otherwise it will just fail.
load_certificate(
crt_pem, [load_ca_certificate(self.getCaCertificateChain())], [])
return crt_pem
def createCertificateSigningRequest(self, csr): def createCertificateSigningRequest(self, csr):
return self._getServiceConnection().createCertificateSigningRequest(csr) return self._getServiceConnection().createCertificateSigningRequest(csr)
...@@ -210,7 +222,8 @@ class CaucaseConnector(XMLObject): ...@@ -210,7 +222,8 @@ class CaucaseConnector(XMLObject):
security.declareProtected(Permissions.ManageUsers, 'getCertificate') security.declareProtected(Permissions.ManageUsers, 'getCertificate')
def getCertificate(self, csr_id): def getCertificate(self, csr_id):
return self._getAuthenticatedServiceConnection().getCertificate(csr_id) return self.verifyCertificate(
self._getAuthenticatedServiceConnection().getCertificate(csr_id))
security.declareProtected(Permissions.ManageUsers, 'revokeCertificate') security.declareProtected(Permissions.ManageUsers, 'revokeCertificate')
def revokeCertificate(self, crt_pem, key_pem=None): def revokeCertificate(self, crt_pem, key_pem=None):
......
...@@ -31,6 +31,8 @@ from Products.ERP5Type.tests.ERP5TypeCaucaseTestCase import ERP5TypeCaucaseTestC ...@@ -31,6 +31,8 @@ from Products.ERP5Type.tests.ERP5TypeCaucaseTestCase import ERP5TypeCaucaseTestC
from Products.ERP5Type.Core.Workflow import ValidationFailed from Products.ERP5Type.Core.Workflow import ValidationFailed
from caucase.client import CaucaseError from caucase.client import CaucaseError
from caucase.exceptions import CertificateVerificationError
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
...@@ -178,3 +180,37 @@ PB8= ...@@ -178,3 +180,37 @@ PB8=
-----END CERTIFICATE----- -----END CERTIFICATE-----
""") """)
self.assertRaises(CaucaseError, self.caucase_connector.updateCACertificateChain) self.assertRaises(CaucaseError, self.caucase_connector.updateCACertificateChain)
def test_untrust(self):
# Simply test
key, csr = self.caucase_connector._createCertificateRequest()
# Only simple test for the order of response dont change
self.assertIn("PRIVATE KEY", key)
self.assertIn("CERTIFICATE REQUEST", csr)
self.caucase_connector.setCaCertificateChain("""-----BEGIN CERTIFICATE-----
MIIDXjCCAkagAwIBAgIUWur7vpjLtzdWTuaBVQtzgEnDNegwDQYJKoZIhvcNAQEL
BQAwNTEzMDEGA1UEAwwqQ2F1Y2FzZSBDQVMgYXQgaHR0cDovLzEwLjAuNzcuMjI3
Ojg4OTAvY2FzMB4XDTIzMTAwMzE5MTM0NloXDTI0MTAwOTE5MTM0NlowNTEzMDEG
A1UEAwwqQ2F1Y2FzZSBDQVMgYXQgaHR0cDovLzEwLjAuNzcuMjI3Ojg4OTAvY2Fz
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoUPOUx/glzpxe1lmD2vq
ZS5UlOR7oeBoNsdmFpuikZ6ksQvVlnQehsRwvCa8plOWC01ob/NqcVbTqhUCEcnf
LL7y8wqD4qg1wTBOEQ9T2BjNSfY+y5UxGDiTqKSYCre+OY5jWipwNUGXZ7rsQPvU
ExUP/itu1E8vDe9c6uCVq5IR+SJvwwwgB4LwCl14xRpKmkoRcduJFI51mjQmG1/u
q9dbBffZXddEQGZwrjvHXgCMfEccfyPU67PVuyCX6q/1pX3HCxaFR1Z2QVHa2MqV
wjPxqbxOVBK/3oXAVYUS9ksGWxzFdzyDZwPi714sUjUhI/0UholZslQniWhNWp+P
xwIDAQABo2YwZDAdBgNVHQ4EFgQU6xc8HvOdfmnhZ85cxFlfecnVBNAwHwYDVR0j
BBgwFoAU6xc8HvOdfmnhZ85cxFlfecnVBNAwEgYDVR0TAQH/BAgwBgEB/wIBADAO
BgNVHQ8BAf8EBAMCAQYwDQYJKoZIhvcNAQELBQADggEBAGLjwIByLsnohRAx7qVX
2o8d8UvzUXEDTmx2NStYTX53nPu+ajngPV+qr7n7e6PD6xLyNp585aH7P1jt9ZDE
i4JrbtUSl8toB1hizBJeWG4BTRfJ/70ojOEhn/BodhoCIo/Qzn9cuLCjfMXbDhlK
ySrBjKOrG9nl16sT5iao5lJJw2KqzDB7e1SKvBwwILtO74VwdkdUO9itUkP7d6Do
LSnalc7gqVsf8BAlymRktQuDUXZzP3AbWNH6c7ihhNqsP8npKdA/Z4rWCTtIHj+P
YvI3c9Ftc8ACdjv7cMHEdtRmxCYLxIitkfr2wG2sWbGmHoUVjGQdvAjBq8iyMY4q
PB8=
-----END CERTIFICATE-----
""")
csr_id = self.caucase_connector.createCertificateSigningRequest(csr)
self.caucase_connector.createCertificate(csr_id)
self.assertRaises(CertificateVerificationError, self.caucase_connector.getCertificate, csr_id)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment