Commit a2b2f851 authored by Rafael Monnerat's avatar Rafael Monnerat

slapos_cloud: use cryptography library to extract certificate information

   The certificate string no longer contains the metadata text, only the certificate itself.
parent 33a44549
......@@ -24,7 +24,8 @@ from time import sleep
from zExceptions import Unauthorized
from unittest import expectedFailure
from Products.ERP5Type.Errors import UnsupportedWorkflowMethod
from cryptography import x509
from cryptography.x509.oid import NameOID
class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
......@@ -60,10 +61,15 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
certificate_login = certificate_login_list[0]
self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertTrue(serial in compute_node_certificate)
self.assertTrue(certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
self.assertNotEqual(certificate_login.getSourceReference(), None)
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
def test_generateCertificate_twice(self):
self.login(self.compute_node.getUserId())
......@@ -72,18 +78,23 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate')
self.assertNotEqual(None, compute_node_key)
self.assertNotEqual(None, compute_node_certificate)
self.assertEqual(None, self.compute_node.getDestinationReference())
self.assertEqual(None, self.compute_node.getSourceReference())
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
self.assertEqual(len(certificate_login_list), 1)
certificate_login = certificate_login_list[0]
self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertTrue(serial in compute_node_certificate)
self.assertTrue(certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
self.assertNotEqual(certificate_login.getSourceReference(), None)
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
self.assertRaises(ValueError, self.compute_node.generateCertificate)
self.assertEqual(None, self.portal.REQUEST.get('compute_node_key'))
self.assertEqual(None, self.portal.REQUEST.get('compute_node_certificate'))
......@@ -272,10 +283,16 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
certificate_login = certificate_login_list[0]
self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertTrue(serial in compute_node_certificate)
self.assertTrue(certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
self.assertNotEqual(certificate_login.getSourceReference(), None)
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference())
......@@ -290,7 +307,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
self.assertRaises(ValueError, self.compute_node.revokeCertificate)
self.assertEqual(None, self.portal.REQUEST.get('compute_node_key'))
self.assertEqual(None, self.portal.REQUEST.get('compute_node_certificate'))
self.assertEqual(None, self.compute_node.getDestinationReference())
self.assertEqual(None, self.compute_node.getSourceReference())
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
self.assertEqual(len(certificate_login_list), 0)
......@@ -306,13 +323,18 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
certificate_login = certificate_login_list[0]
self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertTrue(serial in compute_node_certificate)
self.assertTrue(certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
self.assertNotEqual(certificate_login.getSourceReference(), None)
self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference())
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
self.compute_node.revokeCertificate()
self.assertEqual(None, self.portal.REQUEST.get('compute_node_key'))
self.assertEqual(None, self.portal.REQUEST.get('compute_node_certificate'))
......@@ -337,17 +359,22 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
self.assertEqual(len(certificate_login_list), 1)
certificate_login = certificate_login_list[0]
destination_reference = certificate_login.getDestinationReference()
source_reference = certificate_login.getSourceReference()
self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertTrue(serial in compute_node_certificate)
self.assertTrue(certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
self.assertNotEqual(certificate_login.getSourceReference(), None)
self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference())
self.assertNotEqual(None, destination_reference)
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
self.assertNotEqual(None, source_reference)
self.compute_node.revokeCertificate()
self.compute_node.generateCertificate()
......@@ -358,7 +385,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
self.assertNotEqual(compute_node_certificate, self.portal.REQUEST.get('compute_node_certificate'))
self.assertEqual(certificate_login.getValidationState(), 'invalidated')
self.assertEqual(certificate_login.getDestinationReference(), destination_reference)
self.assertEqual(certificate_login.getSourceReference(), source_reference)
self.assertNotEqual(certificate_login.getReference(), None)
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
......@@ -366,21 +393,25 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
new_certificate_login = [i for i in certificate_login_list \
if i.getId() != certificate_login.getId()][0]
destination_reference = certificate_login.getDestinationReference()
source_reference = certificate_login.getSourceReference()
self.assertEqual(new_certificate_login.getValidationState(), 'validated')
self.assertNotEqual(new_certificate_login.getReference(), None)
self.assertNotEqual(new_certificate_login.getReference(),
certificate_login.getReference())
self.assertNotEqual(new_certificate_login.getDestinationReference(), None)
self.assertNotEqual(new_certificate_login.getDestinationReference(),
certificate_login.getDestinationReference())
serial = '0x%x' % int(new_certificate_login.getDestinationReference(), 16)
self.assertNotEqual(new_certificate_login.getSourceReference(), None)
self.assertNotEqual(new_certificate_login.getSourceReference(),
certificate_login.getSourceReference())
compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate')
self.assertTrue(serial in compute_node_certificate)
self.assertTrue(new_certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
self.assertFalse(certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(new_certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference())
......@@ -396,17 +427,19 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
self.assertEqual(len(certificate_login_list), 1)
certificate_login = certificate_login_list[0]
destination_reference = certificate_login.getDestinationReference()
source_reference = certificate_login.getSourceReference()
self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertTrue(serial in compute_node_certificate)
self.assertTrue(certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
self.assertNotEqual(certificate_login.getSourceReference(), None)
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference())
self.assertNotEqual(None, destination_reference)
self.assertNotEqual(None, source_reference)
self.compute_node.revokeCertificate()
self.compute_node.generateCertificate()
......@@ -417,7 +450,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
self.assertNotEqual(compute_node_certificate, self.portal.REQUEST.get('compute_node_certificate'))
self.assertEqual(certificate_login.getValidationState(), 'invalidated')
self.assertEqual(certificate_login.getDestinationReference(), destination_reference)
self.assertEqual(certificate_login.getSourceReference(), source_reference)
self.assertNotEqual(certificate_login.getReference(), None)
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
......@@ -425,22 +458,21 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
new_certificate_login = [i for i in certificate_login_list \
if i.getId() != certificate_login.getId()][0]
destination_reference = certificate_login.getDestinationReference()
source_reference = certificate_login.getSourceReference()
self.assertEqual(new_certificate_login.getValidationState(), 'validated')
self.assertNotEqual(new_certificate_login.getReference(), None)
self.assertNotEqual(new_certificate_login.getReference(),
certificate_login.getReference())
self.assertNotEqual(new_certificate_login.getDestinationReference(), None)
self.assertNotEqual(new_certificate_login.getDestinationReference(),
certificate_login.getDestinationReference())
serial = '0x%x' % int(new_certificate_login.getDestinationReference(), 16)
self.assertNotEqual(new_certificate_login.getSourceReference(), None)
self.assertNotEqual(new_certificate_login.getSourceReference(),
certificate_login.getSourceReference())
compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate')
self.assertTrue(serial in compute_node_certificate)
self.assertTrue(new_certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
self.assertFalse(certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(new_certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference())
......@@ -453,7 +485,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
self.assertNotEqual(compute_node_certificate, self.portal.REQUEST.get('compute_node_certificate'))
self.assertEqual(new_certificate_login.getValidationState(), 'invalidated')
self.assertNotEqual(new_certificate_login.getDestinationReference(), destination_reference)
self.assertNotEqual(new_certificate_login.getSourceReference(), source_reference)
self.assertNotEqual(new_certificate_login.getReference(), None)
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
......@@ -462,22 +494,22 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
third_certificate_login = [i for i in certificate_login_list \
if i.getId() not in [certificate_login.getId(), new_certificate_login.getId()]][0]
destination_reference = new_certificate_login.getDestinationReference()
source_reference = new_certificate_login.getSourceReference()
self.assertEqual(third_certificate_login.getValidationState(), 'validated')
self.assertNotEqual(third_certificate_login.getReference(), None)
self.assertNotEqual(third_certificate_login.getReference(),
certificate_login.getReference())
self.assertNotEqual(third_certificate_login.getDestinationReference(), None)
self.assertNotEqual(third_certificate_login.getDestinationReference(),
new_certificate_login.getDestinationReference())
serial = '0x%x' % int(third_certificate_login.getDestinationReference(), 16)
self.assertNotEqual(third_certificate_login.getSourceReference(), None)
self.assertNotEqual(third_certificate_login.getSourceReference(),
new_certificate_login.getSourceReference())
compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate')
self.assertTrue(serial in compute_node_certificate)
self.assertTrue(third_certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
self.assertFalse(new_certificate_login.getReference() in compute_node_certificate.decode('string_escape'))
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(third_certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(new_certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(third_certificate_login.getReference(),
self.compute_node.getReference())
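Review bot: The parsed-certificate block added throughout this file pins only the subject's attribute count and its common name, so these tests no longer check any identifier that links the issued certificate to the stored Certificate Login; the `# TODO: Should we check for csr_id` and its commented-out assertion leave that open at every site. That commented assertion is a substring test against the PEM bytes, which is not meaningful once the text dump is gone, so if the stored reference is meant to appear in the certificate it should be compared with a parsed field (for a serial, `ssl_certificate.serial_number`); which field applies is not visible here. `[i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]` raises IndexError instead of a test failure when CN is absent and silently accepts a second CN; `cn_list = ssl_certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)` followed by `self.assertEqual(len(cn_list), 1)` covers both. The same lines recur in the `@@ -1215,11 +1216,11 @@` hunk of the instance workflow test, so a shared helper on `SlapOSTestCaseMixin` would keep the checks consistent; the test methods start and end outside these hunks.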
......
......@@ -22,10 +22,12 @@ from erp5.component.test.SlapOSTestCaseMixin import SlapOSTestCaseMixin
import transaction
from time import sleep
from zExceptions import Unauthorized
from cryptography import x509
from cryptography.x509.oid import NameOID
class TestSlapOSCoreInstanceSlapInterfaceWorkflow(SlapOSTestCaseMixin):
"""Tests instance.requestInstance"""
launch_caucase = 1
def afterSetUp(self):
......@@ -1207,7 +1209,6 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin):
self.software_instance.generateCertificate()
self.assertNotEqual(self.software_instance.getSslKey(), None)
self.assertNotEqual(self.software_instance.getSslCertificate(), None)
self.assertEqual(self.software_instance.getDestinationReference(), None)
certificate_login_list = self.software_instance.objectValues(portal_type="Certificate Login")
self.assertEqual(len(certificate_login_list), 1)
......@@ -1215,11 +1216,11 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin):
self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertTrue(serial in self.software_instance.getSslCertificate())
self.assertTrue(certificate_login.getReference() in \
self.software_instance.getSslCertificate().decode('string_escape'))
self.assertNotEqual(certificate_login.getSourceReference(), None)
ssl_certificate = x509.load_pem_x509_certificate(self.software_instance.getSslCertificate())
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
self.assertRaises(ValueError, self.software_instance.generateCertificate)
def test_revokeCertificate(self):
......@@ -1248,7 +1249,7 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin):
certificate_login = certificate_login_list[0]
self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None)
self.assertNotEqual(certificate_login.getSourceReference(), None)
self.assertNotEqual(self.software_instance.getSslKey(),
ssl_key)
......@@ -1275,11 +1276,11 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin):
self.assertEqual(another_certificate_login.getValidationState(), 'validated')
self.assertNotEqual(another_certificate_login.getReference(), None)
self.assertNotEqual(another_certificate_login.getDestinationReference(), None)
self.assertNotEqual(another_certificate_login.getSourceReference(), None)
self.assertEqual(certificate_login.getValidationState(), 'invalidated')
self.assertNotEqual(certificate_login.getReference(),
another_certificate_login.getReference())
self.assertNotEqual(certificate_login.getDestinationReference(),
another_certificate_login.getDestinationReference())
self.assertNotEqual(certificate_login.getSourceReference(),
another_certificate_login.getSourceReference())
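Review bot: In the revoke/regenerate hunks of this file (`@@ -1248,7 +1249,7 @@` and `@@ -1275,11 +1276,11 @@`) the visible assertions only compare the stored references of the old and new logins; the reissued certificate itself is never parsed. The compute node revocation tests in this commit assert that the new certificate's CN equals the new login's reference and differs from the invalidated one, and the same check here would catch a reissue that still carries the revoked login's identity: after loading `self.software_instance.getSslCertificate()` and extracting `cn`, add `self.assertEqual(another_certificate_login.getReference().decode("UTF-8"), cn)` and `self.assertNotEqual(certificate_login.getReference().decode("UTF-8"), cn)`. The test bodies continue outside the hunks, so this may already be covered in lines not shown.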
......@@ -554,7 +554,7 @@ class TestPerson_get_Certificate(TestSlapOSHalJsonStyleMixin):
self.assertSameSet(response_dict.keys(), ["common_name", "certificate", "id", "key"])
self.assertEqual(response_dict["id"], login.getDestinationReference())
self.assertEqual(response_dict["id"], login.getSourceReference())
self.assertEqual(json.dumps(response_dict["common_name"]), json.dumps(login.getReference()))
self.assertEqual(self.portal.REQUEST.RESPONSE.getStatus(), 200)
......@@ -568,7 +568,7 @@ class TestPerson_get_Certificate(TestSlapOSHalJsonStyleMixin):
self.assertEqual("validated" , login.getValidationState())
self.assertEqual("validated" , new_login.getValidationState())
self.assertNotEqual(login.getReference(), new_login.getReference())
self.assertNotEqual(login.getDestinationReference(), new_login.getDestinationReference())
self.assertNotEqual(login.getSourceReference(), new_login.getSourceReference())
self.assertSameSet(new_response_dict.keys(), ["common_name", "certificate", "id", "key"])
self.assertEqual(json.dumps(new_response_dict["common_name"]), json.dumps(new_login.getReference()))
......