Commit 594dcad8 authored by Christoph Ziebuhr's avatar Christoph Ziebuhr Committed by oroulet

Improve usage of UserTokenPolicy.SecurityPolicyUri

Don't allow plaintext password to prevent BadSecurityModeInsufficient in some clients
parent d5ce251b
......@@ -704,7 +704,8 @@ class Client:
# then the password only contains UTF-8 encoded password
# and EncryptionAlgorithm is null
if self._password:
_logger.warning("Sending plain-text password")
if self.security_policy.Mode != ua.MessageSecurityMode.SignAndEncrypt:
_logger.warning("Sending plain-text password")
params.UserIdentityToken.Password = password.encode("utf8")
params.UserIdentityToken.EncryptionAlgorithm = None
elif self._password:
......
......@@ -382,40 +382,30 @@ class InternalServer:
"""
self.user_manager = user_manager
def check_user_token(self, isession, token):
def decrypt_user_token(self, isession, token):
"""
unpack the username and password for the benefit of the user defined user manager
"""
user_name = token.UserName
password = token.Password
# TODO Support all Token Types
# AnonimousIdentityToken
# UserIdentityToken
# UserNameIdentityToken
# X509IdentityToken
# IssuedIdentityToken
# TODO check if algorithm is allowed, throw BadSecurityPolicyRejected if not
# decrypt password if we can
if str(token.EncryptionAlgorithm) != "None":
try:
if token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-1_5":
raw_pw = uacrypto.decrypt_rsa15(self.private_key, password)
elif token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep":
raw_pw = uacrypto.decrypt_rsa_oaep(self.private_key, password)
elif token.EncryptionAlgorithm == "http://opcfoundation.org/UA/security/rsa-oaep-sha2-256":
raw_pw = uacrypto.decrypt_rsa_oaep_sha256(self.private_key, password)
else:
self.logger.warning("Unknown password encoding %s", token.EncryptionAlgorithm)
# raise # Should I raise a significant exception?
return user_name, password
length = unpack_from("<I", raw_pw)[0] - len(isession.nonce)
password = raw_pw[4 : 4 + length]
password = password.decode("utf-8")
except Exception:
self.logger.exception("Unable to decrypt password")
return False
elif isinstance(password, bytes): # TODO check
if token.EncryptionAlgorithm:
if token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-1_5":
raw_pw = uacrypto.decrypt_rsa15(self.private_key, password)
elif token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep":
raw_pw = uacrypto.decrypt_rsa_oaep(self.private_key, password)
elif token.EncryptionAlgorithm == "http://opcfoundation.org/UA/security/rsa-oaep-sha2-256":
raw_pw = uacrypto.decrypt_rsa_oaep_sha256(self.private_key, password)
else:
self.logger.warning("Unknown password encoding %s", token.EncryptionAlgorithm)
raise ValueError("Unknown password encoding")
length = unpack_from("<I", raw_pw)[0] - len(isession.nonce)
password = raw_pw[4 : 4 + length]
password = password.decode("utf-8")
elif isinstance(password, bytes):
password = password.decode("utf-8")
return user_name, password
......@@ -110,8 +110,6 @@ class InternalSession(AbstractSession):
raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
if InternalSession._current_connections >= InternalSession.max_connections:
raise ServiceError(ua.StatusCodes.BadMaxConnectionsReached)
self.nonce = create_nonce(32)
result.ServerNonce = self.nonce
for _ in params.ClientSoftwareCertificates:
result.Results.append(ua.StatusCode())
id_token = params.UserIdentityToken
......@@ -124,13 +122,19 @@ class InternalSession(AbstractSession):
self.logger.error("Rejected active session UserIdentityToken not supported")
raise ServiceError(ua.StatusCodes.BadIdentityTokenRejected)
if self.iserver.user_manager is not None:
if isinstance(id_token, ua.UserNameIdentityToken):
username, password = self.iserver.check_user_token(self, id_token)
elif isinstance(id_token, ua.X509IdentityToken):
peer_certificate = id_token.CertificateData
username, password = None, None
else:
username, password = None, None
try:
if isinstance(id_token, ua.UserNameIdentityToken):
username, password = self.iserver.decrypt_user_token(self, id_token)
elif isinstance(id_token, ua.X509IdentityToken):
# TODO implement verify_x509_token
peer_certificate = id_token.CertificateData
username, password = None, None
else:
username, password = None, None
except (ServiceError, ua.uaerrors.UaStatusCodeError):
raise
except Exception:
raise ServiceError(ua.StatusCodes.BadIdentityTokenInvalid)
user = self.iserver.user_manager.get_user(
self.iserver, username=username, password=password, certificate=peer_certificate
......@@ -139,6 +143,8 @@ class InternalSession(AbstractSession):
raise ServiceError(ua.StatusCodes.BadUserAccessDenied)
else:
self.user = user
self.nonce = create_nonce(32)
result.ServerNonce = self.nonce
self.state = SessionState.Activated
InternalSession._current_connections += 1
self.logger.info("Activated internal session %s for user %s", self.name, self.user)
......
......@@ -410,21 +410,41 @@ class Server:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = "anonymous"
idtoken.TokenType = ua.UserTokenType.Anonymous
idtoken.SecurityPolicyUri = policy.URI
idtoken.SecurityPolicyUri = security_policies.SecurityPolicyNone.URI
idtokens.append(idtoken)
if ua.X509IdentityToken in tokens:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = "certificate_basic256sha256"
idtoken.PolicyId = "certificate"
idtoken.TokenType = ua.UserTokenType.Certificate
idtoken.SecurityPolicyUri = policy.URI
# TODO request signing if mode == ua.MessageSecurityMode.None_ (also need to verify signature then)
idtokens.append(idtoken)
if ua.UserNameIdentityToken in tokens:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = "username"
idtoken.TokenType = ua.UserTokenType.UserName
idtoken.SecurityPolicyUri = policy.URI
if mode == ua.MessageSecurityMode.SignAndEncrypt:
# channel is encrypted, no need to encrypt password again
idtoken.SecurityPolicyUri = security_policies.SecurityPolicyNone.URI
elif mode == ua.MessageSecurityMode.Sign:
# use same policy for encryption
idtoken.SecurityPolicyUri = policy.URI
# try to avoid plaintext password, find first policy with encryption
elif self.certificate and self.iserver.private_key:
for token_policy_type in self._security_policy:
token_policy, token_mode, _ = security_policies.SECURITY_POLICY_TYPE_MAP[token_policy_type]
if token_mode != ua.MessageSecurityMode.SignAndEncrypt:
continue
idtoken.SecurityPolicyUri = token_policy.URI
break
else:
_logger.warning("No encrypting policy available, password may get transferred in plaintext")
idtoken.SecurityPolicyUri = security_policies.SecurityPolicyNone.URI
else:
_logger.warning("No encrypting policy available, password may get transferred in plaintext")
idtoken.SecurityPolicyUri = security_policies.SecurityPolicyNone.URI
idtokens.append(idtoken)
appdesc = ua.ApplicationDescription()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment