initial use of dataclass

......@@ -6,6 +6,8 @@ import struct
import logging
import uuid
from enum import IntEnum, Enum
from typing import get_origin, Union, get_type_hints, get_args
from dataclasses import is_dataclass
from .uaerrors import UaError
from ..common.utils import Buffer
......@@ -14,6 +16,30 @@ from asyncua import ua
logger = logging.getLogger('__name__')
def _is_union(uatype):
return get_origin(uatype) == Union
def _is_list(uatype):
return get_origin(uatype) == list
def _from_union(uatype, origin=None):
if origin is None:
origin = get_origin(uatype)
if origin == Union:
# need to find out what real type is
for subtype in get_args(uatype):
if subtype is not None.__class__: # FIXME: strange comparison...
return subtype
ValueError(f"Union {uatype} seems to only contain None")
def _from_list(uatype):
return get_args(uatype)[0]
def test_bit(data, offset):
mask = 1 << offset
return data & mask
......@@ -141,7 +167,7 @@ class _Primitive1(object):
return struct.unpack(self._fmt.format(length), * length))
class Primitives1(object):
class Primitives1:
SByte = _Primitive1('<{:d}b')
Int16 = _Primitive1('<{:d}h')
Int32 = _Primitive1('<{:d}i')
......@@ -230,17 +256,26 @@ def struct_to_binary(obj):
packet = []
has_switch = hasattr(obj, 'ua_switches')
if has_switch:
container_val = 0
for name, switch in obj.ua_switches.items():
member = getattr(obj, name)
container_name, idx = switch
if member is not None:
container_val = getattr(obj, container_name)
print("container_val before", container_val, idx)
container_val = container_val | 1 << idx
setattr(obj, container_name, container_val)
for name, uatype in obj.ua_types:
print("container_val after", container_val, idx)
print("SETATTR", name, idx, bin(container_val))
setattr(obj, container_name, container_val)
for name, uatype in get_type_hints(obj).items():
print("STRUCT", obj, name, uatype)
if name == "Encoding":
val = getattr(obj, name)
if uatype.startswith('ListOf'):
packet.append(list_to_binary(uatype[6:], val))
if _is_union(uatype):
uatype = _from_union(uatype)
if _is_list(uatype):
packet.append(list_to_binary(_from_list(uatype), val))
if has_switch and val is None and name in obj.ua_switches:
......@@ -251,23 +286,23 @@ def struct_to_binary(obj):
def to_binary(uatype, val):
Pack a python object to binary given a string defining its type
Pack a python object to binary given a type hint
if uatype.startswith('ListOf'):
# if isinstance(val, (list, tuple)):
return list_to_binary(uatype[6:], val)
elif type(uatype) is str and hasattr(ua.VariantType, uatype):
vtype = getattr(ua.VariantType, uatype)
return pack_uatype(vtype, val)
elif type(uatype) is str and hasattr(Primitives, uatype):
return getattr(Primitives, uatype).pack(val)
print("TOBIN", uatype, val)
if _is_list(uatype):
return list_to_binary(_from_list(uatype), val)
elif hasattr(Primitives, uatype.__name__):
return getattr(Primitives, uatype.__name__).pack(val)
elif isinstance(val, (IntEnum, Enum)):
return Primitives.UInt32.pack(val.value)
elif hasattr(ua.VariantType, uatype.__name__):
vtype = getattr(ua.VariantType, uatype.__name__)
return pack_uatype(vtype, val)
elif isinstance(val, ua.NodeId):
return nodeid_to_binary(val)
elif isinstance(val, ua.Variant):
return variant_to_binary(val)
elif hasattr(val, 'ua_types'):
elif is_dataclass(val):
return struct_to_binary(val)
raise UaError(f'No known way to pack {val} of type {uatype} to ua binary')
......@@ -276,8 +311,8 @@ def to_binary(uatype, val):
def list_to_binary(uatype, val):
if val is None:
return Primitives.Int32.pack(-1)
if hasattr(Primitives1, uatype):
data_type = getattr(Primitives1, uatype)
if hasattr(Primitives1, uatype.__name__):
data_type = getattr(Primitives1, uatype.__name__)
return data_type.pack_array(val)
data_size = Primitives.Int32.pack(len(val))
pack = [to_binary(uatype, el) for el in val]
......@@ -289,7 +324,7 @@ def nodeid_to_binary(nodeid):
if nodeid.NodeIdType == ua.NodeIdType.TwoByte:
return struct.pack('<BB', nodeid.NodeIdType.value, nodeid.Identifier)
elif nodeid.NodeIdType == ua.NodeIdType.FourByte:
return struct.pack('<BBH', nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
data = struct.pack('<BBH', nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
elif nodeid.NodeIdType == ua.NodeIdType.Numeric:
data = struct.pack('<BHI', nodeid.NodeIdType.value, nodeid.NamespaceIndex, nodeid.Identifier)
elif nodeid.NodeIdType == ua.NodeIdType.String:
......@@ -304,11 +339,11 @@ def nodeid_to_binary(nodeid):
raise UaError(f'Unknown NodeIdType: {nodeid.NodeIdType} for NodeId: {nodeid}')
# Add NamespaceUri and ServerIndex in case we have an ExpandedNodeId
if nodeid.NamespaceUri:
if hasattr(nodeid, "NamespaceUri") and nodeid.NamespaceUri:
data = bytearray(data)
data[0] = set_bit(data[0], 7)
if nodeid.ServerIndex:
if hasattr(nodeid, "ServerIndex") and nodeid.ServerIndex:
if not isinstance(data, bytearray):
data = bytearray(data)
data[0] = set_bit(data[0], 6)
......@@ -317,34 +352,39 @@ def nodeid_to_binary(nodeid):
def nodeid_from_binary(data):
nid = ua.NodeId()
encoding = ord(
nid.NodeIdType = ua.NodeIdType(encoding & 0b00111111)
if nid.NodeIdType == ua.NodeIdType.TwoByte:
nid.Identifier = ord(
elif nid.NodeIdType == ua.NodeIdType.FourByte:
nid.NamespaceIndex, nid.Identifier = struct.unpack("<BH",
elif nid.NodeIdType == ua.NodeIdType.Numeric:
nid.NamespaceIndex, nid.Identifier = struct.unpack("<HI",
elif nid.NodeIdType == ua.NodeIdType.String:
nid.NamespaceIndex = Primitives.UInt16.unpack(data)
nid.Identifier = Primitives.String.unpack(data)
elif nid.NodeIdType == ua.NodeIdType.ByteString:
nid.NamespaceIndex = Primitives.UInt16.unpack(data)
nid.Identifier = Primitives.Bytes.unpack(data)
elif nid.NodeIdType == ua.NodeIdType.Guid:
nid.NamespaceIndex = Primitives.UInt16.unpack(data)
nid.Identifier = Primitives.Guid.unpack(data)
nidtype = ua.NodeIdType(encoding & 0b00111111)
expanded = False
if nidtype == ua.NodeIdType.TwoByte:
identifier = ord(
return ua.TwoByteNodeId(identifier)
if nidtype == ua.NodeIdType.FourByte:
nidx, identifier = struct.unpack("<BH",
elif nidtype == ua.NodeIdType.Numeric:
nidx, identifier = struct.unpack("<HI",
elif nidtype == ua.NodeIdType.String:
nidx = Primitives.UInt16.unpack(data)
identifier = Primitives.String.unpack(data)
elif nidtype == ua.NodeIdType.ByteString:
nidx = Primitives.UInt16.unpack(data)
identifier = Primitives.Bytes.unpack(data)
elif nidtype == ua.NodeIdType.Guid:
nidx = Primitives.UInt16.unpack(data)
identifier = Primitives.Guid.unpack(data)
raise UaError(f'Unknown NodeId encoding: {nid.NodeIdType}')
raise UaError(f'Unknown NodeId encoding: {nidtype}')
if test_bit(encoding, 7):
nid.NamespaceUri = Primitives.String.unpack(data)
uri = Primitives.String.unpack(data)
expanded = True
if test_bit(encoding, 6):
nid.ServerIndex = Primitives.UInt32.unpack(data)
server_idx = Primitives.UInt32.unpack(data)
expanded = True
return nid
if expanded:
return ua.ExpandedNodeId(identifier, nidx, nidtype, uri, server_idx)
return ua.NodeId(identifier, nidx, nidtype)
def variant_to_binary(var):
......@@ -457,18 +497,20 @@ def from_binary(uatype, data):
unpack data given an uatype as a string or a python class having a ua_types member
if isinstance(uatype, str) and uatype.startswith("ListOf"):
utype = uatype[6:]
if hasattr(ua.VariantType, utype):
vtype = getattr(ua.VariantType, utype)
if _is_union(uatype):
uatype = _from_union(uatype)
if _is_list(uatype):
utype = _from_list(uatype)
if hasattr(ua.VariantType, utype.__name__):
vtype = getattr(ua.VariantType, utype.__name__)
return unpack_uatype_array(vtype, data)
size = Primitives.Int32.unpack(data)
return [from_binary(utype, data) for _ in range(size)]
elif isinstance(uatype, str) and hasattr(ua.VariantType, uatype):
vtype = getattr(ua.VariantType, uatype)
elif hasattr(ua.VariantType, uatype.__name__):
vtype = getattr(ua.VariantType, uatype.__name__)
return unpack_uatype(vtype, data)
elif isinstance(uatype, str) and hasattr(Primitives, uatype):
return getattr(Primitives, uatype).unpack(data)
elif hasattr(Primitives, uatype.__name__):
return getattr(Primitives, uatype.__name__).unpack(data)
return struct_from_binary(uatype, data)
......@@ -482,14 +524,17 @@ def struct_from_binary(objtype, data):
if issubclass(objtype, Enum):
return objtype(Primitives.UInt32.unpack(data))
obj = objtype()
for name, uatype in obj.ua_types:
for name, uatype in get_type_hints(obj).items():
# if our member has a switch and it is not set we skip it
if hasattr(obj, "ua_switches") and name in obj.ua_switches:
container_name, idx = obj.ua_switches[name]
val = getattr(obj, container_name)
print("BIN VAL", val, idx)
if not test_bit(val, idx):
print("NOT", name)
val = from_binary(uatype, data)
print("SETATR", obj, name, val)
setattr(obj, name, val)
return obj
......@@ -3,13 +3,14 @@ implement ua datatypes
import logging
from typing import Optional, Any, Union
from enum import Enum, IntEnum
from calendar import timegm
import os
import uuid
import re
import itertools
from datetime import datetime, timedelta, MAXYEAR, tzinfo
from dataclasses import dataclass, field
from import status_codes
from .uaerrors import UaError, UaStatusCodeError, UaStringParsingError
......@@ -21,10 +22,83 @@ HUNDREDS_OF_NANOSECONDS = 10000000
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
class SByte(bytes):
class Byte(bytes):
class Bytes(bytes):
class ByteString(bytes): # what is the ifference between Byte and ByteString??
class Int16(int):
class Int32(int):
class Int64(int):
class UInt16(int):
class UInt32(int):
class UInt64(int):
class Boolean:
class Double(float):
class Float(float):
class Null:
class String(str):
class CharArray(str):
class DateTime(datetime):
class Guid(uuid.UUID):
class UTC(tzinfo):
def utcoffset(self, dt):
return timedelta(0)
......@@ -52,31 +126,12 @@ def win_epoch_to_datetime(epch):
return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
except OverflowError:
# FILETIMEs after 31 Dec 9999 can't be converted to datetime
logger.warning('datetime overflow: %s', epch)
logger.warning("datetime overflow: %s", epch)
return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
class _FrozenClass(object):
Make it impossible to add members to a class.
Not pythonic at all but we found out it prevents many many
bugs in use of protocol structures
_freeze = False
def __setattr__(self, key, value):
if self._freeze and not hasattr(self, key):
raise TypeError(f"Error adding member '{key}' to class '{self.__class__.__name__}'," f" class is frozen, members are {self.__dict__.keys()}")
object.__setattr__(self, key, value)
if "PYOPCUA_TYPO_CHECK" in os.environ:
# typo check is cpu consuming, but it will make debug easy.
# set PYOPCUA_TYPO_CHECK will make all uatype classes inherit from _FrozenClass
logger.warning('uaypes typo checking is active')
FrozenClass = _FrozenClass
FrozenClass = object
FROZEN = False
FrozenClass = object
class ValueRank(IntEnum):
......@@ -85,6 +140,7 @@ class ValueRank(IntEnum):
This enum does not support all cases since ValueRank support any n>0
but since it is an IntEnum it can be replace by a normal int
ScalarOrOneDimension = -3
Any = -2
Scalar = -1
......@@ -101,7 +157,9 @@ class _MaskEnum(IntEnum):
def parse_bitfield(cls, the_int):
""" Take an integer and interpret it as a set of enum values. """
if not isinstance(the_int, int):
raise ValueError(f"Argument should be an int, we received {the_int} fo type {type(the_int)}")
raise ValueError(
f"Argument should be an int, we received {the_int} fo type {type(the_int)}"
return {cls(b) for b in cls._bits(the_int)}
......@@ -120,9 +178,9 @@ class _MaskEnum(IntEnum):
def _bits(n):
""" Iterate over the bits in n.
"""Iterate over the bits in n.
e.g. bits(44) yields at 2, 3, 5
e.g. bits(44) yields at 2, 3, 5
if not n >= 0: # avoid infinite recursion
raise ValueError()
......@@ -140,6 +198,7 @@ class AccessLevel(_MaskEnum):
Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
CurrentRead = 0
CurrentWrite = 1
HistoryRead = 2
......@@ -155,6 +214,7 @@ class WriteMask(_MaskEnum):
Spec Part 3, Paragraph 5.2.7 WriteMask
AccessLevel = 0
ArrayDimensions = 1
BrowseName = 2
......@@ -185,30 +245,16 @@ class EventNotifier(_MaskEnum):
Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
SubscribeToEvents = 0
# Reserved = 1
HistoryRead = 2
HistoryWrite = 3
class StatusCode:
value: UInt32 = 0
:ivar value:
:vartype value: int
:ivar name:
:vartype name: string
:ivar doc:
:vartype doc: string
ua_types = [("value", "UInt32")]
def __init__(self, value=0):
if isinstance(value, str):
self.value = getattr(status_codes.StatusCodes, value)
self.value = value
self._freeze = True
class StatusCode:
value: UInt32 = 0
def check(self):
......@@ -226,8 +272,7 @@ class StatusCode(FrozenClass):
mask = 3 << 30
if mask & self.value:
return False
return True
return True
def name(self):
......@@ -239,17 +284,6 @@ class StatusCode(FrozenClass):
_, doc = status_codes.get_name_and_doc(self.value)
return doc
return doc
def __str__(self):
return f'StatusCode({})'
__repr__ = __str__
def __eq__(self, other):
return self.value == other.value
def __ne__(self, other):
return not self.__eq__(other)
TwoByte = 0
TwoByte = 0
......@@ -260,7 +294,11 @@ class NodeIdType(IntEnum):
ByteString = 5
class NodeId(object):
_NodeIdType = NodeIdType # ugly hack
@dataclass(frozen=True, eq=False)
class NodeId:
NodeId Object
......@@ -280,49 +318,47 @@ class NodeId(object):
:ivar ServerIndex:
:vartype ServerIndex: Int
def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
self.Identifier = identifier
self.NamespaceIndex = namespaceidx
self.NodeIdType = nodeidtype
self.NamespaceUri = ""
self.ServerIndex = 0
self._freeze = True
if self.Identifier is None:
self.Identifier = 0
if namespaceidx == 0:
self.NodeIdType = NodeIdType.TwoByte
else: # TwoByte NodeId does not encode namespace.
self.NodeIdType = NodeIdType.Numeric
Identifier: Union[Int32, String, Guid, ByteString] = 0
NamespaceIndex: Int16 = 0
NodeIdType: _NodeIdType = None
def __post_init__(self):
if self.NodeIdType is None:
if isinstance(self.Identifier, int):
self.NodeIdType = NodeIdType.Numeric
if self.Identifier < 255 and self.NamespaceIndex == 0:
object.__setattr__(self, "NodeIdType", NodeIdType.TwoByte)
elif self.Identifier < 2 ** 16 and self.NamespaceIndex < 255:
object.__setattr__(self, "NodeIdType", NodeIdType.FourByte)
object.__setattr__(self, "NodeIdType", NodeIdType.Numeric)
elif isinstance(self.Identifier, str):
self.NodeIdType = NodeIdType.String
object.__setattr__(self, "NodeIdType", NodeIdType.String)
elif isinstance(self.Identifier, bytes):
self.NodeIdType = NodeIdType.ByteString
object.__setattr__(self, "NodeIdType", NodeIdType.ByteString)
elif isinstance(self.Identifier, uuid.UUID):
self.NodeIdType = NodeIdType.Guid
object.__setattr__(self, "NodeIdType", NodeIdType.Guid)
raise UaError("NodeId: Could not guess type of NodeId, set NodeIdType")
def check_identifier_type_compatibility(self):
Check whether the given identifier can be interpreted as the given node identifier type.
valid_type_combinations = [
(int, [NodeIdType.Numeric, NodeIdType.TwoByte, NodeIdType.FourByte]),
(str, [NodeIdType.String, NodeIdType.ByteString]),
(bytes, [NodeIdType.ByteString, NodeIdType.TwoByte, NodeIdType.FourByte]),
(uuid.UUID, [NodeIdType.Guid])
(uuid.UUID, [NodeIdType.Guid]),
for identifier, valid_node_types in valid_type_combinations:
if isinstance(self.Identifier, identifier) and self.NodeIdType in valid_node_types:
raise UaError(f"NodeId of type {self.NodeIdType} has an incompatible identifier {self.Identifier} of type {type(self.Identifier)}")
raise UaError(
f"NodeId of type {} has an incompatible identifier {self.Identifier} of type {type(self.Identifier)}"
def __eq__(self, node):
return isinstance(node, NodeId) and self.NamespaceIndex == node.NamespaceIndex and self.Identifier == node.Identifier
......@@ -337,6 +373,9 @@ class NodeId(object):
if not isinstance(other, NodeId):
raise AttributeError("Can only compare to NodeId")
return (self.NodeIdType, self.NamespaceIndex, self.Identifier) < (other.NodeIdType, other.NamespaceIndex, other.Identifier)
raise UaError(
f"NodeId of type {} has an incompatible identifier {self.Identifier} of type {type(self.Identifier)}"
def is_null(self):
if self.NamespaceIndex != 0:
......@@ -355,17 +394,17 @@ class NodeId(object):
return NodeId._from_string(string)
except ValueError as ex:
raise UaStringParsingError(f"Error parsing string {string}", ex)
raise UaStringParsingError(f"Error parsing string {string}", ex) from ex
def _from_string(string):
l = string.split(";")
elements = string.split(";")
identifier = None
namespace = 0
ntype = None
srv = None
nsu = None
for el in l:
for el in elements:
if not el:
k, v = el.split("=", 1)
......@@ -386,15 +425,14 @@ class NodeId(object):
ntype = NodeIdType.ByteString
identifier = v
elif k == "srv":
srv = v
srv = int(v)
elif k == "nsu":
nsu = v
if identifier is None:
raise UaStringParsingError(f"Could not find identifier in string: {string}")
nodeid = NodeId(identifier, namespace, ntype)
nodeid.NamespaceUri = nsu
nodeid.ServerIndex = srv
return nodeid
if nsu is not None or srv is not None:
return ExpandedNodeId(identifier, namespace, ntype, NamespaceUri=nsu, ServerIndex=srv)
return NodeId(identifier, namespace, ntype)
string = []
string = []
......@@ -414,72 +452,103 @@ class NodeId(object):
elif self.NodeIdType == NodeIdType.ByteString:
ntype = "b"
if self.ServerIndex:
if self.NamespaceUri:
return ";".join(string)
return self.to_string()
return self.to_string()
def __repr__(self):
return f"{}NodeId({self.to_string()})"
def to_binary(self):
import asyncua
@dataclass(frozen=True, eq=False)
class TwoByteNodeId(NodeId):
def __init__(self, identifier):
NodeId.__init__(self, identifier, 0, NodeIdType.TwoByte)
def __post_init__(self):
object.__setattr__(self, "NodeIdType", NodeIdType.TwoByte)
if not isinstance(self.Identifier, int):
raise ValueError(f"{self.__class__.__name__} Identifier must be int")
if self.Identifier > 255:
raise ValueError(f"{self.__class__.__name__} cannot have identifier > 255")
if self.NamespaceIndex != 0:
raise ValueError(f"{self.__class__.__name__} cannot have NamespaceIndex != 0")
@dataclass(frozen=True, eq=False)
class FourByteNodeId(NodeId):
def __init__(self, identifier, namespace=0):
NodeId.__init__(self, identifier, namespace, NodeIdType.FourByte)
def __post_init__(self):
object.__setattr__(self, "NodeIdType", NodeIdType.FourByte)
if not isinstance(self.Identifier, int):
raise ValueError(f"{self.__class__.__name__} Identifier must be int")
if self.Identifier > 65535:
raise ValueError(f"{self.__class__.__name__} cannot have identifier > 65535")
if self.NamespaceIndex > 255:
raise ValueError(f"{self.__class__.__name__} cannot have NamespaceIndex != 0")
@dataclass(frozen=True, eq=False)
class NumericNodeId(NodeId):
def __init__(self, identifier, namespace=0):
NodeId.__init__(self, identifier, namespace, NodeIdType.Numeric)
def __post_init__(self):
object.__setattr__(self, "NodeIdType", NodeIdType.Numeric)
if not isinstance(self.Identifier, int):
raise ValueError(f"{self.__class__.__name__} Identifier must be int")
@dataclass(frozen=True, eq=False)
class ByteStringNodeId(NodeId):
def __init__(self, identifier, namespace=0):
NodeId.__init__(self, identifier, namespace, NodeIdType.ByteString)
def __post_init__(self):
object.__setattr__(self, "NodeIdType", NodeIdType.ByteString)
if not isinstance(self.Identifier, bytes):
raise ValueError(f"{self.__class__.__name__} Identifier must be bytes")
@dataclass(frozen=True, eq=False)
class GuidNodeId(NodeId):
def __init__(self, identifier, namespace=0):
NodeId.__init__(self, identifier, namespace, NodeIdType.Guid)
def __post_init__(self):
object.__setattr__(self, "NodeIdType", NodeIdType.Guid)
if not isinstance(self.Identifier, uuid.UUID):
raise ValueError(f"{self.__class__.__name__} Identifier must be uuid")
@dataclass(frozen=True, eq=False)
class StringNodeId(NodeId):
def __init__(self, identifier, namespace=0):
NodeId.__init__(self, identifier, namespace, NodeIdType.String)
def __post_init__(self):
object.__setattr__(self, "NodeIdType", NodeIdType.String)
if not isinstance(self.Identifier, str):
raise ValueError(f"{self.__class__.__name__} Identifier must be string")
@dataclass(frozen=True, eq=False)
class ExpandedNodeId(NodeId):
NamespaceUri: Optional[String] = field(default=None, compare=True)
ServerIndex: Int32 = field(default=0, compare=True)
ExpandedNodeId = NodeId
def to_string(self):
string = NodeId.to_string(self)
string = [string]
if self.ServerIndex:
if self.NamespaceUri:
return ";".join(string)
class QualifiedName(FrozenClass):
@dataclass(frozen=FROZEN, order=True)
class QualifiedName:
A string qualified with a namespace index.
ua_types = [
('NamespaceIndex', 'UInt16'),
('Name', 'String'),
NamespaceIndex: UInt16 = 0
Name: String = ""
def __init__(self, name=None, namespaceidx=0):
if not isinstance(namespaceidx, int):
raise UaError(f"namespaceidx must be an int not {namespaceidx} of type {type(namespaceidx)}")
self.NamespaceIndex = namespaceidx
self.Name = name
self._freeze = True
def __init__(self, Name, NamespaceIndex=0):
self.Name = Name
self.NamespaceIndex = NamespaceIndex
if isinstance(self.NamespaceIndex, str) and isinstance(self.Name, int):
# originally the order or argument was inversed, try to support it
logger.warning("QualifiedName are str, int, while int, str is expected, swithcing")
if not isinstance(self.NamespaceIndex, int) or not isinstance(self.Name, str):
raise ValueError(f"QualifiedName constructore args have wrong types, {self}")
def to_string(self):
return f"{self.NamespaceIndex}:{self.Name}"
......@@ -491,81 +560,46 @@ class QualifiedName(FrozenClass):
idx, name = string.split(":", 1)
idx = int(idx)
except (TypeError, ValueError) as ex:
raise UaStringParsingError(f"Error parsing string {string}", ex)
raise UaStringParsingError(f"Error parsing string {string}", ex) from ex
idx = 0
name = string
return QualifiedName(name, idx)
def __eq__(self, bname):
return isinstance(bname, QualifiedName) and self.Name == bname.Name and self.NamespaceIndex == bname.NamespaceIndex
def __ne__(self, other):
return not self.__eq__(other)
return QualifiedName(Name=name, NamespaceIndex=idx)
def __lt__(self, other):
if not isinstance(other, QualifiedName):
raise TypeError(f"Cannot compare QualifiedName and {other}")
if self.NamespaceIndex == other.NamespaceIndex:
return self.Name < other.Name
return self.NamespaceIndex < other.NamespaceIndex
def __str__(self):
return f'QualifiedName({self.NamespaceIndex}:{self.Name})'
__repr__ = __str__
class LocalizedText(FrozenClass):
@dataclass(frozen=FROZEN, init=False)
class LocalizedText:
A string qualified with a namespace index.
Encoding: Byte = field(default=0, repr=False, init=False)
Locale: Optional[String] = None
Text: Optional[String] = None
ua_switches = {
'Locale': ('Encoding', 0),
'Text': ('Encoding', 1),
"Locale": ("Encoding", 0),
"Text": ("Encoding", 1),
ua_types = (
('Encoding', 'Byte'),
('Locale', 'String'),
('Text', 'String'),
def __init__(self, text=None, locale=None):
self.Encoding = 0
self._text = None
self._locale = None
if text:
self.Text = text
if locale:
self.Locale = locale
self._freeze = True
def Text(self):
return self._text
def Locale(self):
return self._locale
def Text(self, text):
if not isinstance(text, str):
raise ValueError(f"A LocalizedText object takes a string as argument \"text\", not a {type(text)}, {text}")
self._text = text
if self._text:
self.Encoding |= (1 << 1)
def Locale(self, locale):
if not isinstance(locale, str):
raise ValueError(f"A LocalizedText object takes a string as argument \"locale\"," f" not a {type(locale)}, {locale}")
self._locale = locale
if self._locale:
self.Encoding |= (1)
def __init__(self, Text=None, Locale=None):
# need to write init method since args ar inverted in original implementataion
self.Text = Text
self.Locale = Locale
if self.Text is not None:
if not isinstance(self.Text, str):
raise ValueError(
f'A LocalizedText object takes a string as argument "text"'
f"not a {type(self.Text)}, {self.Text}"
if self.Locale is not None:
if not isinstance(self.Locale, str):
raise ValueError(
f'A LocalizedText object takes a string as argument "locale",'
f" not a {type(self.Locale)}, {self.Locale}"
def to_string(self):
if self.Text is None:
......@@ -580,25 +614,12 @@ class LocalizedText(FrozenClass):
if m:
text = if != str(None) else None
locale = if != str(None) else None
return LocalizedText(text=text, locale=locale)
return LocalizedText(string)
def __str__(self):
return f'LocalizedText(Encoding:{self.Encoding}, Locale:{self.Locale}, Text:{self.Text})'
__repr__ = __str__
def __eq__(self, other):
if isinstance(other, LocalizedText) and self.Locale == other.Locale and self.Text == other.Text:
return True
return False
def __ne__(self, other):
return not self.__eq__(other)
return LocalizedText(Text=text, Locale=locale)
return LocalizedText(string)
class ExtensionObject(FrozenClass):
class ExtensionObject:
Any UA object packed as an ExtensionObject
......@@ -607,33 +628,18 @@ class ExtensionObject(FrozenClass):
:ivar Body:
:vartype Body: bytes
ua_switches = {
'Body': ('Encoding', 0),
"Body": ("Encoding", 0),
ua_types = (
("TypeId", "NodeId"),
("Encoding", "Byte"),
("Body", "ByteString"),
def __init__(self):
self.TypeId = NodeId()
self.Encoding = 0
self.Body = None
self._freeze = True
TypeId: NodeId = NodeId()
Encoding: Byte = field(default=0, repr=False, init=False)
Body: Optional[ByteString] = b""
def __bool__(self):
return self.Body is not None
__nonzero__ = __bool__ # Python2 compatibilty
def __str__(self):
size = len(self.Body) if self.Body is not None else None
return f'ExtensionObject(TypeId:{self.TypeId}, Encoding:{self.Encoding}, {size} bytes)'
__repr__ = __str__
class VariantType(Enum):
......@@ -695,18 +701,21 @@ class VariantType(Enum):
DiagnosticInfo = 25
class VariantTypeCustom(object):
class VariantTypeCustom:
Looks like sometime we get variant with other values than those
defined in VariantType.
FIXME: We should not need this class, as far as I iunderstand the spec
variants can only be of VariantType
def __init__(self, val): = "Custom"
self.value = val
if self.value > 0b00111111:
raise UaError(f"Cannot create VariantType. VariantType must be {0b111111} > x > {25}, received {val}")
raise UaError(
f"Cannot create VariantType. VariantType must be {0b111111} > x > {25}, received {val}"
def __str__(self):
return f"VariantType.Custom:{self.value}"
......@@ -717,7 +726,8 @@ class VariantTypeCustom(object):
return self.value == other.value
class Variant(FrozenClass):
class Variant:
Create an OPC-UA Variant object.
if no argument a Null Variant is created.
......@@ -733,34 +743,53 @@ class Variant(FrozenClass):
:ivar is_array:
:vartype is_array: If the variant is an array. Usually guessed from value.
def __init__(self, value=None, varianttype=None, dimensions=None, is_array=None):
self.Value = value
self.VariantType = varianttype
self.Dimensions = dimensions
self.is_array = is_array
# FIXME: typing is wrong here
Value: Any = None
VariantType: VariantType = None
Dimensions: Optional[Int32] = None
is_array: bool = False
def __post_init__(self):
if self.is_array is None:
if isinstance(value, (list, tuple)):
if isinstance(self.Value, (list, tuple)):
self.is_array = True
self.is_array = False
self._freeze = True
if isinstance(value, Variant):
self.Value = value.Value
self.VariantType = value.VariantType
if isinstance(self.Value, Variant):
self.Value = self.Value.Value
self.VariantType = self.Value.VariantType
if self.VariantType is None:
self.VariantType = self._guess_type(self.Value)
if self.Value is None and not self.is_array and self.VariantType not in (VariantType.Null, VariantType.String, VariantType.DateTime, VariantType.ExtensionObject):
if self.Value == None and self.VariantType == VariantType.NodeId:
self.Value = NodeId(0,0)
if (
self.Value is None
and not self.is_array
and self.VariantType
not in (
if self.Value is None and self.VariantType == VariantType.NodeId:
self.Value = NodeId(0, 0)
raise UaError(f"Non array Variant of type {self.VariantType} cannot have value None")
raise UaError(
f"Non array Variant of type {self.VariantType} cannot have value None"
if self.Dimensions is None and isinstance(self.Value, (list, tuple)):
dims = get_shape(self.Value)
if len(dims) > 1:
self.Dimensions = dims
def __eq__(self, other):
if isinstance(other, Variant) and self.VariantType == other.VariantType and self.Value == other.Value:
if (
isinstance(other, Variant)
and self.VariantType == other.VariantType
and self.Value == other.Value
return True
return False
......@@ -800,17 +829,14 @@ class Variant(FrozenClass):
except AttributeError:
return VariantType.ExtensionObject
raise UaError(f"Could not guess UA type of {val} with type {type(val)}, specify UA type")
def __str__(self):
return f"Variant(val:{self.Value!s},type:{self.VariantType})"
__repr__ = __str__
raise UaError(
f"Could not guess UA type of {val} with type {type(val)}, specify UA type"
def _split_list(l, n):
n = max(1, n)
return [l[i:i + n] for i in range(0, len(l), n)]
return [l[i : i + n] for i in range(0, len(l), n)]
def flatten_and_get_shape(mylist):
......@@ -845,7 +871,8 @@ def get_shape(mylist):
return dims
class DataValue(FrozenClass):
class DataValue:
A value with an associated timestamp, and quality.
Automatically generated from xml , copied and modified here to fix errors in xml spec
......@@ -865,54 +892,27 @@ class DataValue(FrozenClass):
ua_switches = {
'Value': ('Encoding', 0),
'StatusCode': ('Encoding', 1),
'SourceTimestamp': ('Encoding', 2),
'ServerTimestamp': ('Encoding', 3),
'SourcePicoseconds': ('Encoding', 4),
'ServerPicoseconds': ('Encoding', 5),
"Value": ("Encoding", 0),
"StatusCode": ("Encoding", 1),
"SourceTimestamp": ("Encoding", 2),
"ServerTimestamp": ("Encoding", 3),
"SourcePicoseconds": ("Encoding", 4),
"ServerPicoseconds": ("Encoding", 5),
ua_types = (
('Encoding', 'Byte'),
('Value', 'Variant'),
('StatusCode', 'StatusCode'),
('SourceTimestamp', 'DateTime'),
('SourcePicoseconds', 'UInt16'),
('ServerTimestamp', 'DateTime'),
('ServerPicoseconds', 'UInt16'),
def __init__(self, variant=None, status=None, sourceTimestamp=None, sourcePicoseconds=None, serverTimestamp=None, serverPicoseconds=None):
self.Encoding = 0
if not isinstance(variant, Variant):
variant = Variant(variant)
self.Value = variant
if status is None:
self.StatusCode = StatusCode()
self.StatusCode = status
self.SourceTimestamp = sourceTimestamp
self.SourcePicoseconds = sourcePicoseconds
self.ServerTimestamp = serverTimestamp
self.ServerPicoseconds = serverPicoseconds
self._freeze = True
Encoding: Byte = field(default=0, repr=False, init=False)
Value: Optional[Variant] = None
StatusCode: Optional[StatusCode] = None # field(default_factory=StatusCode)
SourceTimestamp: Optional[DateTime] = None
SourcePicoseconds: Optional[UInt16] = None
ServerTimestamp: Optional[DateTime] = None
ServerPicoseconds: Optional[UInt16] = None
def __str__(self):
s = []
if self.StatusCode is not None:
s.append(f', StatusCode:{self.StatusCode}')
if self.SourceTimestamp is not None:
s.append(f', SourceTimestamp:{self.SourceTimestamp}')
if self.ServerTimestamp is not None:
s.append(f', ServerTimestamp:{self.ServerTimestamp}')
if self.SourcePicoseconds is not None:
s.append(f', SourcePicoseconds:{self.SourcePicoseconds}')
if self.ServerPicoseconds is not None:
s.append(f', ServerPicoseconds:{self.ServerPicoseconds}')
return f'DataValue(Value:{self.Value}{"".join(s)})'
__repr__ = __str__
def __post_init__(
if not isinstance(self.Value, Variant):
self.Value = Variant(self.Value)
def datatype_to_varianttype(int_type):
......@@ -988,13 +988,14 @@ def register_enum(name, nodeid, class_type):
enums_by_datatype[nodeid] = class_type
enums_datatypes[class_type] = nodeid
setattr(, name, class_type)
# These dictionnaries are used to register extensions classes for automatic
# decoding and encoding
extension_objects_by_datatype = {} #Dict[Datatype, type]
extension_objects_by_typeid = {} #Dict[EncodingId, type]
extension_objects_by_datatype = {} # Dict[Datatype, type]
extension_objects_by_typeid = {} # Dict[EncodingId, type]
extension_object_typeids = {}
datatype_by_extension_object = {}
......@@ -1003,7 +1004,13 @@ def register_extension_object(name, encoding_nodeid, class_type, datatype_nodeid
Register a new extension object for automatic decoding and make them available in ua module
""""registring new extension object: %s %s %s %s", name, encoding_nodeid, class_type, datatype_nodeid)
"registring new extension object: %s %s %s %s",
if datatype_nodeid:
extension_objects_by_datatype[datatype_nodeid] = class_type
datatype_by_extension_object[class_type] = datatype_nodeid
......@@ -1012,6 +1019,7 @@ def register_extension_object(name, encoding_nodeid, class_type, datatype_nodeid
# FIXME: Next line is not exactly a Python best practices, so feel free to propose something else
# add new extensions objects to ua modules to automate decoding
setattr(, name, class_type)
......@@ -28,13 +28,13 @@ EXAMPLE_BSD_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "exam
def test_variant_array_none():
v = ua.Variant(None, varianttype=ua.VariantType.Int32, is_array=True)
v = ua.Variant(None, VariantType=ua.VariantType.Int32, is_array=True)
data = variant_to_binary(v)
v2 = variant_from_binary(ua.utils.Buffer(data))
assert v == v2
assert v2.is_array
v = ua.Variant(None, varianttype=ua.VariantType.Null, is_array=True)
v = ua.Variant(None, VariantType=ua.VariantType.Null, is_array=True)
data = variant_to_binary(v)
v2 = variant_from_binary(ua.utils.Buffer(data))
assert v == v2
......@@ -42,7 +42,7 @@ def test_variant_array_none():
def test_variant_empty_list():
v = ua.Variant([], varianttype=ua.VariantType.Int32, is_array=True)
v = ua.Variant([], VariantType=ua.VariantType.Int32, is_array=True)
data = variant_to_binary(v)
v2 = variant_from_binary(ua.utils.Buffer(data))
assert v == v2
......@@ -135,14 +135,14 @@ def test_custom_structs_array(tmpdir):
def test_nodeid_nsu():
n = ua.NodeId(100, 2)
n.NamespaceUri = "http://freeopcua/tests"
n.ServerIndex = 4
data = nodeid_to_binary(n)
n1 = ua.ExpandedNodeId(100, 2, NamespaceUri="http://freeopcua/tests", ServerIndex=4)
data = nodeid_to_binary(n1)
n2 = nodeid_from_binary(ua.utils.Buffer(data))
assert n == n2
n3 = ua.NodeId.from_string(n.to_string())
assert n == n3
assert n1 == n2
string = n1.to_string()
n3 = ua.NodeId.from_string(string)
assert n1 == n3
def test_nodeid_ordering():
......@@ -153,7 +153,7 @@ def test_nodeid_ordering():
e = ua.NodeId("aaaaa", 1)
f = ua.NodeId("aaaaa", 2)
g = ua.NodeId(uuid.uuid4(), 1)
h = ua.TwoByteNodeId(2001)
h = ua.TwoByteNodeId(201)
i = ua.NodeId(b"lkjkl", 1, ua.NodeIdType.ByteString)
j = ua.NodeId(b"aaa", 5, ua.NodeIdType.ByteString)
......@@ -333,7 +333,7 @@ def test_guid():
def test_nodeid_guid_string():
n = ua.GuidNodeId(identifier=uuid.uuid4())
n = ua.GuidNodeId(Identifier=uuid.uuid4())
s = n.to_string()
n2 = ua.NodeId.from_string(s)
s2 = n2.to_string()
......@@ -361,9 +361,8 @@ def test__nodeid():
s1 = ua.StringNodeId("53", 0)
bs = ua.ByteStringNodeId(b"53", 0)
gid = uuid.uuid4()
g = ua.ByteStringNodeId(str(gid), 0)
g = ua.ByteStringNodeId(gid.bytes, 0)
guid = ua.GuidNodeId(gid)
assert tb == fb
assert tb == n
assert tb == n1
assert n1 == fb
......@@ -416,18 +415,18 @@ def test_expandednodeid():
def test_null_guid():
with pytest.raises(ua.UaError):
n = ua.NodeId(b'000000', 0, nodeidtype=ua.NodeIdType.Guid)
n = ua.NodeId(uuid.UUID('00000000-0000-0000-0000-000000000000'), 0, nodeidtype=ua.NodeIdType.Guid)
n = ua.NodeId(b'000000', 0, NodeIdType=ua.NodeIdType.Guid)
n = ua.NodeId(uuid.UUID('00000000-0000-0000-0000-000000000000'), 0, NodeIdType=ua.NodeIdType.Guid)
assert n.is_null()
assert n.has_null_identifier()
with pytest.raises(ua.UaError):
n = ua.NodeId(b'000000', 1, nodeidtype=ua.NodeIdType.Guid)
n = ua.NodeId(uuid.UUID('00000000-0000-0000-0000-000000000000'), 1, nodeidtype=ua.NodeIdType.Guid)
n = ua.NodeId(b'000000', 1, NodeIdType=ua.NodeIdType.Guid)
n = ua.NodeId(uuid.UUID('00000000-0000-0000-0000-000000000000'), 1, NodeIdType=ua.NodeIdType.Guid)
assert not n.is_null()
assert n.has_null_identifier()
n = ua.NodeId(uuid.UUID('00000000-0000-0000-0000-000001000000'), 1, nodeidtype=ua.NodeIdType.Guid)
n = ua.NodeId(uuid.UUID('00000000-0000-0000-0000-000001000000'), 1, NodeIdType=ua.NodeIdType.Guid)
assert not n.is_null()
assert not n.has_null_identifier()
......@@ -520,7 +519,7 @@ def test_unicode_string_nodeid():
def test_numeric_nodeid():
nid = ua.NodeId(999, 2)
nid = ua.NumericNodeId(999, 2)
assert nid.NamespaceIndex == 2
assert nid.Identifier == 999
assert nid.NodeIdType == ua.NodeIdType.Numeric
......@@ -588,7 +587,7 @@ def test_variant_array():
def test_variant_array_dim():
v = ua.Variant([1, 2, 3, 4, 5, 6], dimensions=[2, 3])
v = ua.Variant([1, 2, 3, 4, 5, 6], Dimensions=[2, 3])
assert v.Value[1] == 2
v2 = variant_from_binary(ua.utils.Buffer(variant_to_binary(v)))
assert _reshape(v.Value, (2, 3)) == v2.Value
......@@ -607,13 +606,23 @@ def test_text():
assert t1 == t4
def test_text_simple():
t = ua.LocalizedText('Root')
b = struct_to_binary(t)
print("BIN", b)
buf = ua.utils.Buffer(b)
print("BUF", buf)
t2 = struct_from_binary(ua.LocalizedText, buf)
assert t == t2
def test_text_with_locale():
t0 = ua.LocalizedText('Root')
t1 = ua.LocalizedText('Root', 'de-AT')
t2 = ua.LocalizedText('Root', 'de-AT')
t3 = ua.LocalizedText('Root', 'de-DE')
t4 = ua.LocalizedText(locale='de-DE')
t5 = ua.LocalizedText(locale='de-DE')
t4 = ua.LocalizedText(Locale='de-DE')
t5 = ua.LocalizedText(Locale='de-DE')
assert t0 != t1
assert t1 == t2
assert t1 != t3
