Commit 3759134a authored by nic's avatar nic

docstrings, private vars, public methods for setting

parent 8b665fbe
......@@ -99,11 +99,11 @@ class Server(object):
sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
sa_node.set_value([self._application_uri])
self.security_endpoints = ["None", "Basic128Rsa15_Sign",
# enable all endpoints by default
self._security_policy = ["None", "Basic128Rsa15_Sign",
"Basic128Rsa15_SignAndEncrypt",
"Basic256_Sign", "Basic256_SignAndEncrypt"]
self.policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
self._policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
def __enter__(self):
self.start()
......@@ -205,14 +205,57 @@ class Server(object):
def get_endpoints(self):
return self.iserver.get_endpoints()
def set_security_policy(self, security_policy):
"""
Method setting up the security policies for connections
to the server. During server object initialization, all
possible endpoints are enabled:
security_policy = ["None",
"Basic128Rsa15_Sign",
"Basic128Rsa15_SignAndEncrypt",
"Basic256_Sign",
"Basic256_SignAndEncrypt"]
where security_policy is a list of strings. "None" enables an
endpoint without any security.
E.g. to limit the number of endpoints and disable no encryption:
set_security_policy(["Basic256_Sign",
"Basic256_SignAndEncrypt"])
"""
self._security_policy = security_policy
def set_security_IDs(self, policyIDs):
"""
Method setting up the security endpoints for identification
of clients. During server object initialization, all possible
endpoints are enabled:
self._policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
E.g. to limit the number of IDs and disable anonymous clients:
set_security_policy(["Basic256"])
(Implementation for ID check is currently not finalized...)
"""
self._policyIDs = policyIDs
def _setup_server_nodes(self):
# to be called just before starting server since it needs all parameters to be setup
if "None" in self.security_endpoints:
if "None" in self._security_policy:
self._set_endpoints()
self._policies = [ua.SecurityPolicyFactory()]
if (len(self._security_policy)>1) and self.private_key:
self.logger.warning("Creating an open endpoint to the server, although encrypted endpoints are enabled.")
if self.certificate and self.private_key:
if "Basic128Rsa15_Sign" in self.security_endpoints:
if "Basic128Rsa15_Sign" in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
......@@ -220,7 +263,7 @@ class Server(object):
self.certificate,
self.private_key)
)
if "Basic128Rsa15_SignAndEncrypt" in self.security_endpoints:
if "Basic128Rsa15_SignAndEncrypt" in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
......@@ -228,7 +271,7 @@ class Server(object):
self.certificate,
self.private_key)
)
if "Basic256_Sign" in self.security_endpoints:
if "Basic256_Sign" in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
......@@ -236,7 +279,7 @@ class Server(object):
self.certificate,
self.private_key)
)
if "Basic256_SignAndEncrypt" in self.security_endpoints:
if "Basic256_SignAndEncrypt" in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
......@@ -244,31 +287,32 @@ class Server(object):
self.certificate,
self.private_key)
)
def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtokens = []
if "Anonymous" in self.policyIDs:
idtoken1 = ua.UserTokenPolicy()
idtoken1.PolicyId = 'anonymous'
idtoken1.TokenType = ua.UserTokenType.Anonymous
idtokens.append(idtoken1)
if "Basic256" in self.policyIDs:
idtoken2 = ua.UserTokenPolicy()
idtoken2.PolicyId = 'certificate_basic256'
idtoken2.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken2)
if "Basic128" in self.policyIDs:
idtoken3 = ua.UserTokenPolicy()
idtoken3.PolicyId = 'certificate_basic128'
idtoken3.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken3)
if "Username" in self.policyIDs:
idtoken4 = ua.UserTokenPolicy()
idtoken4.PolicyId = 'username'
idtoken4.TokenType = ua.UserTokenType.UserName
idtokens.append(idtoken4)
if "Anonymous" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'anonymous'
idtoken.TokenType = ua.UserTokenType.Anonymous
idtokens.append(idtoken)
if "Basic256" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'certificate_basic256'
idtoken.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken)
if "Basic128" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'certificate_basic128'
idtoken.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken)
if "Username" in self._policyIDs:
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'username'
idtoken.TokenType = ua.UserTokenType.UserName
idtokens.append(idtoken)
appdesc = ua.ApplicationDescription()
appdesc.ApplicationName = ua.LocalizedText(self.name)
......@@ -306,7 +350,6 @@ class Server(object):
self.iserver.stop()
raise exp
def stop(self):
"""
Stop server
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment