Commit 8b665fbe authored by nic's avatar nic

option to disable anonymous connections to server

parent 72629f5e
......@@ -99,6 +99,12 @@ class Server(object):
sa_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_ServerArray))
sa_node.set_value([self._application_uri])
self.security_endpoints = ["None", "Basic128Rsa15_Sign",
"Basic128Rsa15_SignAndEncrypt",
"Basic256_Sign", "Basic256_SignAndEncrypt"]
self.policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
def __enter__(self):
self.start()
return self
......@@ -201,54 +207,68 @@ class Server(object):
def _setup_server_nodes(self):
# to be called just before starting server since it needs all parameters to be setup
self._set_endpoints()
self._policies = [ua.SecurityPolicyFactory()]
if "None" in self.security_endpoints:
self._set_endpoints()
self._policies = [ua.SecurityPolicyFactory()]
if self.certificate and self.private_key:
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
if "Basic128Rsa15_Sign" in self.security_endpoints:
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt,
self.certificate,
self.private_key)
)
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
if "Basic128Rsa15_SignAndEncrypt" in self.security_endpoints:
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign,
self.certificate,
self.private_key)
)
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
if "Basic256_Sign" in self.security_endpoints:
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt,
self.certificate,
self.private_key)
)
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
if "Basic256_SignAndEncrypt" in self.security_endpoints:
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.Sign,
self.certificate,
self.private_key)
)
def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'anonymous'
idtoken.TokenType = ua.UserTokenType.Anonymous
idtoken2 = ua.UserTokenPolicy()
idtoken2.PolicyId = 'certificate_basic256'
idtoken2.TokenType = ua.UserTokenType.Certificate
idtoken3 = ua.UserTokenPolicy()
idtoken3.PolicyId = 'certificate_basic128'
idtoken3.TokenType = ua.UserTokenType.Certificate
idtoken4 = ua.UserTokenPolicy()
idtoken4.PolicyId = 'username'
idtoken4.TokenType = ua.UserTokenType.UserName
idtokens = []
if "Anonymous" in self.policyIDs:
idtoken1 = ua.UserTokenPolicy()
idtoken1.PolicyId = 'anonymous'
idtoken1.TokenType = ua.UserTokenType.Anonymous
idtokens.append(idtoken1)
if "Basic256" in self.policyIDs:
idtoken2 = ua.UserTokenPolicy()
idtoken2.PolicyId = 'certificate_basic256'
idtoken2.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken2)
if "Basic128" in self.policyIDs:
idtoken3 = ua.UserTokenPolicy()
idtoken3.PolicyId = 'certificate_basic128'
idtoken3.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken3)
if "Username" in self.policyIDs:
idtoken4 = ua.UserTokenPolicy()
idtoken4.PolicyId = 'username'
idtoken4.TokenType = ua.UserTokenType.UserName
idtokens.append(idtoken4)
appdesc = ua.ApplicationDescription()
appdesc.ApplicationName = ua.LocalizedText(self.name)
......@@ -264,7 +284,7 @@ class Server(object):
edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
edp.SecurityMode = mode
edp.SecurityPolicyUri = policy.URI
edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3, idtoken4]
edp.UserIdentityTokens = idtokens
edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
edp.SecurityLevel = 0
self.iserver.add_endpoint(edp)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment