Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
7618777d
Commit
7618777d
authored
Feb 01, 2016
by
ORD
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #128 from alkor/fix-password-encryption
Use correct password encryption algorithm in client
parents
5c13c813
f45fbd49
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
41 additions
and
3 deletions
+41
-3
opcua/client/client.py
opcua/client/client.py
+18
-3
opcua/crypto/security_policies.py
opcua/crypto/security_policies.py
+23
-0
No files found.
opcua/client/client.py
View file @
7618777d
...
...
@@ -335,6 +335,20 @@ class Client(object):
return
policy
.
PolicyId
return
default
def
server_policy_uri
(
self
,
token_type
):
"""
Find SecurityPolicyUri of server's UserTokenPolicy by token_type.
If SecurityPolicyUri is empty, use default SecurityPolicyUri
of the endpoint
"""
for
policy
in
self
.
_policy_ids
:
if
policy
.
TokenType
==
token_type
:
if
policy
.
SecurityPolicyUri
:
return
policy
.
SecurityPolicyUri
else
:
# empty URI means "use this endpoint's policy URI"
return
self
.
security_policy
.
URI
return
self
.
security_policy
.
URI
def
activate_session
(
self
,
username
=
None
,
password
=
None
,
certificate
=
None
):
"""
Activate session using either username and password or private_key
...
...
@@ -365,11 +379,12 @@ class Client(object):
# see specs part 4, 7.36.3: if the token is encrypted, password
# shall be converted to UTF-8 and serialized with server nonce
etoken
=
ua
.
pack_bytes
(
bytes
(
password
,
"utf8"
)
+
self
.
_server_nonce
)
#data = uacrypto.encrypt_basic256(pubkey, etoken)
data
=
uacrypto
.
encrypt_rsa_oaep
(
pubkey
,
etoken
)
(
data
,
uri
)
=
security_policies
.
encrypt_asymmetric
(
pubkey
,
etoken
,
self
.
server_policy_uri
(
ua
.
UserTokenType
.
UserName
))
params
.
UserIdentityToken
.
Password
=
data
params
.
UserIdentityToken
.
EncryptionAlgorithm
=
uri
params
.
UserIdentityToken
.
PolicyId
=
self
.
server_policy_id
(
ua
.
UserTokenType
.
UserName
,
b"username_basic256"
)
params
.
UserIdentityToken
.
EncryptionAlgorithm
=
'http://www.w3.org/2001/04/xmlenc#rsa-oaep'
return
self
.
uaclient
.
activate_session
(
params
)
def
close_session
(
self
):
...
...
opcua/crypto/security_policies.py
View file @
7618777d
...
...
@@ -334,6 +334,11 @@ class SecurityPolicyBasic128Rsa15(SecurityPolicy):
URI
=
"http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15"
signature_key_size
=
16
symmetric_key_size
=
16
AsymmetricEncryptionURI
=
"http://www.w3.org/2001/04/xmlenc#rsa-1_5"
@
staticmethod
def
encrypt_asymmetric
(
pubkey
,
data
):
return
uacrypto
.
encrypt_rsa15
(
pubkey
,
data
)
def
__init__
(
self
,
server_cert
,
client_cert
,
client_pk
,
mode
):
require_cryptography
(
self
)
...
...
@@ -395,6 +400,11 @@ class SecurityPolicyBasic256(SecurityPolicy):
URI
=
"http://opcfoundation.org/UA/SecurityPolicy#Basic256"
signature_key_size
=
24
symmetric_key_size
=
32
AsymmetricEncryptionURI
=
"http://www.w3.org/2001/04/xmlenc#rsa-oaep"
@
staticmethod
def
encrypt_asymmetric
(
pubkey
,
data
):
return
uacrypto
.
encrypt_rsa_oaep
(
pubkey
,
data
)
def
__init__
(
self
,
server_cert
,
client_cert
,
client_pk
,
mode
):
require_cryptography
(
self
)
...
...
@@ -426,3 +436,16 @@ class SecurityPolicyBasic256(SecurityPolicy):
(
sigkey
,
key
,
init_vec
)
=
uacrypto
.
p_sha1
(
nonce1
,
nonce2
,
key_sizes
)
self
.
symmetric_cryptography
.
Verifier
=
VerifierAesCbc
(
sigkey
)
self
.
symmetric_cryptography
.
Decryptor
=
DecryptorAesCbc
(
key
,
init_vec
)
def
encrypt_asymmetric
(
pubkey
,
data
,
policy_uri
):
"""
Encrypt data with pubkey using an asymmetric algorithm.
The algorithm is selected by policy_uri.
Returns a tuple (encrypted_data, algorithm_uri)
"""
for
cls
in
[
SecurityPolicyBasic256
,
SecurityPolicyBasic128Rsa15
]:
if
policy_uri
==
cls
.
URI
:
return
(
cls
.
encrypt_asymmetric
(
pubkey
,
data
),
cls
.
AsymmetricEncryptionURI
)
raise
UaError
(
"Unsupported security policy `{}`"
.
format
(
uri
))
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment