Commit 7c247f65 authored by nic's avatar nic Committed by Christian Bergmiller

cherry pick/merge 8b665fbe

parent 75dc638f
......@@ -91,6 +91,10 @@ class Server:
self.private_key = None
self._policies = []
self.nodes = Shortcuts(self.iserver.isession)
self.security_endpoints = ["None", "Basic128Rsa15_Sign",
"Basic128Rsa15_SignAndEncrypt",
"Basic256_Sign", "Basic256_SignAndEncrypt"]
self.policyIDs = ["Anonymous", "Basic256", "Basic128", "Username"]
async def init(self, shelf_file=None):
await self.iserver.init(shelf_file)
......@@ -208,63 +212,68 @@ class Server:
async def _setup_server_nodes(self):
# to be called just before starting server since it needs all parameters to be setup
self._set_endpoints()
self._policies = [ua.SecurityPolicyFactory()]
if self.certificate and self.private_key:
self._set_endpoints(
security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.SignAndEncrypt
)
self._policies.append(
ua.SecurityPolicyFactory(
security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt,
self.certificate,
self.private_key)
)
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15, ua.MessageSecurityMode.Sign)
self._policies.append(
ua.SecurityPolicyFactory(
security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign,
self.certificate,
self.private_key
)
)
self._set_endpoints(security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(
ua.SecurityPolicyFactory(
security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt,
self.certificate,
self.private_key
)
)
self._set_endpoints(security_policies.SecurityPolicyBasic256, ua.MessageSecurityMode.Sign)
self._policies.append(
ua.SecurityPolicyFactory(
security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.Sign,
self.certificate,
self.private_key
)
)
if "None" in self.security_endpoints:
self._set_endpoints()
self._policies = [ua.SecurityPolicyFactory()]
if self.certificate and self.private_key:
if "Basic128Rsa15_Sign" in self.security_endpoints:
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.SignAndEncrypt,
self.certificate,
self.private_key)
)
if "Basic128Rsa15_SignAndEncrypt" in self.security_endpoints:
self._set_endpoints(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic128Rsa15,
ua.MessageSecurityMode.Sign,
self.certificate,
self.private_key)
)
if "Basic256_Sign" in self.security_endpoints:
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.SignAndEncrypt,
self.certificate,
self.private_key)
)
if "Basic256_SignAndEncrypt" in self.security_endpoints:
self._set_endpoints(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.Sign)
self._policies.append(ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256,
ua.MessageSecurityMode.Sign,
self.certificate,
self.private_key)
)
def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtoken = ua.UserTokenPolicy()
idtoken.PolicyId = 'anonymous'
idtoken.TokenType = ua.UserTokenType.Anonymous
idtoken2 = ua.UserTokenPolicy()
idtoken2.PolicyId = 'certificate_basic256'
idtoken2.TokenType = ua.UserTokenType.Certificate
idtoken3 = ua.UserTokenPolicy()
idtoken3.PolicyId = 'certificate_basic128'
idtoken3.TokenType = ua.UserTokenType.Certificate
idtoken4 = ua.UserTokenPolicy()
idtoken4.PolicyId = 'username'
idtoken4.TokenType = ua.UserTokenType.UserName
idtokens = []
if "Anonymous" in self.policyIDs:
idtoken1 = ua.UserTokenPolicy()
idtoken1.PolicyId = 'anonymous'
idtoken1.TokenType = ua.UserTokenType.Anonymous
idtokens.append(idtoken1)
if "Basic256" in self.policyIDs:
idtoken2 = ua.UserTokenPolicy()
idtoken2.PolicyId = 'certificate_basic256'
idtoken2.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken2)
if "Basic128" in self.policyIDs:
idtoken3 = ua.UserTokenPolicy()
idtoken3.PolicyId = 'certificate_basic128'
idtoken3.TokenType = ua.UserTokenType.Certificate
idtokens.append(idtoken3)
if "Username" in self.policyIDs:
idtoken4 = ua.UserTokenPolicy()
idtoken4.PolicyId = 'username'
idtoken4.TokenType = ua.UserTokenType.UserName
idtokens.append(idtoken4)
appdesc = ua.ApplicationDescription()
appdesc.ApplicationName = ua.LocalizedText(self.name)
......@@ -280,7 +289,7 @@ class Server:
edp.ServerCertificate = uacrypto.der_from_x509(self.certificate)
edp.SecurityMode = mode
edp.SecurityPolicyUri = policy.URI
edp.UserIdentityTokens = [idtoken, idtoken2, idtoken3, idtoken4]
edp.UserIdentityTokens = idtokens
edp.TransportProfileUri = 'http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary'
edp.SecurityLevel = 0
self.iserver.add_endpoint(edp)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment