Commit 9aad5a4d authored by Ralf Taraschewski's avatar Ralf Taraschewski Committed by GitHub

Replace os path with pathlib (#1179)

* replaced os.path with pathlib.Path

* fixed some type hinting; added FIXME for critical vulnerability

* added string and path support for certificate paths because path only broke the API. BUT we should stop using strings for paths in the future..

---------

Co-authored-by: ratara <noreply>
parent bc4aea7e
......@@ -22,6 +22,6 @@ examples/history.db
/.settings/
/venv/
.eggs*
.vscode
.coverage
schemas/UA-Nodese*
.vscode
......@@ -2,6 +2,7 @@ import asyncio
import logging
from typing import List, Union, Coroutine, Optional, Type
from urllib.parse import urlparse, unquote
from pathlib import Path
from asyncua import ua
from .ua_client import UaClient
......@@ -218,7 +219,7 @@ class Client:
"""
self.user_certificate = await uacrypto.load_certificate(path, extension)
async def load_private_key(self, path: str, password: Optional[Union[str, bytes]] = None, extension: Optional[str] = None):
async def load_private_key(self, path: Path, password: Optional[Union[str, bytes]] = None, extension: Optional[str] = None):
"""
Load user private key. This is used for authenticating using certificate
"""
......
import os
from pathlib import Path
import aiofiles
from typing import Optional, Union
......@@ -17,12 +17,12 @@ from dataclasses import dataclass
@dataclass
class CertProperties:
path_or_content: Union[str, bytes]
path_or_content: Union[bytes, Path, str]
extension: Optional[str] = None
password: Optional[Union[str, bytes]] = None
async def get_content(path_or_content: Union[str, bytes]) -> bytes:
async def get_content(path_or_content: Union[str, bytes, Path]) -> bytes:
if isinstance(path_or_content, bytes):
return path_or_content
......@@ -30,8 +30,13 @@ async def get_content(path_or_content: Union[str, bytes]) -> bytes:
return await f.read()
async def load_certificate(path_or_content: Union[str, bytes], extension: Optional[str] = None):
_, ext = os.path.splitext(path_or_content)
async def load_certificate(path_or_content: Union[bytes, str, Path], extension: Optional[str] = None):
if isinstance(path_or_content, str):
ext = Path(path_or_content).suffix
elif isinstance(path_or_content, Path):
ext = path_or_content.suffix
else:
ext = ''
content = await get_content(path_or_content)
if ext == ".pem" or extension == 'pem' or extension == 'PEM':
......@@ -46,10 +51,15 @@ def x509_from_der(data):
return x509.load_der_x509_certificate(data, default_backend())
async def load_private_key(path_or_content: Union[str, bytes],
async def load_private_key(path_or_content: Union[str, Path, bytes],
password: Optional[Union[str, bytes]] = None,
extension: Optional[str] = None):
_, ext = os.path.splitext(path_or_content)
if isinstance(path_or_content, str):
ext = Path(path_or_content).suffix
elif isinstance(path_or_content, Path):
ext = path_or_content.suffix
else:
ext = ''
if isinstance(password, str):
password = password.encode('utf-8')
......@@ -149,6 +159,7 @@ def decrypt_rsa15(private_key, data):
def cipher_aes_cbc(key, init_vec):
# FIXME sonarlint reports critical vulnerability (python:S5542)
return Cipher(algorithms.AES(key), modes.CBC(init_vec), default_backend())
......
......@@ -7,7 +7,7 @@ import asyncio
from datetime import datetime, timedelta
from copy import copy
from struct import unpack_from
import os
from pathlib import Path
import logging
from urllib.parse import urlparse
from typing import Coroutine, Tuple
......@@ -120,8 +120,8 @@ class InternalServer:
async def load_standard_address_space(self, shelf_file=None):
if shelf_file:
is_file = await asyncio.get_running_loop().run_in_executor(
None, os.path.isfile, shelf_file
) or await asyncio.get_running_loop().run_in_executor(None, os.path.isfile, f'{shelf_file}.db')
None, Path.is_file, shelf_file
) or await asyncio.get_running_loop().run_in_executor(None, Path.is_file, f'{shelf_file}.db')
if is_file:
# import address space from shelf
await asyncio.get_running_loop().run_in_executor(None, self.aspace.load_aspace_shelf, shelf_file)
......
......@@ -8,6 +8,7 @@ import math
from datetime import timedelta, datetime
from urllib.parse import urlparse
from typing import Coroutine, Optional, Tuple, Union
from pathlib import Path
from asyncua import ua
from .binary_server_asyncio import BinaryServer
......@@ -227,7 +228,7 @@ class Server:
"""
self.certificate = await uacrypto.load_certificate(path_or_content, format)
async def load_private_key(self, path_or_content: Union[str, bytes], password=None, format=None):
async def load_private_key(self, path_or_content: Union[str, Path, bytes], password=None, format=None):
self.iserver.private_key = await uacrypto.load_private_key(path_or_content, password, format)
def disable_clock(self, val: bool = True):
......
from asyncua.crypto import uacrypto
import logging
from asyncua.server.users import UserRole, User
from pathlib import Path
from typing import Union
from asyncua.crypto import uacrypto
from asyncua.server.users import User, UserRole
class UserManager:
......@@ -26,7 +29,7 @@ class CertificateUserManager:
def __init__(self):
self._trusted_certificates = {}
async def add_role(self, certificate_path: str, user_role: UserRole, name: str, format: str = None):
async def add_role(self, certificate_path: Path, user_role: UserRole, name: str, format: Union[str, None] = None):
certificate = await uacrypto.load_certificate(certificate_path, format)
if name is None:
raise KeyError
......@@ -48,8 +51,8 @@ class CertificateUserManager:
else:
return correct_users[0]
async def add_user(self, certificate_path: str, name: str, format: str = None):
async def add_user(self, certificate_path: Path, name: str, format: Union[str, None] = None):
await self.add_role(certificate_path=certificate_path, user_role=UserRole.User, name=name, format=format)
async def add_admin(self, certificate_path: str, name:str, format: str = None):
async def add_admin(self, certificate_path: Path, name:str, format: Union[str, None] = None):
await self.add_role(certificate_path=certificate_path, user_role=UserRole.Admin, name=name, format=format)
......@@ -2,6 +2,7 @@
sync API of asyncua
"""
import asyncio
from pathlib import Path
from threading import Thread, Condition
import logging
from typing import List, Tuple, Union, Optional
......@@ -647,7 +648,7 @@ class XmlExporter:
pass
@syncmethod
def write_xml(self, xmlpath, pretty=True):
def write_xml(self, xmlpath: Path, pretty=True):
pass
......
import os.path
import asyncio
import logging
from pathlib import Path
from asyncua import ua, uamethod, Server
from asyncua import Server, ua, uamethod
@uamethod
......@@ -85,11 +85,11 @@ class HelloServer:
async def main():
script_dir = os.path.dirname(__file__)
script_dir = Path(__file__).parent
async with HelloServer(
"opc.tcp://0.0.0.0:4840/freeopcua/server/",
"FreeOpcUa Example Server",
os.path.join(script_dir, "test_saying.xml"),
script_dir / "test_saying.xml",
) as server:
while True:
await asyncio.sleep(1)
......
import os
from pathlib import Path
import datetime
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BASE_DIR = Path(__file__).absolute().parent.parent
IgnoredEnums = ["NodeIdType"]
IgnoredStructs = ["QualifiedName", "NodeId", "ExpandedNodeId", "Variant", "DataValue",
......@@ -217,8 +217,8 @@ class CodeGenerator:
if __name__ == '__main__':
import generate_model_from_nodeset as gm
xml_path = os.path.join(BASE_DIR, 'schemas', 'UA-Nodeset-master', 'Schema', 'Opc.Ua.NodeSet2.Services.xml')
protocol_path = os.path.join(BASE_DIR, "asyncua", "ua", "uaprotocol_auto.py")
xml_path = BASE_DIR.joinpath('schemas', 'UA-Nodeset-master', 'Schema', 'Opc.Ua.NodeSet2.Services.xml')
protocol_path = BASE_DIR.joinpath("asyncua", "ua", "uaprotocol_auto.py")
p = gm.Parser(xml_path)
model = p.parse()
gm.nodeid_to_names(model)
......
import asyncio
import pytest
import operator
import os
from pathlib import Path
import socket
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process, Condition, Event
......@@ -268,19 +268,19 @@ async def history_server(request):
@pytest.fixture(scope="session")
def client_key_and_cert(request):
base_dir = os.path.dirname(os.path.dirname(__file__))
cert_dir = os.path.join(base_dir, "examples/certificates") + os.sep
key = f"{cert_dir}peer-private-key-example-1.pem"
cert = f"{cert_dir}peer-certificate-example-1.der"
base_dir = Path(__file__).parent.parent
cert_dir = base_dir / "examples/certificates"
key = cert_dir / "peer-private-key-example-1.pem"
cert = cert_dir / "peer-certificate-example-1.der"
return key, cert
@pytest.fixture(scope="session")
def server_key_and_cert(request):
base_dir = os.path.dirname(os.path.dirname(__file__))
cert_dir = os.path.join(base_dir, "examples") + os.sep
key = f"{cert_dir}private-key-example.pem"
cert = f"{cert_dir}certificate-example.der"
base_dir = Path(__file__).parent.parent
cert_dir = base_dir / "examples"
key = cert_dir / "private-key-example.pem"
cert = cert_dir / "certificate-example.der"
return key, cert
......
......@@ -9,9 +9,9 @@ same api on server and client side
import asyncio
import contextlib
import math
import os
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
import pytest
......@@ -20,7 +20,7 @@ from asyncua.common import ua_utils
from asyncua.common.copy_node_util import copy_node
from asyncua.common.instantiate_util import instantiate
from asyncua.common.methods import call_method_full
from asyncua.common.sql_injection import validate_table_name, SqlInjectionError
from asyncua.common.sql_injection import SqlInjectionError, validate_table_name
from asyncua.common.structures104 import new_enum, new_struct, new_struct_field
from asyncua.ua.ua_binary import struct_from_binary, struct_to_binary
......@@ -1426,9 +1426,9 @@ async def test_nested_struct_arrays(opc):
@contextlib.contextmanager
def expect_file_creation(filename: str):
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, filename)
path = Path(tmpdir) / filename
yield path
assert os.path.isfile(path), f"File {path} should have been created"
assert Path.is_file(path), f"File {path} should have been created"
async def test_custom_struct_export(opc):
......
import os
from pathlib import Path
import pytest
import asyncio
......@@ -29,26 +29,26 @@ port_num3 = 48516
uri_crypto = "opc.tcp://127.0.0.1:{0:d}".format(port_num1)
uri_no_crypto = "opc.tcp://127.0.0.1:{0:d}".format(port_num2)
uri_crypto_cert = "opc.tcp://127.0.0.1:{0:d}".format(port_num3)
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
EXAMPLE_PATH = os.path.join(BASE_DIR, "examples") + os.sep
srv_crypto_params = [(f"{EXAMPLE_PATH}private-key-example.pem",
f"{EXAMPLE_PATH}certificate-example.der"),
(f"{EXAMPLE_PATH}private-key-3072-example.pem",
f"{EXAMPLE_PATH}certificate-3072-example.der")]
BASE_DIR = Path(__file__).parent.parent
EXAMPLE_PATH = BASE_DIR / "examples"
srv_crypto_params = [(EXAMPLE_PATH / "private-key-example.pem",
EXAMPLE_PATH / "certificate-example.der"),
(EXAMPLE_PATH / "private-key-3072-example.pem",
EXAMPLE_PATH / "certificate-3072-example.der")]
peer_creds = {
"certificate": f"{EXAMPLE_PATH}certificates/peer-certificate-example-1.der",
"private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-1.pem"
"certificate": EXAMPLE_PATH / "certificates/peer-certificate-example-1.der",
"private_key": EXAMPLE_PATH / "certificates/peer-private-key-example-1.pem"
}
unauthorized_peer_creds = {
"certificate": f"{EXAMPLE_PATH}certificates/peer-certificate-example-2.der",
"private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-2.pem"
"certificate": EXAMPLE_PATH / "certificates/peer-certificate-example-2.der",
"private_key": EXAMPLE_PATH / "certificates/peer-private-key-example-2.pem"
}
encrypted_private_key_peer_creds = {
"private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-encrypted-private-key.pem",
"certificate": f"{EXAMPLE_PATH}certificates/peer-certificate-example-encrypted-private-key.der",
"private_key": EXAMPLE_PATH / "certificates/peer-private-key-example-encrypted-private-key.pem",
"certificate": EXAMPLE_PATH / "certificates/peer-certificate-example-encrypted-private-key.der",
"password": b"password"
}
......@@ -130,14 +130,14 @@ async def test_nocrypto_fail(srv_no_crypto):
clt = Client(uri_no_crypto)
with pytest.raises(ua.UaError):
await clt.set_security_string(
f"Basic256Sha256,Sign,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem")
f"Basic256Sha256,Sign,{EXAMPLE_PATH / 'certificate-example.der'},{EXAMPLE_PATH / 'private-key-example.pem'}")
async def test_basic256(srv_crypto_all_certs):
_, cert = srv_crypto_all_certs
clt = Client(uri_crypto)
await clt.set_security_string(
f"Basic256Sha256,Sign,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem,{cert}"
f"Basic256Sha256,Sign,{EXAMPLE_PATH / 'certificate-example.der'},{EXAMPLE_PATH / 'private-key-example.pem'},{cert}"
)
async with clt:
assert await clt.nodes.objects.get_children()
......@@ -147,7 +147,7 @@ async def test_basic256_encrypt(srv_crypto_all_certs):
_, cert = srv_crypto_all_certs
clt = Client(uri_crypto)
await clt.set_security_string(
f"Basic256Sha256,SignAndEncrypt,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem,{cert}")
f"Basic256Sha256,SignAndEncrypt,{EXAMPLE_PATH / 'certificate-example.der'},{EXAMPLE_PATH / 'private-key-example.pem'},{cert}")
async with clt:
assert await clt.nodes.objects.get_children()
......@@ -157,8 +157,8 @@ async def test_basic256_encrypt_success(srv_crypto_all_certs):
_, cert = srv_crypto_all_certs
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
f"{EXAMPLE_PATH}certificate-example.der",
f"{EXAMPLE_PATH}private-key-example.pem",
f"{EXAMPLE_PATH / 'certificate-example.der'}",
f"{EXAMPLE_PATH / 'private-key-example.pem'}",
None,
cert,
ua.MessageSecurityMode.SignAndEncrypt
......@@ -172,8 +172,8 @@ async def test_basic256_encrypt_use_certificate_bytes(srv_crypto_all_certs):
clt = Client(uri_crypto)
_, cert = srv_crypto_all_certs
with open(cert, 'rb') as server_cert, \
open(f"{EXAMPLE_PATH}certificate-example.der", 'rb') as user_cert, \
open(f"{EXAMPLE_PATH}private-key-example.pem", 'rb') as user_key:
open(f"{EXAMPLE_PATH / 'certificate-example.der'}", 'rb') as user_cert, \
open(f"{EXAMPLE_PATH / 'private-key-example.pem'}", 'rb') as user_key:
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
user_cert.read(),
......@@ -195,8 +195,8 @@ async def test_basic256_encrypt_fail(srv_crypto_all_certs):
with pytest.raises(ua.UaError):
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
f"{EXAMPLE_PATH}certificate-example.der",
f"{EXAMPLE_PATH}private-key-example.pem",
f"{EXAMPLE_PATH / 'certificate-example.der'}",
f"{EXAMPLE_PATH / 'private-key-example.pem'}",
None,
None,
mode=ua.MessageSecurityMode.None_
......@@ -208,8 +208,8 @@ async def test_Aes128Sha256RsaOaep_encrypt_success(srv_crypto_all_certs):
_, cert = srv_crypto_all_certs
await clt.set_security(
security_policies.SecurityPolicyAes128Sha256RsaOaep,
f"{EXAMPLE_PATH}certificate-example.der",
f"{EXAMPLE_PATH}private-key-example.pem",
f"{EXAMPLE_PATH / 'certificate-example.der'}",
f"{EXAMPLE_PATH / 'private-key-example.pem'}",
None,
cert,
ua.MessageSecurityMode.SignAndEncrypt
......
import os
from pathlib import Path
import pytest
from asyncua import Client
......@@ -24,24 +24,24 @@ port_num3 = 48516
uri_crypto = "opc.tcp://127.0.0.1:{0:d}".format(port_num1)
uri_no_crypto = "opc.tcp://127.0.0.1:{0:d}".format(port_num2)
uri_crypto_cert = "opc.tcp://127.0.0.1:{0:d}".format(port_num3)
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
EXAMPLE_PATH = os.path.join(BASE_DIR, "examples") + os.sep
srv_crypto_params = [(f"{EXAMPLE_PATH}private-key-example.pem",
f"{EXAMPLE_PATH}certificate-example.der"), ]
BASE_DIR = Path(__file__).parent.parent
EXAMPLE_PATH = BASE_DIR / "examples"
srv_crypto_params = [(EXAMPLE_PATH / "private-key-example.pem",
EXAMPLE_PATH / "certificate-example.der"),]
admin_peer_creds = {
"certificate": f"{EXAMPLE_PATH}certificates/peer-certificate-example-1.der",
"private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-1.pem"
"certificate": EXAMPLE_PATH / "certificates/peer-certificate-example-1.der",
"private_key": EXAMPLE_PATH / "certificates/peer-private-key-example-1.pem"
}
user_peer_creds = {
"certificate": f"{EXAMPLE_PATH}certificates/peer-certificate-example-2.der",
"private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-2.pem"
"certificate": EXAMPLE_PATH / "certificates/peer-certificate-example-2.der",
"private_key": EXAMPLE_PATH / "certificates/peer-private-key-example-2.pem"
}
anonymous_peer_creds = {
"certificate": f"{EXAMPLE_PATH}certificates/peer-certificate-example-3.der",
"private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-3.pem"
"certificate": EXAMPLE_PATH / "certificates/peer-certificate-example-3.der",
"private_key": EXAMPLE_PATH / "certificates/peer-private-key-example-3.pem"
}
......
import pytest
import os.path
from pathlib import Path
import xml.etree.ElementTree as ET
from asyncua.server.address_space import AddressSpace
......@@ -55,7 +55,7 @@ def test_std_address_space_references():
node_mgt_service = NodeManagementService(aspace)
standard_address_space.fill_address_space(node_mgt_service)
std_nodes = read_nodes(
os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'schemas', 'Opc.Ua.NodeSet2.xml'))
Path(__file__).parent.parent.absolute() / 'schemas' / 'Opc.Ua.NodeSet2.xml'
)
for k in aspace.keys():
refs = set(
......
from concurrent.futures import Future
import os
from pathlib import Path
import tempfile
import pytest
......@@ -134,7 +134,7 @@ def test_sync_xml_export(server):
exp = XmlExporter(server)
exp.build_etree([server.nodes.objects])
with tempfile.TemporaryDirectory() as tmpdir:
exp.write_xml(os.path.join(tmpdir, "toto_test_export.xml"))
exp.write_xml(Path(tmpdir) / "toto_test_export.xml")
def test_create_enum_sync(server):
......
......@@ -5,7 +5,7 @@ Simple unit test that do not need to setup a server or a client
"""
import io
import os
from pathlib import Path
import uuid
import pytest
import logging
......@@ -27,7 +27,7 @@ from asyncua.ua.uatypes import _MaskEnum
from asyncua.common.structures import StructGenerator
from asyncua.common.connection import MessageChunk
EXAMPLE_BSD_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "example.bsd"))
EXAMPLE_BSD_PATH = Path(__file__).parent.absolute() / "example.bsd"
def test_variant_array_none():
......
import os
from dataclasses import dataclass
from enum import IntEnum
from pathlib import Path
from asyncua import ua
from asyncua import Server, ua
from asyncua.ua import uatypes
from enum import IntEnum
from asyncua import Server
TEST_DIR = os.path.dirname(__file__) + os.sep
TEST_DIR = Path(__file__).parent
class ExampleEnum(IntEnum):
......@@ -28,7 +27,7 @@ class ExampleStruct:
async def add_server_custom_enum_struct(server: Server):
# import some nodes from xml
await server.import_xml(f"{TEST_DIR}enum_struct_test_nodes.xml")
await server.import_xml(TEST_DIR / "enum_struct_test_nodes.xml")
ns = await server.get_namespace_index('http://yourorganisation.org/struct_enum_example/')
uatypes.register_extension_object('ExampleStruct', ua.NodeId(5001, ns), ExampleStruct)
val = ua.ExampleStruct()
......
#!/usr/bin/env python3
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from pathlib import Path
from asyncua.tools import uacall
sys.path.append(f'{Path(__file__).parent.parent}')
from asyncua.tools import uacall
if __name__ == "__main__":
uacall()
#!/usr/bin/env python
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from pathlib import Path
sys.path.append(f'{Path(__file__).parent.parent}')
from asyncua.tools import uaclient
......
#!/usr/bin/env python3
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from pathlib import Path
sys.path.append(f'{Path(__file__).parent.parent}')
from asyncua.tools import uadiscover
......
#!/usr/bin/env python
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from pathlib import Path
sys.path.append(f'{Path(__file__).parent.parent}')
from asyncua.tools import uahistoryread
......
#!/usr/bin/env python3
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from pathlib import Path
sys.path.append(f'{Path(__file__).parent.parent}')
from asyncua.tools import uals
......
#!/usr/bin/env python3
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from pathlib import Path
sys.path.append(f'{Path(__file__).parent.parent}')
from asyncua.tools import uaread
......
#!/usr/bin/env python3
import sys
import os
import logging
import sys
from pathlib import Path
logging.basicConfig(level=logging.DEBUG)
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
sys.path.append(f'{Path(__file__).parent.parent}')
from asyncua.tools import uaserver
if __name__ == "__main__":
uaserver()
#!/usr/bin/env python3
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from pathlib import Path
sys.path.append(f'{Path(__file__).parent.parent}')
from asyncua.tools import uasubscribe
......
#!/usr/bin/env python3
import sys
import os
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from pathlib import Path
sys.path.append(f'{Path(__file__).parent.parent}')
from asyncua.tools import uawrite
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment