Commit 22758524 authored by Thilo Cestonaro's avatar Thilo Cestonaro Committed by oroulet

Added security policy Basic256Sha256

Added deprecated warning to the implementation of Basic256 and
Basic128Rsa15 security policies.
Removed usage of security policies Basic256 and Basic128Rsa15 from server example
implementation.
Basic256 and Basic128Rsa15 use both the sha-1 algorithm.
This is considererd not secure anymore since OPC UA Spec 1.04.
See:
http://opcfoundation.org/UA-Profile/UA/SecurityPolicy%23Basic128Rsa15
http://opcfoundation.org/UA-Profile/UA/SecurityPolicy%23Basic256

For details on Basic256Sha256 security policy have a look here:
http://opcfoundation.org/UA-Profile/UA/SecurityPolicy%23Basic256Sha256
parent fae18c50
......@@ -25,7 +25,7 @@ if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")
#client = Client("opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/")
#client.set_security_string("Basic256,SignAndEncrypt,certificate-example.der,private-key-example.pem")
#client.set_security_string("Basic256Sha256,SignAndEncrypt,certificate-example.der,private-key-example.pem")
try:
client.connect()
root = client.get_root_node()
......
......@@ -10,7 +10,7 @@ from opcua import Client
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")
client.set_security_string("Basic256,Sign,certificate-example.der,private-key-example.pem")
client.set_security_string("Basic256Sha256,Sign,certificate-example.der,private-key-example.pem")
try:
client.connect()
root = client.get_root_node()
......
......@@ -74,10 +74,8 @@ if __name__ == "__main__":
# set all possible endpoint policies for clients to connect through
server.set_security_policy([
ua.SecurityPolicyType.NoSecurity,
ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
ua.SecurityPolicyType.Basic128Rsa15_Sign,
ua.SecurityPolicyType.Basic256_SignAndEncrypt,
ua.SecurityPolicyType.Basic256_Sign])
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
ua.SecurityPolicyType.Basic256Sha256_Sign])
# setup our own namespace
uri = "http://examples.freeopcua.github.io"
......
......@@ -99,10 +99,11 @@ class Client(object):
async def set_security_string(self, string: str):
"""
Set SecureConnection mode. String format:
"<Policy>,<Mode>,<certificate>,<private_key>[,<server_private_key>]"
where <Policy> is Basic128Rsa15 or Basic256,
<Mode> is Sign or SignAndEncrypt
<certificate>, <private_key> and <server_private_key> are paths to .pem or .der files
Policy,Mode,certificate,private_key[,server_private_key]
where Policy is Basic128Rsa15, Basic256 or Basic256Sha256,
Mode is Sign or SignAndEncrypt
certificate, private_key and server_private_key are
paths to .pem or .der files
Call this before connect()
"""
if not string:
......
import logging
from abc import ABCMeta, abstractmethod
from ..ua import CryptographyNone, SecurityPolicy, MessageSecurityMode, UaError
try:
......@@ -307,9 +309,63 @@ class DecryptorAesCbc(Decryptor):
def decrypt(self, data):
return uacrypto.cipher_decrypt(self.cipher, data)
class SignerSha256(Signer):
def __init__(self, client_pk):
require_cryptography(self)
self.client_pk = client_pk
self.key_size = self.client_pk.key_size // 8
def signature_size(self):
return self.key_size
def signature(self, data):
return uacrypto.sign_sha256(self.client_pk, data)
class VerifierSha256(Verifier):
def __init__(self, server_cert):
require_cryptography(self)
self.server_cert = server_cert
self.key_size = self.server_cert.public_key().key_size // 8
def signature_size(self):
return self.key_size
def verify(self, data, signature):
uacrypto.verify_sha256(self.server_cert, data, signature)
class SignerHMac256(Signer):
def __init__(self, key):
require_cryptography(self)
self.key = key
def signature_size(self):
return uacrypto.sha256_size()
def signature(self, data):
return uacrypto.hmac_sha256(self.key, data)
class VerifierHMac256(Verifier):
def __init__(self, key):
require_cryptography(self)
self.key = key
def signature_size(self):
return uacrypto.sha256_size()
def verify(self, data, signature):
expected = uacrypto.hmac_sha256(self.key, data)
if signature != expected:
raise uacrypto.InvalidSignature
class SecurityPolicyBasic128Rsa15(SecurityPolicy):
"""
DEPRECATED, do not use anymore!
Security Basic 128Rsa15
A suite of algorithms that uses RSA15 as Key-Wrap-algorithm
and 128-Bit (16 bytes) for encryption algorithms.
......@@ -344,6 +400,9 @@ class SecurityPolicyBasic128Rsa15(SecurityPolicy):
return uacrypto.encrypt_rsa15(pubkey, data)
def __init__(self, server_cert, client_cert, client_pk, mode):
logger = logging.getLogger(__name__)
logger.warning("DEPRECATED! Do not use SecurityPolicyBasic128Rsa15 anymore!")
require_cryptography(self)
if isinstance(server_cert, bytes):
server_cert = uacrypto.x509_from_der(server_cert)
......@@ -376,6 +435,8 @@ class SecurityPolicyBasic128Rsa15(SecurityPolicy):
class SecurityPolicyBasic256(SecurityPolicy):
"""
DEPRECATED, do not use anymore!
Security Basic 256
A suite of algorithms that are for 256-Bit (32 bytes) encryption,
algorithms include:
......@@ -410,6 +471,9 @@ class SecurityPolicyBasic256(SecurityPolicy):
return uacrypto.encrypt_rsa_oaep(pubkey, data)
def __init__(self, server_cert, client_cert, client_pk, mode):
logger = logging.getLogger(__name__)
logger.warning("DEPRECATED! Do not use SecurityPolicyBasic256 anymore!")
require_cryptography(self)
if isinstance(server_cert, bytes):
server_cert = uacrypto.x509_from_der(server_cert)
......@@ -440,6 +504,70 @@ class SecurityPolicyBasic256(SecurityPolicy):
self.symmetric_cryptography.Verifier = VerifierAesCbc(sigkey)
self.symmetric_cryptography.Decryptor = DecryptorAesCbc(key, init_vec)
class SecurityPolicyBasic256Sha256(SecurityPolicy):
"""
Security Basic 256Sha256
A suite of algorithms that uses Sha256 as Key-Wrap-algorithm
and 256-Bit (32 bytes) for encryption algorithms.
- SymmetricSignatureAlgorithm_HMAC-SHA2-256
https://tools.ietf.org/html/rfc4634
- SymmetricEncryptionAlgorithm_AES256-CBC
http://www.w3.org/2001/04/xmlenc#aes256-cbc
- AsymmetricSignatureAlgorithm_RSA-PKCS15-SHA2-256
http://www.w3.org/2001/04/xmldsig-more#rsa-sha256
- AsymmetricEncryptionAlgorithm_RSA-OAEP-SHA1
http://www.w3.org/2001/04/xmlenc#rsa-oaep
- KeyDerivationAlgorithm_P-SHA2-256
http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha256
- CertificateSignatureAlgorithm_RSA-PKCS15-SHA2-256
http://www.w3.org/2001/04/xmldsig-more#rsa-sha256
- Basic256Sha256_Limits
-> DerivedSignatureKeyLength: 256 bits
-> MinAsymmetricKeyLength: 2048 bits
-> MaxAsymmetricKeyLength: 4096 bits
-> SecureChannelNonceLength: 32 bytes
"""
URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic256Sha256"
signature_key_size = 32
symmetric_key_size = 32
AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-oaep"
@staticmethod
def encrypt_asymmetric(pubkey, data):
return uacrypto.encrypt_rsa_oaep(pubkey, data)
def __init__(self, server_cert, client_cert, client_pk, mode):
require_cryptography(self)
if isinstance(server_cert, bytes):
server_cert = uacrypto.x509_from_der(server_cert)
# even in Sign mode we need to asymmetrically encrypt secrets
# transmitted in OpenSecureChannel. So SignAndEncrypt here
self.asymmetric_cryptography = Cryptography(
MessageSecurityMode.SignAndEncrypt)
self.asymmetric_cryptography.Signer = SignerSha256(client_pk)
self.asymmetric_cryptography.Verifier = VerifierSha256(server_cert)
self.asymmetric_cryptography.Encryptor = EncryptorRsa(
server_cert, uacrypto.encrypt_rsa_oaep, 42)
self.asymmetric_cryptography.Decryptor = DecryptorRsa(
client_pk, uacrypto.decrypt_rsa_oaep, 42)
self.symmetric_cryptography = Cryptography(mode)
self.Mode = mode
self.server_certificate = uacrypto.der_from_x509(server_cert)
self.client_certificate = uacrypto.der_from_x509(client_cert)
def make_symmetric_key(self, nonce1, nonce2):
# specs part 6, 6.7.5
key_sizes = (self.signature_key_size, self.symmetric_key_size, 16)
(sigkey, key, init_vec) = uacrypto.p_sha256(nonce2, nonce1, key_sizes)
self.symmetric_cryptography.Signer = SignerHMac256(sigkey)
self.symmetric_cryptography.Encryptor = EncryptorAesCbc(key, init_vec)
(sigkey, key, init_vec) = uacrypto.p_sha256(nonce1, nonce2, key_sizes)
self.symmetric_cryptography.Verifier = VerifierHMac256(sigkey)
self.symmetric_cryptography.Decryptor = DecryptorAesCbc(key, init_vec)
def encrypt_asymmetric(pubkey, data, policy_uri):
"""
......@@ -447,7 +575,7 @@ def encrypt_asymmetric(pubkey, data, policy_uri):
The algorithm is selected by policy_uri.
Returns a tuple (encrypted_data, algorithm_uri)
"""
for cls in [SecurityPolicyBasic256, SecurityPolicyBasic128Rsa15]:
for cls in [SecurityPolicyBasic256Sha256, SecurityPolicyBasic256, SecurityPolicyBasic128Rsa15]:
if policy_uri == cls.URI:
return (cls.encrypt_asymmetric(pubkey, data),
cls.AsymmetricEncryptionURI)
......
......@@ -50,6 +50,12 @@ def sign_sha1(private_key, data):
hashes.SHA1()
)
def sign_sha256(private_key, data):
return private_key.sign(
data,
padding.PKCS1v15(),
hashes.SHA256()
)
def verify_sha1(certificate, data, signature):
certificate.public_key().verify(
......@@ -60,6 +66,13 @@ def verify_sha1(certificate, data, signature):
)
def verify_sha256(certificate, data, signature):
certificate.public_key().verify(
signature,
data,
padding.PKCS1v15(),
hashes.SHA256())
def encrypt_basic256(public_key, data):
ciphertext = public_key.encrypt(
data,
......@@ -128,10 +141,16 @@ def hmac_sha1(key, message):
hasher.update(message)
return hasher.finalize()
def hmac_sha256(key, message):
hasher = hmac.HMAC(key, hashes.SHA256(), backend=default_backend())
hasher.update(message)
return hasher.finalize()
def sha1_size():
return hashes.SHA1.digest_size
def sha256_size():
return hashes.SHA256.digest_size
def p_sha1(secret, seed, sizes=()):
"""
......@@ -155,6 +174,27 @@ def p_sha1(secret, seed, sizes=()):
result = result[size:]
return tuple(parts)
def p_sha256(secret, seed, sizes=()):
"""
Derive one or more keys from secret and seed.
(See specs part 6, 6.7.5 and RFC 2246 - TLS v1.0)
Lengths of keys will match sizes argument
"""
full_size = 0
for size in sizes:
full_size += size
result = b''
accum = seed
while len(result) < full_size:
accum = hmac_sha256(secret, accum)
result += hmac_sha256(secret, accum + seed)
parts = []
for size in sizes:
parts.append(result[:size])
result = result[size:]
return tuple(parts)
def x509_name_to_string(name):
parts = ["{0}={1}".format(attr.oid._name, attr.value) for attr in name]
......
......@@ -106,10 +106,16 @@ class Server:
status.SecondsTillShutdown = 0
await status_node.set_value(status)
await build_node.set_value(status.BuildInfo)
# enable all endpoints by default
self._security_policy = [
ua.SecurityPolicyType.NoSecurity,
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
ua.SecurityPolicyType.Basic256Sha256_Sign
]
self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
await self.stop()
......@@ -208,6 +214,7 @@ class Server:
to the server, where security_policy is a list of integers.
During server initialization, all endpoints are enabled:
<<<<<<< HEAD
security_policy = [
ua.SecurityPolicyType.NoSecurity,
ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
......@@ -215,18 +222,31 @@ class Server:
ua.SecurityPolicyType.Basic256_SignAndEncrypt,
ua.SecurityPolicyType.Basic256_Sign
]
=======
security_policy = [
ua.SecurityPolicyType.NoSecurity,
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
ua.SecurityPolicyType.Basic256Sha256_Sign
]
>>>>>>> 10131b7... Added security policy Basic256Sha256
E.g. to limit the number of endpoints and disable no encryption:
<<<<<<< HEAD
set_security_policy([
ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt
ua.SecurityPolicyType.Basic256_SignAndEncrypt])
=======
set_security_policy([
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
>>>>>>> 10131b7... Added security policy Basic256Sha256
"""
self._security_policy = security_policy
def set_security_IDs(self, policyIDs):
"""
<<<<<<< HEAD
Method setting up the security endpoints for identification
of clients. During server object initialization, all possible
endpoints are enabled:
......@@ -234,6 +254,17 @@ class Server:
self._policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
E.g. to limit the number of IDs and disable anonymous clients:
=======
Method setting up the security endpoints for identification
of clients. During server object initialization, all possible
endpoints are enabled:
self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
E.g. to limit the number of IDs and disable anonymous clients:
set_security_policy(["Basic256Sha256"])
>>>>>>> 10131b7... Added security policy Basic256Sha256
set_security_policy(["Basic256"])
......@@ -255,22 +286,37 @@ class Server:
if ua.SecurityPolicyType.NoSecurity in self._security_policy:
self.logger.warning("Creating an open endpoint to the server, although encrypted endpoints are enabled.")
<<<<<<< HEAD
if ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
=======
if ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
>>>>>>> 10131b7... Added security policy Basic256Sha256
ua.MessageSecurityMode.SignAndEncrypt,
self.certificate,
self.private_key)
)
<<<<<<< HEAD
if ua.SecurityPolicyType.Basic128Rsa15_Sign in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
=======
if ua.SecurityPolicyType.Basic256Sha256_Sign in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
>>>>>>> 10131b7... Added security policy Basic256Sha256
ua.MessageSecurityMode.Sign,
self.certificate,
self.private_key)
)
<<<<<<< HEAD
if ua.SecurityPolicyType.Basic256_SignAndEncrypt in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt)
......@@ -287,6 +333,8 @@ class Server:
self.certificate,
self.private_key)
)
=======
>>>>>>> 10131b7... Added security policy Basic256Sha256
def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtokens = []
......@@ -296,6 +344,7 @@ class Server:
idtoken.TokenType = ua.UserTokenType.Anonymous
idtokens.append(idtoken)
<<<<<<< HEAD
if "Basic256" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = "certificate_basic256"
......@@ -305,6 +354,11 @@ class Server:
if "Basic128" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = "certificate_basic128"
=======
if "Basic256Sha256" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'certificate_basic256sha256'
>>>>>>> 10131b7... Added security policy Basic256Sha256
idtoken.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken)
......
......@@ -60,7 +60,7 @@ def add_common_args(parser, default_node='i=84', require_node=False):
default=0,
metavar="NAMESPACE")
parser.add_argument("--security",
help="Security settings, for example: Basic256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None",
help="Security settings, for example: Basic256Sha256,SignAndEncrypt,cert.der,pk.pem[,server_cert.der]. Default: None",
default='')
parser.add_argument("--user",
help="User name for authentication. Overrides the user name given in the URL.")
......
......@@ -954,6 +954,8 @@ class SecurityPolicyType(Enum):
"Basic128Rsa15_SignAndEncrypt"
"Basic256_Sign"
"Basic256_SignAndEncrypt"
"Basic256Sha256_Sign"
"Basic256Sha256_SignAndEncrypt"
"""
......@@ -962,3 +964,5 @@ class SecurityPolicyType(Enum):
Basic128Rsa15_SignAndEncrypt = 2
Basic256_Sign = 3
Basic256_SignAndEncrypt = 4
Basic256Sha256_Sign = 5
Basic256Sha256_SignAndEncrypt = 6
import unittest
from opcua import Client
from opcua import Server
from opcua import ua
try:
from opcua.crypto import uacrypto
from opcua.crypto import security_policies
except ImportError:
print("WARNING: CRYPTO NOT AVAILABLE, CRYPTO TESTS DISABLED!!")
disable_crypto_tests = True
else:
disable_crypto_tests = False
port_num1 = 48515
port_num2 = 48512
@unittest.skipIf(disable_crypto_tests, "crypto not available")
class TestCryptoConnect(unittest.TestCase):
'''
Test connectino with a server supporting crypto
'''
@classmethod
def setUpClass(self):
# start our own server
self.srv_crypto = Server()
self.uri_crypto = 'opc.tcp://127.0.0.1:{0:d}'.format(port_num1)
self.srv_crypto.set_endpoint(self.uri_crypto)
# load server certificate and private key. This enables endpoints
# with signing and encryption.
self.srv_crypto.load_certificate("examples/certificate-example.der")
self.srv_crypto.load_private_key("examples/private-key-example.pem")
self.srv_crypto.start()
# start a server without crypto
self.srv_no_crypto = Server()
self.uri_no_crypto = 'opc.tcp://127.0.0.1:{0:d}'.format(port_num2)
self.srv_no_crypto.set_endpoint(self.uri_no_crypto)
self.srv_no_crypto.start()
@classmethod
def tearDownClass(self):
# stop the server
self.srv_no_crypto.stop()
self.srv_crypto.stop()
def test_nocrypto(self):
clt = Client(self.uri_no_crypto)
clt.connect()
try:
clt.get_objects_node().get_children()
finally:
clt.disconnect()
def test_nocrypto_fail(self):
clt = Client(self.uri_no_crypto)
with self.assertRaises(ua.UaError):
clt.set_security_string("Basic256Sha256,Sign,examples/certificate-example.der,examples/private-key-example.pem")
def test_basic256sha256(self):
clt = Client(self.uri_crypto)
try:
clt.set_security_string("Basic256Sha256,Sign,examples/certificate-example.der,examples/private-key-example.pem")
clt.connect()
self.assertTrue(clt.get_objects_node().get_children())
finally:
clt.disconnect()
def test_basic256sha256_encrypt(self):
clt = Client(self.uri_crypto)
try:
clt.set_security_string("Basic256Sha256,SignAndEncrypt,examples/certificate-example.der,examples/private-key-example.pem")
clt.connect()
self.assertTrue(clt.get_objects_node().get_children())
finally:
clt.disconnect()
def test_basic256sha56_encrypt_success(self):
clt = Client(self.uri_crypto)
try:
clt.set_security(security_policies.SecurityPolicyBasic256Sha256,
'examples/certificate-example.der',
'examples/private-key-example.pem',
None,
ua.MessageSecurityMode.SignAndEncrypt
)
clt.connect()
self.assertTrue(clt.get_objects_node().get_children())
finally:
clt.disconnect()
def test_basic256sha56_encrypt_fail(self):
# FIXME: how to make it fail???
clt = Client(self.uri_crypto)
with self.assertRaises(ua.UaError):
clt.set_security(security_policies.SecurityPolicyBasic256Sha256,
'examples/certificate-example.der',
'examples/private-key-example.pem',
None,
ua.MessageSecurityMode.None_
)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment