Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
d410c7e1
Commit
d410c7e1
authored
Oct 11, 2024
by
Christoph Ziebuhr
Committed by
oroulet
Dec 04, 2024
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
cryptography is always available
parent
c6b63b05
Changes
6
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
7 additions
and
72 deletions
+7
-72
asyncua/common/connection.py
asyncua/common/connection.py
+1
-7
asyncua/crypto/security_policies.py
asyncua/crypto/security_policies.py
+1
-35
asyncua/server/internal_server.py
asyncua/server/internal_server.py
+1
-9
asyncua/tools.py
asyncua/tools.py
+2
-4
tests/test_crypto_connect.py
tests/test_crypto_connect.py
+1
-9
tests/test_permissions.py
tests/test_permissions.py
+1
-8
No files found.
asyncua/common/connection.py
View file @
d410c7e1
...
...
@@ -7,13 +7,7 @@ import copy
from
asyncua
import
ua
from
asyncua.ua.uaerrors
import
UaInvalidParameterError
from
..ua.ua_binary
import
struct_from_binary
,
struct_to_binary
,
header_from_binary
,
header_to_binary
try
:
from
..crypto.uacrypto
import
InvalidSignature
except
ImportError
:
class
InvalidSignature
(
Exception
):
# type: ignore
pass
from
..crypto.uacrypto
import
InvalidSignature
_logger
=
logging
.
getLogger
(
"asyncua.uaprotocol"
)
...
...
asyncua/crypto/security_policies.py
View file @
d410c7e1
...
...
@@ -4,28 +4,13 @@ import time
from
abc
import
ABCMeta
,
abstractmethod
from
..ua
import
CryptographyNone
,
SecurityPolicy
,
MessageSecurityMode
,
UaError
try
:
from
..crypto
import
uacrypto
CRYPTOGRAPHY_AVAILABLE
=
True
except
ImportError
:
CRYPTOGRAPHY_AVAILABLE
=
False
from
..crypto
import
uacrypto
POLICY_NONE_URI
=
"http://opcfoundation.org/UA/SecurityPolicy#None"
_logger
=
logging
.
getLogger
(
__name__
)
def
require_cryptography
(
obj
):
"""
Raise exception if cryptography module is not available.
Call this function in constructors.
"""
if
not
CRYPTOGRAPHY_AVAILABLE
:
raise
UaError
(
f"Can't use
{
obj
.
__class__
.
__name__
}
, cryptography module is not installed"
)
class
Signer
:
"""
Abstract base class for cryptographic signature algorithm
...
...
@@ -237,7 +222,6 @@ class Cryptography(CryptographyNone):
class
SignerRsa
(
Signer
):
def
__init__
(
self
,
client_pk
):
require_cryptography
(
self
)
self
.
client_pk
=
client_pk
self
.
key_size
=
self
.
client_pk
.
key_size
//
8
...
...
@@ -250,7 +234,6 @@ class SignerRsa(Signer):
class
VerifierRsa
(
Verifier
):
def
__init__
(
self
,
server_cert
):
require_cryptography
(
self
)
self
.
server_cert
=
server_cert
self
.
key_size
=
self
.
server_cert
.
public_key
().
key_size
//
8
...
...
@@ -263,7 +246,6 @@ class VerifierRsa(Verifier):
class
EncryptorRsa
(
Encryptor
):
def
__init__
(
self
,
server_cert
,
enc_fn
,
padding_size
):
require_cryptography
(
self
)
self
.
server_cert
=
server_cert
self
.
key_size
=
self
.
server_cert
.
public_key
().
key_size
//
8
self
.
encryptor
=
enc_fn
...
...
@@ -285,7 +267,6 @@ class EncryptorRsa(Encryptor):
class
DecryptorRsa
(
Decryptor
):
def
__init__
(
self
,
client_pk
,
dec_fn
,
padding_size
):
require_cryptography
(
self
)
self
.
client_pk
=
client_pk
self
.
key_size
=
self
.
client_pk
.
key_size
//
8
self
.
decryptor
=
dec_fn
...
...
@@ -307,7 +288,6 @@ class DecryptorRsa(Decryptor):
class
SignerAesCbc
(
Signer
):
def
__init__
(
self
,
key
):
require_cryptography
(
self
)
self
.
key
=
key
def
signature_size
(
self
):
...
...
@@ -319,7 +299,6 @@ class SignerAesCbc(Signer):
class
VerifierAesCbc
(
Verifier
):
def
__init__
(
self
,
key
):
require_cryptography
(
self
)
self
.
key
=
key
def
signature_size
(
self
):
...
...
@@ -333,7 +312,6 @@ class VerifierAesCbc(Verifier):
class
EncryptorAesCbc
(
Encryptor
):
def
__init__
(
self
,
key
,
init_vec
):
require_cryptography
(
self
)
self
.
cipher
=
uacrypto
.
cipher_aes_cbc
(
key
,
init_vec
)
def
plain_block_size
(
self
):
...
...
@@ -348,7 +326,6 @@ class EncryptorAesCbc(Encryptor):
class
DecryptorAesCbc
(
Decryptor
):
def
__init__
(
self
,
key
,
init_vec
):
require_cryptography
(
self
)
self
.
cipher
=
uacrypto
.
cipher_aes_cbc
(
key
,
init_vec
)
def
plain_block_size
(
self
):
...
...
@@ -363,7 +340,6 @@ class DecryptorAesCbc(Decryptor):
class
SignerSha256
(
Signer
):
def
__init__
(
self
,
client_pk
):
require_cryptography
(
self
)
self
.
client_pk
=
client_pk
self
.
key_size
=
self
.
client_pk
.
key_size
//
8
...
...
@@ -376,7 +352,6 @@ class SignerSha256(Signer):
class
VerifierSha256
(
Verifier
):
def
__init__
(
self
,
server_cert
):
require_cryptography
(
self
)
self
.
server_cert
=
server_cert
self
.
key_size
=
self
.
server_cert
.
public_key
().
key_size
//
8
...
...
@@ -389,7 +364,6 @@ class VerifierSha256(Verifier):
class
SignerHMac256
(
Signer
):
def
__init__
(
self
,
key
):
require_cryptography
(
self
)
self
.
key
=
key
def
signature_size
(
self
):
...
...
@@ -401,7 +375,6 @@ class SignerHMac256(Signer):
class
VerifierHMac256
(
Verifier
):
def
__init__
(
self
,
key
):
require_cryptography
(
self
)
self
.
key
=
key
def
signature_size
(
self
):
...
...
@@ -415,7 +388,6 @@ class VerifierHMac256(Verifier):
class
SignerPssSha256
(
Signer
):
def
__init__
(
self
,
client_pk
):
require_cryptography
(
self
)
self
.
client_pk
=
client_pk
self
.
key_size
=
self
.
client_pk
.
key_size
//
8
...
...
@@ -428,7 +400,6 @@ class SignerPssSha256(Signer):
class
VerifierPssSha256
(
Verifier
):
def
__init__
(
self
,
server_cert
):
require_cryptography
(
self
)
self
.
server_cert
=
server_cert
self
.
key_size
=
self
.
server_cert
.
public_key
().
key_size
//
8
...
...
@@ -476,7 +447,6 @@ class SecurityPolicyAes128Sha256RsaOaep(SecurityPolicy):
return
uacrypto
.
encrypt_rsa_oaep
(
pubkey
,
data
)
def
__init__
(
self
,
peer_cert
,
host_cert
,
client_pk
,
mode
,
permission_ruleset
=
None
):
require_cryptography
(
self
)
if
isinstance
(
peer_cert
,
bytes
):
peer_cert
=
uacrypto
.
x509_from_der
(
peer_cert
)
# even in Sign mode we need to asymmetrically encrypt secrets
...
...
@@ -550,7 +520,6 @@ class SecurityPolicyAes256Sha256RsaPss(SecurityPolicy):
return
uacrypto
.
encrypt_rsa_oaep_sha256
(
pubkey
,
data
)
def
__init__
(
self
,
peer_cert
,
host_cert
,
client_pk
,
mode
,
permission_ruleset
=
None
):
require_cryptography
(
self
)
if
isinstance
(
peer_cert
,
bytes
):
peer_cert
=
uacrypto
.
x509_from_der
(
peer_cert
)
# even in Sign mode we need to asymmetrically encrypt secrets
...
...
@@ -632,7 +601,6 @@ class SecurityPolicyBasic128Rsa15(SecurityPolicy):
def
__init__
(
self
,
peer_cert
,
host_cert
,
client_pk
,
mode
,
permission_ruleset
=
None
):
_logger
.
warning
(
"DEPRECATED! Do not use SecurityPolicyBasic128Rsa15 anymore!"
)
require_cryptography
(
self
)
if
isinstance
(
peer_cert
,
bytes
):
peer_cert
=
uacrypto
.
x509_from_der
(
peer_cert
)
# even in Sign mode we need to asymmetrically encrypt secrets
...
...
@@ -712,7 +680,6 @@ class SecurityPolicyBasic256(SecurityPolicy):
def
__init__
(
self
,
peer_cert
,
host_cert
,
client_pk
,
mode
,
permission_ruleset
=
None
):
_logger
.
warning
(
"DEPRECATED! Do not use SecurityPolicyBasic256 anymore!"
)
require_cryptography
(
self
)
if
isinstance
(
peer_cert
,
bytes
):
peer_cert
=
uacrypto
.
x509_from_der
(
peer_cert
)
# even in Sign mode we need to asymmetrically encrypt secrets
...
...
@@ -790,7 +757,6 @@ class SecurityPolicyBasic256Sha256(SecurityPolicy):
return
uacrypto
.
encrypt_rsa_oaep
(
pubkey
,
data
)
def
__init__
(
self
,
peer_cert
,
host_cert
,
client_pk
,
mode
,
permission_ruleset
=
None
):
require_cryptography
(
self
)
if
isinstance
(
peer_cert
,
bytes
):
peer_cert
=
uacrypto
.
x509_from_der
(
peer_cert
)
# even in Sign mode we need to asymmetrically encrypt secrets
...
...
asyncua/server/internal_server.py
View file @
d410c7e1
...
...
@@ -24,12 +24,7 @@ from .users import User, UserRole
from
.internal_session
import
InternalSession
from
.event_generator
import
EventGenerator
from
..crypto.validator
import
CertificateValidatorMethod
try
:
from
asyncua.crypto
import
uacrypto
except
ImportError
:
logging
.
getLogger
(
__name__
).
warning
(
"cryptography is not installed, use of crypto disabled"
)
uacrypto
=
False
from
..crypto
import
uacrypto
_logger
=
logging
.
getLogger
(
__name__
)
...
...
@@ -403,9 +398,6 @@ class InternalServer:
# decrypt password if we can
if
str
(
token
.
EncryptionAlgorithm
)
!=
"None"
:
if
not
uacrypto
:
# raise # Should I raise a significant exception?
return
False
try
:
if
token
.
EncryptionAlgorithm
==
"http://www.w3.org/2001/04/xmlenc#rsa-1_5"
:
raw_pw
=
uacrypto
.
decrypt_rsa15
(
self
.
private_key
,
password
)
...
...
asyncua/tools.py
View file @
d410c7e1
...
...
@@ -513,10 +513,8 @@ def application_to_strings(app):
def
cert_to_string
(
der
):
if
not
der
:
return
"[no certificate]"
try
:
from
.crypto
import
uacrypto
except
ImportError
:
return
f"
{
len
(
der
)
}
bytes"
cert
=
uacrypto
.
x509_from_der
(
der
)
return
uacrypto
.
x509_to_string
(
cert
)
...
...
tests/test_crypto_connect.py
View file @
d410c7e1
...
...
@@ -13,15 +13,7 @@ from asyncua import ua
from
asyncua.server.user_managers
import
CertificateUserManager
from
asyncua.crypto.security_policies
import
Verifier
,
Decryptor
from
asyncua.crypto.validator
import
CertificateValidator
,
CertificateValidatorOptions
try
:
from
asyncua.crypto
import
uacrypto
from
asyncua.crypto
import
security_policies
except
ImportError
:
print
(
"WARNING: CRYPTO NOT AVAILABLE, CRYPTO TESTS DISABLED!!"
)
disable_crypto_tests
=
True
else
:
disable_crypto_tests
=
False
from
asyncua.crypto
import
uacrypto
,
security_policies
pytestmark
=
pytest
.
mark
.
asyncio
...
...
tests/test_permissions.py
View file @
d410c7e1
...
...
@@ -6,14 +6,7 @@ from asyncua import Server
from
asyncua
import
ua
from
asyncua.server.users
import
UserRole
from
asyncua.server.user_managers
import
CertificateUserManager
try
:
from
asyncua.crypto
import
security_policies
except
ImportError
:
print
(
"WARNING: CRYPTO NOT AVAILABLE, CRYPTO TESTS DISABLED!!"
)
disable_crypto_tests
=
True
else
:
disable_crypto_tests
=
False
from
asyncua.crypto
import
security_policies
pytestmark
=
pytest
.
mark
.
asyncio
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment