Commit 92a7318e authored by Łukasz Nowak's avatar Łukasz Nowak

Simplify signature certificate check.

Except list of PEM encoded certificate strings. Do not try to read them from
file nor download from network.

Also fix few naming errors.
parent 29ece754
......@@ -55,7 +55,7 @@ class NetworkcacheClient(object):
def __init__(self, shacache, shadir,
signature_private_key_file=None,
signature_certificate_file_list=None):
signature_certificate_list=None):
''' Set the initial values. '''
# ShaCache Properties
for k, v in self.parseUrl(shacache).iteritems():
......@@ -68,16 +68,7 @@ class NetworkcacheClient(object):
setattr(self, 'shadir_%s' % k, v)
self.signature_private_key_file = signature_private_key_file
self.signature_certificate_file_list = []
self.signature_certificate_url_list = []
if signature_certificate_file_list is not None:
# Split the path and urls
for value in signature_certificate_file_list:
if os.path.exists(value):
self.signature_certificate_file_list.append(value)
elif value.startswith('http'):
self.signature_certificate_url_list.append(value)
self.signature_certificate_list = signature_certificate_list
def upload(self, file_descriptor, key=None, urlmd5=None, file_name=None,
valid_until=None, architecture=None):
......@@ -154,11 +145,11 @@ class NetworkcacheClient(object):
headers=self.shadir_header_dict)
return urllib2.urlopen(request)
def select(self, urlmd5):
def select(self, key):
''' Download a file from shacache by selecting the entry in shadir
Raise DirectoryNotFound if multiple files are found.
'''
url = os.path.join(self.shadir_url, urlmd5)
url = os.path.join(self.shadir_url, key)
file_descriptor = tempfile.NamedTemporaryFile()
request = urllib2.Request(url=url, data=None,
headers=self.shadir_header_dict)
......@@ -166,16 +157,15 @@ class NetworkcacheClient(object):
# Filtering...
data_list = json.loads(data)
if self.signature_certificate_file_list or \
self.signature_certificate_url_list:
method = self._verifySignatureInCertificateList
data_list = filter(lambda x: method(x[1]), data_list)
if self.signature_certificate_list is not None:
data_list = filter(lambda x: self._verifySignatureInCertificateList(
x[1]), data_list)
if not data_list:
raise DirectoryNotFound('Could not find a trustable entry.')
if len(data_list) > 1:
raise DirectoryNotFound('Too many entries for a given key. ' \
'Directory: %s. Entries: %s.' % (urlmd5, str(data_list)))
raise DirectoryNotFound('Too many entries for a given key %r. ' \
'Entries: %s.' % (key, str(data_list)))
information_dict, signature = data_list[0]
sha512 = information_dict.get('sha512')
......@@ -185,7 +175,7 @@ class NetworkcacheClient(object):
"""
Return the signature based on certification file.
"""
if not self.signature_private_key_file:
if self.signature_private_key_file is None:
return ''
SignEVP = M2Crypto.EVP.load_key(self.signature_private_key_file)
......@@ -198,38 +188,27 @@ class NetworkcacheClient(object):
"""
Returns true if it can find any valid certificate or false if it does not
find any.
It must check the local certificate files first before checking the files
which are available under HTTP.
"""
for certificate_path in self.signature_certificate_file_list:
if self._verifySignatureCertificate(signature_string, certificate_path):
return True
for certificate_url in self.signature_certificate_url_list:
file_descriptor = self._fetchCertificateFileFromUrl(certificate_url)
try:
file_name = file_descriptor.name
if self._verifySignatureCertificate(signature_string, file_name):
for certificate in self.signature_certificate_list:
if self._verifySignatureCertificate(signature_string, certificate):
return True
finally:
file_descriptor.close()
return False
def _verifySignatureCertificate(self, signature_string, certificate_path):
def _verifySignatureCertificate(self, signature_string, certificate):
""" verify if the signature is valid for a given certificate. """
PubKey = M2Crypto.X509.load_cert(certificate_path)
certificate_file = tempfile.NamedTemporaryFile()
certificate_file.write(certificate)
certificate_file.flush()
certificate_file.seek(0)
try:
PubKey = M2Crypto.X509.load_cert(certificate_file.name)
VerifyEVP = M2Crypto.EVP.PKey()
VerifyEVP.assign_rsa(PubKey.get_pubkey().get_rsa())
VerifyEVP.verify_init()
VerifyEVP.verify_update('')
return VerifyEVP.verify_final(signature_string.decode('base64'))
def _fetchCertificateFileFromUrl(self, certification_file_url):
""" Download the certification files from the url. """
return urllib2.urlopen(certification_file_url)
finally:
certificate_file.close()
class DirectoryNotFound(Exception):
pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment