Commit 08a4fd19 authored by antoine_galataud's avatar antoine_galataud Committed by oroulet

add support for Aes256Sha256RsaPss security policy

parent 6e4a83c9
......@@ -427,6 +427,32 @@ class VerifierHMac256(Verifier):
raise uacrypto.InvalidSignature
class SignerPssSha256(Signer):
def __init__(self, client_pk):
require_cryptography(self)
self.client_pk = client_pk
self.key_size = self.client_pk.key_size // 8
def signature_size(self):
return self.key_size
def signature(self, data):
return uacrypto.sign_pss_sha256(self.client_pk, data)
class VerifierPssSha256(Verifier):
def __init__(self, server_cert):
require_cryptography(self)
self.server_cert = server_cert
self.key_size = self.server_cert.public_key().key_size // 8
def signature_size(self):
return self.key_size
def verify(self, data, signature):
uacrypto.verify_pss_sha256(self.server_cert, data, signature)
class SecurityPolicyAes128Sha256RsaOaep(SecurityPolicy):
"""
Security Aes128 Sha256 RsaOaep
......@@ -513,6 +539,91 @@ class SecurityPolicyAes128Sha256RsaOaep(SecurityPolicy):
self.symmetric_cryptography.Decryptor = DecryptorAesCbc(key, init_vec)
class SecurityPolicyAes256Sha256RsaPss(SecurityPolicy):
"""Security policy Aes256_Sha256_RsaPss implementation
- SymmetricSignatureAlgorithm_HMAC-SHA2-256
https://tools.ietf.org/html/rfc4634
- SymmetricEncryptionAlgorithm_AES256-CBC
http://www.w3.org/2001/04/xmlenc#aes256-cbc
- AsymmetricSignatureAlgorithm_RSA-PSS-SHA2-256
http://www.w3.org/2001/04/xmldsig-more#rsa-sha256
- AsymmetricEncryptionAlgorithm_RSA-OAEP-SHA2-256
http://www.w3.org/2001/04/xmlenc#rsa-oaep
- KeyDerivationAlgorithm_P-SHA2-256
http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha256
- CertificateSignatureAlgorithm_RSA-PKCS15-SHA2-256
http://www.w3.org/2001/04/xmldsig-more#rsa-sha256
- Aes256Sha256RsaPss_Limits
-> DerivedSignatureKeyLength: 256 bits
-> MinAsymmetricKeyLength: 2048 bits
-> MaxAsymmetricKeyLength: 4096 bits
-> SecureChannelNonceLength: 32 bytes
"""
URI = "http://opcfoundation.org/UA/SecurityPolicy#Aes256_Sha256_RsaPss"
signature_key_size = 32
symmetric_key_size = 32
secure_channel_nonce_length = 32
AsymmetricEncryptionURI = "http://opcfoundation.org/UA/security/rsa-oaep-sha2-256"
AsymmetricSignatureURI = "http://opcfoundation.org/UA/security/rsa-pss-sha2-256"
@staticmethod
def encrypt_asymmetric(pubkey, data):
return uacrypto.encrypt_rsa_oaep_sha256(pubkey, data)
def __init__(self, peer_cert, host_cert, client_pk, mode, permission_ruleset=None):
require_cryptography(self)
if isinstance(peer_cert, bytes):
peer_cert = uacrypto.x509_from_der(peer_cert)
# even in Sign mode we need to asymmetrically encrypt secrets
# transmitted in OpenSecureChannel. So SignAndEncrypt here
self.asymmetric_cryptography = Cryptography(MessageSecurityMode.SignAndEncrypt)
self.asymmetric_cryptography.Signer = SignerPssSha256(client_pk)
self.asymmetric_cryptography.Verifier = VerifierPssSha256(peer_cert)
self.asymmetric_cryptography.Encryptor = EncryptorRsa(
peer_cert, uacrypto.encrypt_rsa_oaep_sha256, 66
)
self.asymmetric_cryptography.Decryptor = DecryptorRsa(
client_pk, uacrypto.decrypt_rsa_oaep_sha256, 66
)
self.symmetric_cryptography = Cryptography(mode)
self.Mode = mode
self.peer_certificate = uacrypto.der_from_x509(peer_cert)
self.host_certificate = uacrypto.der_from_x509(host_cert)
if permission_ruleset is None:
from asyncua.crypto.permission_rules import SimpleRoleRuleset
permission_ruleset = SimpleRoleRuleset()
self.permissions = permission_ruleset
def make_local_symmetric_key(self, secret, seed):
# specs part 6, 6.7.5
key_sizes = (self.signature_key_size, self.symmetric_key_size, 16)
(sigkey, key, init_vec) = uacrypto.p_sha256(secret, seed, key_sizes)
self.symmetric_cryptography.Signer = SignerHMac256(sigkey)
self.symmetric_cryptography.Encryptor = EncryptorAesCbc(key, init_vec)
def make_remote_symmetric_key(self, secret, seed, lifetime):
# specs part 6, 6.7.5
key_sizes = (self.signature_key_size, self.symmetric_key_size, 16)
(sigkey, key, init_vec) = uacrypto.p_sha256(secret, seed, key_sizes)
if self.symmetric_cryptography.Verifier or self.symmetric_cryptography.Decryptor:
self.symmetric_cryptography.Prev_Verifier = self.symmetric_cryptography.Verifier
self.symmetric_cryptography.Prev_Decryptor = self.symmetric_cryptography.Decryptor
self.symmetric_cryptography.prev_key_expiration = (
self.symmetric_cryptography.key_expiration
)
# lifetime is in ms
self.symmetric_cryptography.key_expiration = time.time() + (lifetime * 0.001)
self.symmetric_cryptography.Verifier = VerifierHMac256(sigkey)
self.symmetric_cryptography.Decryptor = DecryptorAesCbc(key, init_vec)
class SecurityPolicyBasic128Rsa15(SecurityPolicy):
"""
DEPRECATED, do not use anymore!
......@@ -786,7 +897,8 @@ def encrypt_asymmetric(pubkey, data, policy_uri):
Returns a tuple (encrypted_data, algorithm_uri)
"""
for cls in [SecurityPolicyBasic256Sha256, SecurityPolicyBasic256,
SecurityPolicyBasic128Rsa15, SecurityPolicyAes128Sha256RsaOaep]:
SecurityPolicyBasic128Rsa15, SecurityPolicyAes128Sha256RsaOaep,
SecurityPolicyAes256Sha256RsaPss]:
if policy_uri == cls.URI:
return (cls.encrypt_asymmetric(pubkey, data),
cls.AsymmetricEncryptionURI)
......
......@@ -114,6 +114,17 @@ def sign_sha256(private_key, data):
)
def sign_pss_sha256(private_key, data):
return private_key.sign(
data,
padding.PSS(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256(),
)
def verify_sha1(certificate, data, signature):
certificate.public_key().verify(
signature,
......@@ -131,6 +142,18 @@ def verify_sha256(certificate, data, signature):
hashes.SHA256())
def verify_pss_sha256(certificate, data, signature):
certificate.public_key().verify(
signature,
data,
padding.PSS(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256(),
)
def encrypt_basic256(public_key, data):
ciphertext = public_key.encrypt(
data,
......@@ -153,6 +176,18 @@ def encrypt_rsa_oaep(public_key, data):
return ciphertext
def encrypt_rsa_oaep_sha256(public_key, data):
ciphertext = public_key.encrypt(
data,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
),
)
return ciphertext
def encrypt_rsa15(public_key, data):
ciphertext = public_key.encrypt(
data,
......@@ -172,6 +207,18 @@ def decrypt_rsa_oaep(private_key, data):
return text
def decrypt_rsa_oaep_sha256(private_key, data):
text = private_key.decrypt(
bytes(data),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
),
)
return text
def decrypt_rsa15(private_key, data):
text = private_key.decrypt(
bytes(data),
......
......@@ -103,11 +103,12 @@ class Server:
self._security_policy = [
ua.SecurityPolicyType.NoSecurity, ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
ua.SecurityPolicyType.Basic256Sha256_Sign, ua.SecurityPolicyType.Aes128Sha256RsaOaep_SignAndEncrypt,
ua.SecurityPolicyType.Aes128Sha256RsaOaep_Sign
ua.SecurityPolicyType.Aes128Sha256RsaOaep_Sign, ua.SecurityPolicyType.Aes256Sha256RsaPss_Sign,
ua.SecurityPolicyType.Aes256Sha256RsaPss_SignAndEncrypt
]
# allow all certificates by default
self._permission_ruleset = None
self._policyIDs = ["Anonymous", "Basic256Sha256", "Username", "Aes128Sha256RsaOaep"]
self._policyIDs = ["Anonymous", "Basic256Sha256", "Username", "Aes128Sha256RsaOaep", "Aes256Sha256RsaPss"]
self.certificate: Optional[x509.Certificate] = None
# Use acceptable limits
buffer_sz = 65535
......@@ -414,6 +415,20 @@ class Server:
ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt, self.certificate, self.iserver.private_key,
permission_ruleset=self._permission_ruleset))
if ua.SecurityPolicyType.Aes256Sha256RsaPss_SignAndEncrypt in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyAes256Sha256RsaPss,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(
ua.SecurityPolicyFactory(security_policies.SecurityPolicyAes256Sha256RsaPss,
ua.MessageSecurityMode.SignAndEncrypt, self.certificate,
self.iserver.private_key,
permission_ruleset=self._permission_ruleset))
if ua.SecurityPolicyType.Aes256Sha256RsaPss_Sign in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyAes256Sha256RsaPss, ua.MessageSecurityMode.Sign)
self._policies.append(
ua.SecurityPolicyFactory(security_policies.SecurityPolicyAes256Sha256RsaPss,
ua.MessageSecurityMode.Sign, self.certificate, self.iserver.private_key,
permission_ruleset=self._permission_ruleset))
@staticmethod
def lookup_security_level_for_policy_type(security_policy_type: ua.SecurityPolicyType) -> ua.Byte:
......@@ -434,7 +449,9 @@ class Server:
ua.SecurityPolicyType.Basic256Sha256_Sign: 50,
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt: 70,
ua.SecurityPolicyType.Aes128Sha256RsaOaep_Sign: 55,
ua.SecurityPolicyType.Aes128Sha256RsaOaep_SignAndEncrypt: 75
ua.SecurityPolicyType.Aes128Sha256RsaOaep_SignAndEncrypt: 75,
ua.SecurityPolicyType.Aes256Sha256RsaPss_Sign: 60,
ua.SecurityPolicyType.Aes256Sha256RsaPss_SignAndEncrypt: 80
}[security_policy_type])
@staticmethod
......
......@@ -1272,7 +1272,10 @@ class SecurityPolicyType(Enum):
"Basic256_SignAndEncrypt"
"Basic256Sha256_Sign"
"Basic256Sha256_SignAndEncrypt"
"Aes128_Sha256_RsaOaep_Sign"
"Aes128_Sha256_RsaOaep_SignAndEncrypt"
"Aes256_Sha256_RsaPss_Sign"
"Aes256_Sha256_RsaPss_SignAndEncrypt"
"""
NoSecurity = 0
......@@ -1284,3 +1287,5 @@ class SecurityPolicyType(Enum):
Basic256Sha256_SignAndEncrypt = 6
Aes128Sha256RsaOaep_Sign = 7
Aes128Sha256RsaOaep_SignAndEncrypt = 8
Aes256Sha256RsaPss_Sign = 9
Aes256Sha256RsaPss_SignAndEncrypt = 10
\ No newline at end of file
......@@ -254,6 +254,22 @@ async def test_Aes128Sha256RsaOaep_encrypt_success(srv_crypto_all_certs):
assert await clt.nodes.objects.get_children()
async def test_Aes256Sha256RsaPss_encrypt_success(srv_crypto_all_certs):
clt = Client(uri_crypto)
_, cert = srv_crypto_all_certs
await clt.set_security(
security_policies.SecurityPolicyAes256Sha256RsaPss,
f"{EXAMPLE_PATH / 'certificate-example.der'}",
f"{EXAMPLE_PATH / 'private-key-example.pem'}",
None,
cert,
ua.MessageSecurityMode.SignAndEncrypt
)
async with clt:
assert await clt.nodes.objects.get_children()
async def test_certificate_handling_success(srv_crypto_one_cert):
_, cert = srv_crypto_one_cert
clt = Client(uri_crypto_cert)
......@@ -492,6 +508,9 @@ async def test_security_level_all():
assert Server.determine_security_level(security_policies.SecurityPolicyAes128Sha256RsaOaep.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Aes128Sha256RsaOaep_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyAes128Sha256RsaOaep.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Aes128Sha256RsaOaep_SignAndEncrypt)
assert Server.determine_security_level(security_policies.SecurityPolicyAes256Sha256RsaPss.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Aes256Sha256RsaPss_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyAes256Sha256RsaPss.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Aes256Sha256RsaPss_SignAndEncrypt)
# For the sake of completeness also the old, not recommended, protocols Basic128Rsa15 and Basic256
assert Server.determine_security_level(security_policies.SecurityPolicyBasic128Rsa15.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic128Rsa15_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyBasic128Rsa15.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment