Commit e1dc3be2 authored by bitkeeper's avatar bitkeeper Committed by oroulet

Fix missing SecurityLevel of the EndpointDescription

According to Part 4 - 7.10 this should be an a number that indicates how secure the EndPoint is. The number is Server implementation specific, how higher the number how saver.
parent 61134250
......@@ -402,6 +402,63 @@ class Server:
ua.MessageSecurityMode.Sign, self.certificate, self.iserver.private_key,
permission_ruleset=self._permission_ruleset))
@staticmethod
def lookup_security_level_for_policy_type( security_policy_type: ua.SecurityPolicyType ) -> ua.Byte:
"""Returns the security level for an ua.SecurityPolicyType.
This is endpoint & server implementation specific!
Returns:
ua.Byte: the found security level
"""
return ua.Byte({
ua.SecurityPolicyType.NoSecurity : 0,
ua.SecurityPolicyType.Basic128Rsa15_Sign : 1,
ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt : 2,
ua.SecurityPolicyType.Basic256_Sign : 11,
ua.SecurityPolicyType.Basic256_SignAndEncrypt : 21,
ua.SecurityPolicyType.Basic256Sha256_Sign : 50,
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt : 70,
ua.SecurityPolicyType.Aes128Sha256RsaOaep_Sign : 55,
ua.SecurityPolicyType.Aes128Sha256RsaOaep_SignAndEncrypt : 75
}[security_policy_type])
@staticmethod
def determine_security_level(security_policy_uri:str, security_mode: ua.MessageSecurityMode) -> ua.Byte:
"""Determine the security level of an EndPoint.
The security level indicates how secure an EndPoint is, compared to other EndPoints of the same server.
Value 0 is a special value; EndPoint isn't recommended, typical for ua.MessageSecurityMode.None_.
See Part 4 7.10
To the determine the level the value of ua.SecurityPolicyType is used.
The enum values already correspond to and
The Enum ua.SecurityPolicyType already contains a value per Enum entry that already correspond to
Args:
security_policy (ua.SecurityPolicy): the used policy
security_mode (ua.MessageSecurityMode): the used security mode for the messages.
Returns:
ua.Byte: the returned security level
"""
security_level: ua.Byte = ua.Byte(0)
if security_mode != ua.MessageSecurityMode.None_:
security_policy_name = f'{security_policy_uri.split("#")[1].replace("_","")}_{security_mode.name}'
try:
security_policy_type: ua.SecurityPolicyType = ua.SecurityPolicyType[security_policy_name]
security_level = Server.lookup_security_level_for_policy_type(security_policy_type)
except KeyError:
_logger.error('"%s" isn\'t a valid security policy', security_policy_name)
return security_level
def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtokens = []
supported_token_classes = []
......@@ -442,7 +499,7 @@ class Server:
edp.SecurityPolicyUri = policy.URI
edp.UserIdentityTokens = idtokens
edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
edp.SecurityLevel = 0
edp.SecurityLevel = Server.determine_security_level(policy.URI, mode)
self.iserver.add_endpoint(edp)
self.iserver.supported_tokens = tuple(supported_token_classes)
......
......@@ -452,3 +452,27 @@ async def test_anonymous_rejection():
with pytest.raises(ua.uaerrors.BadIdentityTokenRejected):
await clt.connect()
await srv.stop()
async def test_security_level_all():
assert Server.determine_security_level(ua.SecurityPolicy.URI, ua.MessageSecurityMode.None_) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.NoSecurity)
assert Server.determine_security_level(security_policies.SecurityPolicyBasic256Sha256.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic256Sha256_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyBasic256Sha256.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt)
assert Server.determine_security_level(security_policies.SecurityPolicyAes128Sha256RsaOaep.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Aes128Sha256RsaOaep_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyAes128Sha256RsaOaep.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Aes128Sha256RsaOaep_SignAndEncrypt)
# For the sake of completeness also the old, not recommended, protocols Basic128Rsa15 and Basic256
assert Server.determine_security_level(security_policies.SecurityPolicyBasic128Rsa15.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic128Rsa15_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyBasic128Rsa15.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt)
assert Server.determine_security_level(security_policies.SecurityPolicyBasic256.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic256_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyBasic256.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic256_SignAndEncrypt)
async def test_security_level_endpoints(srv_crypto_all_certs):
srv = srv_crypto_all_certs[0]
end_points: list[ua.EndpointDescription] = await srv.get_endpoints()
for end_point in end_points:
assert end_point.SecurityLevel == Server.determine_security_level(end_point.SecurityPolicyUri, end_point.SecurityMode)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment