Commit 35bb6207 authored by oroulet's avatar oroulet Committed by oroulet

add some missing typing and correct timeout type!

parent 8d8edbd6
......@@ -29,7 +29,7 @@ class Client:
use UaClient object, available as self.uaclient
which offers the raw OPC-UA services interface.
"""
def __init__(self, url: str, timeout: int = 4):
def __init__(self, url: str, timeout: float = 4):
"""
:param url: url of the server.
if you are unsure of url, write at least hostname
......
......@@ -3,7 +3,7 @@ Low level binary client
"""
import asyncio
import logging
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union
from asyncua import ua
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
......@@ -20,7 +20,7 @@ class UASocketProtocol(asyncio.Protocol):
OPEN = 'open'
CLOSED = 'closed'
def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
def __init__(self, timeout: float = 1, security_policy: ua.SecurityPolicy = ua.SecurityPolicy()):
"""
:param timeout: Timeout in seconds
:param security_policy: Security policy (optional)
......@@ -45,7 +45,7 @@ class UASocketProtocol(asyncio.Protocol):
self.state = self.OPEN
self.transport = transport
def connection_lost(self, exc):
def connection_lost(self, exc: Exception):
self.logger.info("Socket has closed connection")
self.state = self.CLOSED
self.transport = None
......@@ -91,7 +91,7 @@ class UASocketProtocol(asyncio.Protocol):
self.disconnect_socket()
return
def _process_received_message(self, msg):
def _process_received_message(self, msg: Union[ua.Message, ua.Acknowledge, ua.ErrorMessage]):
if msg is None:
pass
elif isinstance(msg, ua.Message):
......@@ -104,7 +104,7 @@ class UASocketProtocol(asyncio.Protocol):
else:
raise ua.UaError(f"Unsupported message type: {msg}")
def _send_request(self, request, timeout=1, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
def _send_request(self, request, timeout: float = 1, message_type=ua.MessageType.SecureMessage) -> asyncio.Future:
"""
Send request to server, lower-level method.
Timeout is the timeout written in ua header.
......@@ -134,7 +134,7 @@ class UASocketProtocol(asyncio.Protocol):
self.transport.write(msg)
return future
async def send_request(self, request, timeout=None, message_type=ua.MessageType.SecureMessage):
async def send_request(self, request, timeout: Optional[float] = None, message_type=ua.MessageType.SecureMessage):
"""
Send a request to the server.
Timeout is the timeout written in ua header.
......@@ -183,7 +183,7 @@ class UASocketProtocol(asyncio.Protocol):
hdr.AuthenticationToken = self.authentication_token
self._request_handle += 1
hdr.RequestHandle = self._request_handle
hdr.TimeoutHint = timeout * 1000
hdr.TimeoutHint = int(timeout * 1000)
return hdr
def disconnect_socket(self):
......@@ -193,7 +193,7 @@ class UASocketProtocol(asyncio.Protocol):
else:
self.logger.warning("disconnect_socket was called but transport is None")
async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
async def send_hello(self, url, max_messagesize: int = 0, max_chunkcount: int = 0):
hello = ua.Hello()
hello.EndpointUrl = url
hello.MaxMessageSize = max_messagesize
......@@ -241,7 +241,7 @@ class UaClient:
In this Python implementation most of the structures are defined in
uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
"""
def __init__(self, timeout=1):
def __init__(self, timeout: float = 1):
"""
:param timeout: Timout in seconds
"""
......@@ -273,7 +273,7 @@ class UaClient:
return None
return self.protocol.disconnect_socket()
async def send_hello(self, url, max_messagesize=0, max_chunkcount=0):
async def send_hello(self, url, max_messagesize: int = 0, max_chunkcount: int = 0):
await self.protocol.send_hello(url, max_messagesize, max_chunkcount)
async def open_secure_channel(self, params):
......@@ -446,7 +446,7 @@ class UaClient:
"create_subscription success SubscriptionId %s",
response.Parameters.SubscriptionId
)
if not self._publish_task or self._publish_task.done():
if not self._publish_task or self._publish_task.done() :
# Start the publish loop if it is not yet running
# The current strategy is to have only one open publish request per UaClient. This might not be enough
# in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
......
......@@ -4,6 +4,7 @@ Implement user management here.
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class UserRole(Enum):
......@@ -18,4 +19,4 @@ class UserRole(Enum):
@dataclass
class User:
role: UserRole = UserRole.Anonymous
name: str = None
name: Optional[str] = None
......@@ -185,7 +185,7 @@ def win_epoch_to_datetime(epch):
return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
FROZEN = False
FROZEN: bool = False
class ValueRank(IntEnum):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment