Commit 1068876e authored by oroulet's avatar oroulet Committed by oroulet

remove ua_switches and small cleanup

parent 46ab8597
......@@ -190,19 +190,6 @@ class {struct_name}(ua.FrozenClass):
uatype = f"Optional[{uatype}]"
code += f" {fname}: {uatype} = {default_value}\n"
counter = 0
# FIXME: to support inheritance we probably need to add all fields from parents
# this requires network call etc...
if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
code += ' ua_switches = {\n'
for sfield in sdef.Fields:
fname = clean_name(sfield.Name)
if sfield.IsOptional:
code += f" '{fname}': ('Encoding', {counter}),\n"
counter += 1
code += " }\n\n"
return code
......
......@@ -441,9 +441,6 @@ class XmlExporter:
# FIXME; what happend if we have a custom type which is not part of ObjectIds???
type_name = type_string_from_type(field.type)
await self.member_to_etree(struct_el, field.name, ua.NodeId(getattr(ua.ObjectIds, type_name)), getattr(val, field.name))
# self.member_to_etree(struct_el, name, extension_object_typeids[vtype], getattr(val, name))
# for name in self._get_member_order(dtype, val):
# self.member_to_etree(struct_el, name, ua.NodeId(getattr(ua.ObjectIds, val.ua_types[name])), getattr(val, name))
def indent(elem, level=0):
......
......@@ -226,15 +226,16 @@ def unpack_uatype_array(vtype, data):
def struct_to_binary(obj):
packet = []
has_switch = hasattr(obj, 'ua_switches')
if has_switch:
container_val = 0
for name, switch in obj.ua_switches.items():
member = getattr(obj, name)
container_name, idx = switch
if member is not None:
container_val = container_val | 1 << idx
setattr(obj, container_name, container_val)
enc_count = 0
enc = 0
for field in fields(obj):
if type_is_union(field.type):
if getattr(obj, field.name) is not None:
enc = enc | 1 << enc_count
enc_count += 1
if hasattr(obj, "Encoding"):
setattr(obj, "Encoding", enc)
for field in fields(obj):
uatype = field.type
if field.name == "Encoding":
......@@ -246,7 +247,7 @@ def struct_to_binary(obj):
if type_is_list(uatype):
packet.append(list_to_binary(type_from_list(uatype), val))
else:
if has_switch and val is None and field.name in obj.ua_switches:
if val is None and type_is_union(field.type):
pass
else:
packet.append(to_binary(uatype, val))
......@@ -460,7 +461,7 @@ def extensionobject_to_binary(obj):
def from_binary(uatype, data):
"""
unpack data given an uatype as a string or a python class having a ua_types member
unpack data given an uatype as a string or a python dataclass using ua types
"""
if type_is_union(uatype):
uatype = type_from_union(uatype)
......@@ -488,12 +489,13 @@ def struct_from_binary(objtype, data):
if issubclass(objtype, Enum):
return objtype(Primitives.UInt32.unpack(data))
obj = objtype()
enc_count = -1
for field in fields(obj):
# if our member has a switch and it is not set we skip it
if hasattr(obj, "ua_switches") and field.name in obj.ua_switches:
container_name, idx = obj.ua_switches[field.name]
val = getattr(obj, container_name)
if not test_bit(val, idx):
if type_is_union(field.type):
enc_count += 1
val = getattr(obj, "Encoding")
if not test_bit(val, enc_count):
continue
val = from_binary(field.type, data)
setattr(obj, field.name, val)
......
"""
Autogenerate code from xml spec
Date:2021-02-24 19:45:05.562631
Date:2021-02-27 19:37:13.366414
"""
from datetime import datetime
......@@ -1176,7 +1176,7 @@ class DiagnosticInfo:
data_type = NodeId(ObjectIds.DiagnosticInfo)
Encoding: Byte = field(default=0, repr=False, init=False)
Encoding: Byte = field(default=0, repr=False, init=False, compare=False)
SymbolicId: Optional[Int32] = None
NamespaceURI: Optional[Int32] = None
Locale: Optional[Int32] = None
......@@ -1185,16 +1185,6 @@ class DiagnosticInfo:
InnerStatusCode: Optional[StatusCode] = None
InnerDiagnosticInfo: Optional[ExtensionObject] = None
ua_switches = {
'SymbolicId': ('Encoding', 0),
'NamespaceURI': ('Encoding', 1),
'Locale': ('Encoding', 3),
'LocalizedText': ('Encoding', 2),
'AdditionalInfo': ('Encoding', 4),
'InnerStatusCode': ('Encoding', 5),
'InnerDiagnosticInfo': ('Encoding', 6),
}
@dataclass(frozen=FROZEN)
class KeyValuePair:
......@@ -8138,7 +8128,7 @@ class SessionSecurityDiagnosticsDataType:
ClientUserIdOfSession: String = None
ClientUserIdHistory: List[String] = field(default_factory=list)
AuthenticationMechanism: String = None
Encoding: Byte = field(default=0, repr=False, init=False)
Encoding: Byte = field(default=0, repr=False, init=False, compare=False)
TransportProtocol: String = None
SecurityMode: MessageSecurityMode = MessageSecurityMode.Invalid
SecurityPolicyUri: String = None
......
......@@ -61,25 +61,12 @@ class Header(uatypes.FrozenClass):
@dataclass
class ErrorMessage(uatypes.FrozenClass):
ua_types = (
('Error', 'StatusCode'),
('Reason', 'String'),
)
Error: uatypes.StatusCode = uatypes.StatusCode()
Reason: uatypes.String = ""
@dataclass
class Acknowledge(uatypes.FrozenClass):
ua_types = [
('ProtocolVersion', 'UInt32'),
('ReceiveBufferSize', 'UInt32'),
('SendBufferSize', 'UInt32'),
('MaxMessageSize', 'UInt32'),
('MaxChunkCount', 'UInt32'),
]
ProtocolVersion: uatypes.UInt32 = 0
ReceiveBufferSize: uatypes.UInt32 = 65536
SendBufferSize: uatypes.UInt32 = 65536
......
......@@ -644,11 +644,6 @@ class LocalizedText:
Locale: Optional[String] = None
Text: Optional[String] = None
ua_switches = {
"Locale": ("Encoding", 0),
"Text": ("Encoding", 1),
}
def __init__(self, Text=None, Locale=None):
# need to write init method since args ar inverted in original implementataion
self.Text = Text
......@@ -692,10 +687,6 @@ class ExtensionObject:
:vartype Body: bytes
"""
ua_switches = {
"Body": ("Encoding", 0),
}
TypeId: NodeId = NodeId()
Encoding: Byte = field(default=0, repr=False, init=False, compare=False)
Body: Optional[ByteString] = None
......@@ -947,15 +938,6 @@ class DataValue:
:vartype ServerPicoseconds: int
"""
ua_switches = {
"Value": ("Encoding", 0),
"StatusCode": ("Encoding", 1),
"SourceTimestamp": ("Encoding", 2),
"ServerTimestamp": ("Encoding", 3),
"SourcePicoseconds": ("Encoding", 4),
"ServerPicoseconds": ("Encoding", 5),
}
Encoding: Byte = field(default=0, repr=False, init=False, compare=False)
Value: Optional[Variant] = None
StatusCode_: Optional[StatusCode] = field(default_factory=StatusCode)
......
......@@ -7,10 +7,13 @@ import asyncio
import sys
import datetime
import logging
from dataclasses import fields
# sys.path.insert(0, "..") # load local freeopcua implementation
from asyncua.common import xmlparser
from pathlib import Path
from asyncua.common import xmlparser
from asyncua.ua.uatypes import type_string_from_type
BASE_DIR = Path.cwd().parent
......@@ -25,9 +28,9 @@ def _to_val(objs, attr, val):
def _get_uatype_name(cls, attname):
for name, uat in cls.ua_types:
if name == attname:
return uat
for field in fields(cls):
if field.name == attname:
return type_string_from_type(field.type)
raise Exception(f"Could not find attribute {attname} in obj {cls}")
......
......@@ -168,20 +168,6 @@ class CodeGenerator:
else:
self.write(f"{fieldname}: {typestring} = {'field(default_factory=list)' if field.length else self.get_default_value(field)}")
switch_written = False
for field in obj.fields:
if field.switchfield is not None:
if not switch_written:
self.write("")
self.write('ua_switches = {')
switch_written = True
bit = obj.bits[field.switchfield]
self.write(f" '{field.name}': ('{bit.container}', {bit.idx}),")
# if field.switchvalue is not None: Not sure we need to handle that one
if switch_written:
self.write("}")
if hack_names:
self.write("")
for name in hack_names:
......
......@@ -10,6 +10,8 @@ import uuid
import pytest
import logging
from datetime import datetime
from dataclasses import dataclass, field
from typing import Optional, List
from asyncua import ua
from asyncua.ua.ua_binary import extensionobject_from_binary
......@@ -773,3 +775,24 @@ def test_expandedNodeId():
assert isinstance(nid, ua.ExpandedNodeId)
assert nid.ServerIndex == 0
assert nid.Identifier == 85
def test_struct_104():
@dataclass
class MyStruct:
Encoding: ua.Byte = field(default=0, repr=False, init=False)
a: ua.Int32 = 1
b: Optional[ua.Int32] = None
c: Optional[ua.String] = None
l: List[ua.String] = None
m = MyStruct()
data = struct_to_binary(m)
m2 = struct_from_binary(MyStruct, ua.utils.Buffer(data))
assert m == m2
m = MyStruct(a=4, b=5, c="lkjkæl", l=["a", "b", "c"])
data = struct_to_binary(m)
m2 = struct_from_binary(MyStruct, ua.utils.Buffer(data))
assert m == m2
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment