Commit 307e321f authored by oroulet's avatar oroulet Committed by oroulet

port old structures, donot compare Encoding

parent 9b946e22
......@@ -9,6 +9,9 @@ import logging
# The next two imports are for generated code
from datetime import datetime
from enum import IntEnum, EnumMeta
from dataclasses import dataclass
from xml.etree import ElementTree as ET
from asyncua import ua
......@@ -70,6 +73,7 @@ class Struct:
def get_code(self):
code = f"""
@dataclass
class {self.name}:
'''
......@@ -77,27 +81,15 @@ class {self.name}:
'''
"""
code += ' ua_types = [\n'
for field in self.fields:
prefix = 'ListOf' if field.array else ''
uatype = prefix + field.uatype
if uatype == 'ListOfChar':
uatype = f"ua.{field.uatype}"
if field.array:
uatype = f"List[{uatype}]"
else:
uatype = uatype
if uatype == 'List[ua.Char]':
uatype = 'String'
code += f" ('{field.name}', '{uatype}'),\n"
code += " ]"
code += """
def __str__(self):
vals = [name + ": " + str(val) for name, val in self.__dict__.items()]
return self.__class__.__name__ + "(" + ", ".join(vals) + ")"
__repr__ = __str__
def __init__(self):
"""
if not self.fields:
code += " pass"
for field in self.fields:
code += f" self.{field.name} = {field.value}\n"
code += f" {field.name}:{uatype} = {field.value}\n"
return code
......@@ -274,6 +266,8 @@ def _generate_python_class(model, env=None):
env['uuid'] = uuid
if "enum" not in env:
env['IntEnum'] = IntEnum
if "dataclass" not in env:
env['dataclass'] = dataclass
# generate classes one by one and add them to dict
for element in model:
code = element.get_code()
......
......@@ -4,7 +4,8 @@ from datetime import datetime
import uuid
import logging
import re
from typing import Union, List, TYPE_CHECKING, Tuple
from typing import Union, List, TYPE_CHECKING, Tuple, Optional
from dataclasses import dataclass, field
from asyncua import ua
from asyncua import Node
......@@ -29,9 +30,9 @@ def new_struct_field(
field.Name = name
field.IsOptional = optional
if description:
field.Description = ua.LocalizedText(text=description)
field.Description = ua.LocalizedText(Text=description)
else:
field.Description = ua.LocalizedText(text=name)
field.Description = ua.LocalizedText(Text=name)
if isinstance(dtype, ua.VariantType):
field.DataType = ua.NodeId(dtype.value, 0)
elif isinstance(dtype, ua.NodeId):
......@@ -91,7 +92,7 @@ async def new_enum(
counter = 0
for val_name in values:
field = ua.EnumField()
field.DisplayName = ua.LocalizedText(text=val_name)
field.DisplayName = ua.LocalizedText(Text=val_name)
field.Name = val_name
field.Value = counter
counter += 1
......@@ -148,6 +149,7 @@ def make_structure_code(data_type, struct_name, sdef):
raise NotImplementedError(f"Only StructureType implemented, not {ua.StructureType(sdef.StructureType).name} for node {struct_name} with DataTypdeDefinition {sdef}")
code = f"""
@dataclass
class {struct_name}(ua.FrozenClass):
'''
......@@ -157,64 +159,51 @@ class {struct_name}(ua.FrozenClass):
data_type = ua.NodeId.from_string("{data_type.to_string()}")
"""
counter = 0
# FIXME: to support inheritance we probably need to add all fields from parents
# this requires network call etc...
if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
code += ' ua_switches = {\n'
for field in sdef.Fields:
fname = clean_name(field.Name)
if field.IsOptional:
code += f" '{fname}': ('Encoding', {counter}),\n"
counter += 1
code += " }\n\n"
code += ' ua_types = [\n'
if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
code += " ('Encoding', 'Byte'),\n"
uatypes = []
for field in sdef.Fields:
fname = clean_name(field.Name)
prefix = ""
if field.ValueRank >= 1 or field.ArrayDimensions:
prefix = 'ListOf'
if field.DataType.NamespaceIndex == 0 and field.DataType.Identifier in ua.ObjectIdNames:
uatype = ua.ObjectIdNames[field.DataType.Identifier]
elif field.DataType in ua.extension_objects_by_datatype:
uatype = ua.extension_objects_by_datatype[field.DataType].__name__
elif field.DataType in ua.enums_by_datatype:
uatype = ua.enums_by_datatype[field.DataType].__name__
code += " Encoding: ua.Byte = field(default=0, repr=False, init=False, compare=False)\n"
for sfield in sdef.Fields:
fname = clean_name(sfield.Name)
if sfield.DataType.NamespaceIndex == 0 and sfield.DataType.Identifier in ua.ObjectIdNames:
uatype = ua.ObjectIdNames[sfield.DataType.Identifier]
elif sfield.DataType in ua.extension_objects_by_datatype:
uatype = ua.extension_objects_by_datatype[sfield.DataType].__name__
elif sfield.DataType in ua.enums_by_datatype:
uatype = ua.enums_by_datatype[sfield.DataType].__name__
else:
# FIXME: we are probably missing many custom tyes here based on builtin types
# maybe we can use ua_utils.get_base_data_type()
raise RuntimeError(f"Unknown datatype for field: {field} in structure:{struct_name}, please report")
if field.ValueRank >= 1 and uatype == 'Char':
uatype = 'String'
uatypes.append((field, uatype))
code += f" ('{fname}', '{prefix + uatype}'),\n"
code += " ]\n"
code += f"""
def __str__(self):
vals = [f"{{field_name}}:{{val}}" for field_name, val in self.__dict__.items()]
return f"{struct_name}({{','.join(vals)}})"
raise RuntimeError(f"Unknown datatype for field: {sfield} in structure:{struct_name}, please report")
__repr__ = __str__
def __init__(self):
"""
if not sdef.Fields:
code += " pass"
if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
code += " self.Encoding = 0\n"
for field, uatype in uatypes:
fname = clean_name(field.Name)
if field.ValueRank >= 1:
if sfield.ValueRank >= 1:
default_value = "[]"
else:
default_value = get_default_value(uatype)
code += f" self.{fname} = {default_value}\n"
code += " self._freeze = True\n"
uatype = f"ua.{uatype}"
if sfield.ValueRank >= 1 and uatype == 'Char':
uatype = 'String'
elif sfield.ValueRank >= 1 or sfield.ArrayDimensions:
uatype = f"List[{uatype}]"
elif sfield.IsOptional:
uatype = f"Optional[{uatype}]"
code += f" {fname}: {uatype} = {default_value}\n"
counter = 0
# FIXME: to support inheritance we probably need to add all fields from parents
# this requires network call etc...
if sdef.StructureType == ua.StructureType.StructureWithOptionalFields:
code += ' ua_switches = {\n'
for sfield in sdef.Fields:
fname = clean_name(sfield.Name)
if sfield.IsOptional:
code += f" '{fname}': ('Encoding', {counter}),\n"
counter += 1
code += " }\n\n"
print("CODE", code)
return code
......@@ -236,6 +225,14 @@ async def _generate_object(name, sdef, data_type=None, env=None, enum=False):
env['uuid'] = uuid
if "enum" not in env:
env['IntEnum'] = IntEnum
if "dataclass" not in env:
env['dataclass'] = dataclass
if "Optional" not in env:
env['Optional'] = Optional
if "List" not in env:
env['List'] = List
if "field" not in env:
env['field'] = field
# generate classe add it to env dict
if enum:
code = make_enum_code(name, sdef)
......
This diff is collapsed.
......@@ -3,7 +3,7 @@ implement ua datatypes
"""
import logging
from typing import Optional, Any, Union
from typing import Optional, Any, Union, get_origin, get_args
from enum import Enum, IntEnum
from calendar import timegm
import uuid
......@@ -22,6 +22,38 @@ HUNDREDS_OF_NANOSECONDS = 10000000
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
def type_is_union(uatype):
return get_origin(uatype) == Union
def type_is_list(uatype):
return get_origin(uatype) == list
def type_from_union(uatype, origin=None):
if origin is None:
origin = get_origin(uatype)
if origin != Union:
raise ValueError(f"{uatype} is not an Union")
# need to find out what real type is
for subtype in get_args(uatype):
if subtype is not None.__class__: # FIXME: strange comparison...
return subtype
raise ValueError(f"Union {uatype} does not seem to contain a valid type")
def type_from_list(uatype):
return get_args(uatype)[0]
def type_string_from_type(uatype):
if type_is_union(uatype):
uatype = type_from_union(uatype)
elif type_is_list(uatype):
uatype = type_from_list(uatype)
return uatype.__name__
class SByte(bytes):
pass
......@@ -584,7 +616,7 @@ class LocalizedText:
A string qualified with a namespace index.
"""
Encoding: Byte = field(default=0, repr=False, init=False)
Encoding: Byte = field(default=0, repr=False, init=False, compare=False)
Locale: Optional[String] = None
Text: Optional[String] = None
......@@ -641,7 +673,7 @@ class ExtensionObject:
}
TypeId: NodeId = NodeId()
Encoding: Byte = field(default=0, repr=False, init=False)
Encoding: Byte = field(default=0, repr=False, init=False, compare=False)
Body: Optional[ByteString] = None
def __bool__(self):
......@@ -813,32 +845,30 @@ class Variant:
val = val[0]
if val is None:
return VariantType.Null
elif isinstance(val, bool):
if isinstance(val, bool):
return VariantType.Boolean
elif isinstance(val, float):
if isinstance(val, float):
return VariantType.Double
elif isinstance(val, IntEnum):
if isinstance(val, IntEnum):
return VariantType.Int32
elif isinstance(val, int):
if isinstance(val, int):
return VariantType.Int64
elif isinstance(val, str):
if isinstance(val, str):
return VariantType.String
elif isinstance(val, bytes):
if isinstance(val, bytes):
return VariantType.ByteString
elif isinstance(val, datetime):
if isinstance(val, datetime):
return VariantType.DateTime
elif isinstance(val, uuid.UUID):
if isinstance(val, uuid.UUID):
return VariantType.Guid
else:
if isinstance(val, object):
try:
return getattr(VariantType, val.__class__.__name__)
except AttributeError:
return VariantType.ExtensionObject
else:
raise UaError(
f"Could not guess UA type of {val} with type {type(val)}, specify UA type"
)
if isinstance(val, object):
try:
return getattr(VariantType, val.__class__.__name__)
except AttributeError:
return VariantType.ExtensionObject
raise UaError(
f"Could not guess UA type of {val} with type {type(val)}, specify UA type"
)
def _split_list(l, n):
......@@ -907,7 +937,7 @@ class DataValue:
"ServerPicoseconds": ("Encoding", 5),
}
Encoding: Byte = field(default=0, repr=False, init=False)
Encoding: Byte = field(default=0, repr=False, init=False, compare=False)
Value: Optional[Variant] = None
StatusCode_: Optional[StatusCode] = field(default_factory=StatusCode)
SourceTimestamp: Optional[DateTime] = None
......@@ -1044,8 +1074,7 @@ def get_extensionobject_class_type(typeid):
"""
if typeid in extension_objects_by_typeid:
return extension_objects_by_typeid[typeid]
else:
return None
return None
class SecurityPolicyType(Enum):
......
......@@ -158,7 +158,7 @@ class CodeGenerator:
if field.name == "Encoding":
val = 0 if not extobj_hack else 1
self.write(f"{field.name}: Byte = field(default={val}, repr=False, init=False)")
self.write(f"{field.name}: Byte = field(default={val}, repr=False, init=False, compare=False)")
elif field.uatype == obj.name: # help!!! selv referencing class
#FIXME: handle better
self.write(f"{fieldname}: Optional[ExtensionObject] = None")
......
......@@ -739,8 +739,6 @@ def test_bin_data_type_def():
dta = ua.DataTypeAttributes()
dta.DisplayName = ua.LocalizedText("titi")
ad.NodeAttributes = dta
from IPython import embed
embed()
data = struct_to_binary(ad)
ad2 = struct_from_binary(ua.AddNodesItem, ua.utils.Buffer(data))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment