Commit ac108cca authored by nic's avatar nic Committed by Christian Bergmiller

cherry pick/merge 3759134a

parent 951b0ca5
......@@ -91,10 +91,10 @@ class Server:
self.private_key = None
self._policies = []
self.nodes = Shortcuts(self.iserver.isession)
self.security_endpoints = [
self._security_endpoints = [
"None", "Basic128Rsa15_Sign", "Basic128Rsa15_SignAndEncrypt", "Basic256_Sign", "Basic256_SignAndEncrypt"
]
self.policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
self._policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
async def init(self, shelf_file=None):
await self.iserver.init(shelf_file)
......@@ -210,14 +210,57 @@ class Server:
def get_endpoints(self):
return self.iserver.get_endpoints()
def set_security_policy(self, security_policy):
"""
Method setting up the security policies for connections
to the server. During server object initialization, all
possible endpoints are enabled:
security_policy = ["None",
"Basic128Rsa15_Sign",
"Basic128Rsa15_SignAndEncrypt",
"Basic256_Sign",
"Basic256_SignAndEncrypt"]
where security_policy is a list of strings. "None" enables an
endpoint without any security.
E.g. to limit the number of endpoints and disable no encryption:
set_security_policy(["Basic256_Sign",
"Basic256_SignAndEncrypt"])
"""
self._security_policy = security_policy
def set_security_IDs(self, policyIDs):
"""
Method setting up the security endpoints for identification
of clients. During server object initialization, all possible
endpoints are enabled:
self._policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
E.g. to limit the number of IDs and disable anonymous clients:
set_security_policy(["Basic256"])
(Implementation for ID check is currently not finalized...)
"""
self._policyIDs = policyIDs
async def _setup_server_nodes(self):
# to be called just before starting server since it needs all parameters to be setup
if "None" in self.security_endpoints:
if "None" in self._security_policy:
self._set_endpoints()
self._policies = [ua.SecurityPolicyFactory()]
if (len(self._security_policy)>1) and self.private_key:
self.logger.warning("Creating an open endpoint to the server, although encrypted endpoints are enabled.")
if self.certificate and self.private_key:
if "Basic128Rsa15_Sign" in self.security_endpoints:
if "Basic128Rsa15_Sign" in self._security_endpoints:
self._set_endpoints(
security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt)
......@@ -229,7 +272,7 @@ class Server:
self.private_key
)
)
if "Basic128Rsa15_SignAndEncrypt" in self.security_endpoints:
if "Basic128Rsa15_SignAndEncrypt" in self._security_endpoints:
self._set_endpoints(
security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign)
......@@ -241,7 +284,7 @@ class Server:
self.private_key
)
)
if "Basic256_Sign" in self.security_endpoints:
if "Basic256_Sign" in self._security_endpoints:
self._set_endpoints(
security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt)
......@@ -253,7 +296,7 @@ class Server:
self.private_key
)
)
if "Basic256_SignAndEncrypt" in self.security_endpoints:
if "Basic256_SignAndEncrypt" in self._security_endpoints:
self._set_endpoints(
security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.Sign)
......@@ -268,29 +311,29 @@ class Server:
def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtokens = []
if "Anonymous" in self.policyIDs:
idtoken1 = ua.UserTokenPolicy()
idtoken1.PolicyId = 'anonymous'
idtoken1.TokenType = ua.UserTokenType.Anonymous
idtokens.append(idtoken1)
if "Basic256" in self.policyIDs:
idtoken2 = ua.UserTokenPolicy()
idtoken2.PolicyId = 'certificate_basic256'
idtoken2.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken2)
if "Basic128" in self.policyIDs:
idtoken3 = ua.UserTokenPolicy()
idtoken3.PolicyId = 'certificate_basic128'
idtoken3.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken3)
if "Username" in self.policyIDs:
idtoken4 = ua.UserTokenPolicy()
idtoken4.PolicyId = 'username'
idtoken4.TokenType = ua.UserTokenType.UserName
idtokens.append(idtoken4)
if "Anonymous" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'anonymous'
idtoken.TokenType = ua.UserTokenType.Anonymous
idtokens.append(idtoken)
if "Basic256" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'certificate_basic256'
idtoken.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken)
if "Basic128" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'certificate_basic128'
idtoken.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken)
if "Username" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'username'
idtoken.TokenType = ua.UserTokenType.UserName
idtokens.append(idtoken)
appdesc = ua.ApplicationDescription()
appdesc.ApplicationName = ua.LocalizedText(self.name)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment