Commit 1cfd64bc authored by Rafael Monnerat's avatar Rafael Monnerat

erp5_certificate_authority: Use checkConsistency to validate url existence

  and implement other things nicer.
parent a4005367
......@@ -30,7 +30,8 @@ from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type.Globals import InitializeClass
from caucase.client import CaucaseClient, CaucaseError
from caucase.client import CaucaseClient, CaucaseHTTPError
from Products.ERP5Type.Core.Workflow import ValidationFailed
from six.moves import http_client
......@@ -42,27 +43,25 @@ from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
import tempfile
class CaucaseConnector(XMLObject):
meta_type = 'Caucase Connector'
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
def _getConnection(self, **kw):
message_list = self.checkConsistency()
if message_list:
raise ValidationFailed(message_list)
return CaucaseClient(**kw)
def _getServiceConnection(self, **kw):
# XXX Call checkConsistency
if self.getUrlString() is None:
raise ValueError("Caucase url must be defined")
return CaucaseClient(ca_url="%s/cas" % self.getUrlString(), **kw)
return self._getConnection(ca_url="%s/cas" % self.getUrlString(""), **kw)
def _getUserConnection(self, **kw):
# XXX Call checkConsistency
if self.getUrlString() is None:
raise ValueError("Caucase url must be defined")
return CaucaseClient(ca_url="%s/cau" % self.getUrlString(), **kw)
return self._getConnection(ca_url="%s/cau" % self.getUrlString(""), **kw)
def _getAuthenticatedConnection(self):
def _getAuthenticatedServiceConnection(self):
if self.getUserCertificate() is None:
if self.hasUserCertificateRequestReference():
self.bootstrapCaucaseConfiguration()
......@@ -103,11 +102,11 @@ class CaucaseConnector(XMLObject):
self.setUserCertificateRequestReference(csr_id)
self.setUserKey(key)
csr_id = int(self.getUserCertificateRequestReference())
csr_id = self.getUserCertificateRequestReference()
try:
crt_pem = caucase_connection.getCertificate(
csr_id=csr_id)
except CaucaseError as e:
except CaucaseHTTPError as e:
if e.args[0] != http_client.NOT_FOUND:
raise
......@@ -155,6 +154,11 @@ class CaucaseConnector(XMLObject):
)
name_attribute_list = self._getSubjectNameAttributeList()
name_attribute_list.append(
x509.NameAttribute(NameOID.COMMON_NAME,
# The cryptography library only accept Unicode.
"erp5-user".decode('UTF-8')))
# Probably we should extend a bit more the attributes.
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name(
name_attribute_list
......@@ -170,16 +174,16 @@ class CaucaseConnector(XMLObject):
security.declareProtected(Permissions.ManageUsers, 'createCertificate')
def createCertificate(self, csr_id, template_csr=""):
return self._getAuthenticatedConnection().createCertificate(csr_id, template_csr)
return self._getAuthenticatedServiceConnection().createCertificate(csr_id, template_csr)
security.declareProtected(Permissions.ManageUsers, 'getCertificate')
def getCertificate(self, csr_id):
return self._getAuthenticatedConnection().getCertificate(csr_id)
return self._getAuthenticatedServiceConnection().getCertificate(csr_id)
security.declareProtected(Permissions.ManageUsers, 'revokeCertificate')
def revokeCertificate(self, crt_pem, key_pem=None):
if key_pem is None:
return self._getAuthenticatedConnection().revokeCertificate(crt_pem)
return self._getAuthenticatedServiceConnection().revokeCertificate(crt_pem)
return self._getServiceConnection().revokeCertificate(crt_pem, key_pem)
InitializeClass(CaucaseConnector)
\ No newline at end of file
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Property Existence Constraint" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_identity_criterion</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>_local_properties</string> </key>
<value>
<tuple>
<dictionary>
<item>
<key> <string>id</string> </key>
<value> <string>message_property_not_set</string> </value>
</item>
<item>
<key> <string>type</string> </key>
<value> <string>string</string> </value>
</item>
</dictionary>
</tuple>
</value>
</item>
<item>
<key> <string>_range_criterion</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>constraint_property</string> </key>
<value>
<tuple>
<string>url_string</string>
</tuple>
</value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>url_string_existence_constraint</string> </value>
</item>
<item>
<key> <string>message_no_such_property</string> </key>
<value> <string>Url String must be set</string> </value>
</item>
<item>
<key> <string>message_property_not_set</string> </key>
<value>
<none/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -10,7 +10,7 @@
<key> <string>categories</string> </key>
<value>
<tuple>
<string>elementary_type/string</string>
<string>elementary_type/int</string>
</tuple>
</value>
</item>
......
......@@ -28,6 +28,7 @@
##############################################################################
from Products.ERP5Type.tests.ERP5TypeCaucaseTestCase import ERP5TypeCaucaseTestCase
from Products.ERP5Type.Core.Workflow import ValidationFailed
from cryptography import x509
from cryptography.hazmat.backends import default_backend
......@@ -58,24 +59,24 @@ class TestCertificateAuthorityCaucaseConnector(ERP5TypeCaucaseTestCase):
connector_no_url_string = self.portal.portal_web_services.newContent(
portal_type="Caucase Connector"
)
self.assertRaises(ValueError, connector_no_url_string._getServiceConnection)
self.assertRaises(ValidationFailed, connector_no_url_string._getServiceConnection)
def test_getConnection(self):
self.assertNotEqual(None, self.caucase_connector._getServiceConnection())
self.assertNotEqual(None, self.caucase_connector._getUserConnection())
def test_getAuthenticatedConnection_no_url(self):
def test_getAuthenticatedServiceConnection_no_url(self):
connector_no_url_string = self.portal.portal_web_services.newContent(
portal_type="Caucase Connector"
)
self.assertRaises(ValueError, connector_no_url_string._getAuthenticatedConnection)
self.assertRaises(ValueError, connector_no_url_string._getAuthenticatedServiceConnection)
def test_getAuthenticatedConnection_with_url(self):
def test_getAuthenticatedServiceConnection_with_url(self):
connector_no_url_string = self.portal.portal_web_services.newContent(
portal_type="Caucase Connector",
url_string="https://hasurl.but.no.user_certificate"
)
self.assertRaises(ValueError, connector_no_url_string._getAuthenticatedConnection)
self.assertRaises(ValueError, connector_no_url_string._getAuthenticatedServiceConnection)
def test(self):
# Simply test
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment