Commit ca19970e authored by smorokin's avatar smorokin Committed by GitHub

improved exporting and importing extension objects to/from xml (#1083)

* improved exporting and importing extension objects to/from xml

* fixed a type annotation

* compability to python 3.7

* fixed type annotation for python < 3.9

* remove optional parameter for <=3.8

* added tests

* comp with old pyhton versions

* fixed an error in tests

* somehow the name of the struct makes a difference
parent edafb6b6
......@@ -5,6 +5,9 @@ Helper function and classes depending on ua object are in ua_utils.py
import os
import logging
import sys
from dataclasses import Field, fields
from typing import get_type_hints, Dict, Tuple, Any, Optional
from ..ua.uaerrors import UaError
_logger = logging.getLogger(__name__)
......@@ -93,3 +96,39 @@ class Buffer:
def create_nonce(size=32):
return os.urandom(size)
def fields_with_resolved_types(
class_or_instance: Any,
globalns: Optional[Dict[str, Any]] = None,
localns: Optional[Dict[str, Any]] = None,
include_extras: bool = False,
) -> Tuple[Field, ...]:
"""Return a tuple describing the fields of this dataclass.
Accepts a dataclass or an instance of one. Tuple elements are of
type Field. ForwardRefs and string types will be resolved.
"""
fields_ = fields(class_or_instance)
if sys.version_info.major == 3 and sys.version_info.minor <= 8:
resolved_fieldtypes = get_type_hints(
class_or_instance,
globalns=globalns,
localns=localns
)
else:
resolved_fieldtypes = get_type_hints(
class_or_instance,
globalns=globalns,
localns=localns,
include_extras=include_extras
)
for field in fields_:
try:
field.type = resolved_fieldtypes[field.name]
except KeyError:
_logger.info(f"could not resolve fieldtype for field={field} of class_or_instance={class_or_instance}")
pass
return fields_
......@@ -14,8 +14,10 @@ from enum import Enum
from asyncua import ua
from asyncua.ua.uatypes import type_string_from_type
from asyncua.ua.uaerrors import UaError
from .. import Node
from ..ua import object_ids as o_ids
from .ua_utils import get_base_data_type
from .utils import fields_with_resolved_types
class XmlExporter:
......@@ -235,9 +237,13 @@ class XmlExporter:
self.aliases[dtype] = dtype_name
else:
dtype_name = self._node_to_string(dtype)
rank = await node.read_value_rank()
if rank != -1:
el.attrib["ValueRank"] = str(int(rank))
try:
rank = await node.read_value_rank()
if rank != -1:
el.attrib["ValueRank"] = str(int(rank))
except ua.uaerrors.BadAttributeIdInvalid:
pass
dim = await node.read_attribute(ua.AttributeIds.ArrayDimensions, raise_on_bad_status=False)
if dim is not None and dim.Value.Value:
el.attrib["ArrayDimensions"] = ",".join([str(i) for i in dim.Value.Value])
......@@ -373,7 +379,18 @@ class XmlExporter:
member_el = Et.SubElement(el, "uax:" + name)
if isinstance(val, (list, tuple)):
for v in val:
await self._value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, v)
try:
type_name = ua.ObjectIdNames[dtype.Identifier]
except KeyError:
dtype_node = self.server.get_node(dtype)
enc_node = (
await dtype_node.get_referenced_nodes(
ua.ObjectIds.HasEncoding, ua.BrowseDirection.Forward
)
)[0]
type_name = ua.extension_objects_by_typeid[enc_node.nodeid].__name__
await self._value_to_etree(member_el, type_name, dtype, v)
else:
await self._val_to_etree(member_el, dtype, val)
......@@ -452,21 +469,57 @@ class XmlExporter:
await self._extobj_to_etree(el, type_name, dtype, val)
async def _extobj_to_etree(self, val_el, name, dtype, val):
if "=" in name:
try:
name = ua.extension_objects_by_datatype[dtype].__name__
except KeyError:
try:
name = ua.enums_by_datatype[dtype].__name__
except KeyError:
node: Node = self.server.get_node(dtype)
browse_name = await node.read_browse_name()
name = browse_name.Name
obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
type_el = Et.SubElement(obj_el, "uax:TypeId")
id_el = Et.SubElement(type_el, "uax:Identifier")
id_el.text = dtype.to_string()
id_el.text = self._node_to_string(dtype)
body_el = Et.SubElement(obj_el, "uax:Body")
struct_el = Et.SubElement(body_el, "uax:" + name)
await self._all_fields_to_etree(struct_el, val)
async def _all_fields_to_etree(self, struct_el, val):
for field in fields(val):
# TODO: adding the 'ua' module to the globals to resolve the type hints might not be enough.
# its possible that the type annotations also refere to classes defined in other modules.
for field in fields_with_resolved_types(val, globalns={"ua": ua}):
# FIXME; what happend if we have a custom type which is not part of ObjectIds???
if field.name == "Encoding":
continue
type_name = type_string_from_type(field.type)
await self.member_to_etree(struct_el, field.name, ua.NodeId(getattr(ua.ObjectIds, type_name)), getattr(val, field.name))
try:
dtype = ua.NodeId(getattr(ua.ObjectIds, type_name))
except AttributeError:
try:
enc_node: Node = self.server.get_node(
ua.extension_object_typeids[type_name]
)
dtype_node = (
await enc_node.get_referenced_nodes(
ua.ObjectIds.HasEncoding, ua.BrowseDirection.Inverse
)
)[0]
dtype = dtype_node.nodeid
except KeyError:
for cls in ua.enums_datatypes:
if cls.__class__ == field.type.__class__:
dtype = ua.enums_datatypes[cls]
break
self.logger.debug(
f"could not find field type {field.type} in registered types"
)
return
await self.member_to_etree(
struct_el, field.name, dtype, getattr(val, field.name)
)
def indent(elem, level=0):
......
......@@ -439,7 +439,10 @@ class XmlImporter:
atttype = type_from_list(atttype)
my_list = []
for vtype, v2 in val:
my_list.append(ua_type_to_python(v2, vtype))
if isinstance(v2, str):
my_list.append(ua_type_to_python(v2, vtype))
else:
my_list.append(v2)
fargs[attname] = my_list
elif issubclass(atttype, ua.NodeId): # NodeId representation does not follow common rules!!
......@@ -582,7 +585,10 @@ class XmlImporter:
else:
parent_node = self.server.get_node(obj.parent)
path = await parent_node.get_path()
if self.server.nodes.base_structure_type in path:
if self.server.nodes.option_set_type in path:
# nodes below option_set_type are enums, not structs
attrs.DataTypeDefinition = self._get_edef(obj)
elif self.server.nodes.base_structure_type in path:
attrs.DataTypeDefinition = self._get_sdef(obj)
else:
_logger.warning(
......
......@@ -550,6 +550,64 @@ async def test_xml_struct_optional(opc, tmpdir):
assert t.MyInt64 == 5
async def test_xml_struct_with_value(opc, tmpdir):
idx = 4
my_struct, _ = await new_struct(opc.opc, idx, "MyStructWithValue", [
new_struct_field("int_value", ua.VariantType.Int64, optional=False),
])
await opc.opc.load_data_type_definitions()
valnode = await opc.opc.nodes.objects.add_variable(idx, "my_struct", ua.Variant(ua.MyStructWithValue(), ua.VariantType.ExtensionObject))
new_value = ua.MyStructWithValue()
new_value.int_value = 14
await valnode.write_value(ua.Variant(new_value, ua.VariantType.ExtensionObject))
tmp_path = tmpdir.join("export-struct-with-value.xml").strpath
await opc.opc.export_xml([my_struct, valnode], tmp_path, export_values=True)
await opc.opc.delete_nodes([my_struct, valnode])
new_nodes = await opc.opc.import_xml(tmp_path)
imported_my_struct = opc.opc.get_node(new_nodes[0])
imported_valnode = opc.opc.get_node(new_nodes[1])
assert my_struct == imported_my_struct
assert valnode == imported_valnode
await opc.opc.load_data_type_definitions()
value = await valnode.read_value()
imported_value = await imported_valnode.read_value()
assert value == imported_value
async def test_xml_struct_in_struct_with_value(opc, tmpdir):
idx = 4
inner_struct, _ = await new_struct(opc.opc, idx, "MyInnerStruct", [
new_struct_field("int_value", ua.VariantType.Int64, optional=False),
])
outer_struct, _ = await new_struct(opc.opc, idx, "MyOuterStruct", [
new_struct_field("inner_struct_value", inner_struct, optional=False),
])
await opc.opc.load_data_type_definitions()
valnode = await opc.opc.nodes.objects.add_variable(idx, "my_outer_struct", ua.Variant(ua.MyOuterStruct(), ua.VariantType.ExtensionObject))
new_value = ua.MyOuterStruct()
new_value.inner_struct_value.int_value = 42
await valnode.write_value(ua.Variant(new_value, ua.VariantType.ExtensionObject))
tmp_path = tmpdir.join("export-struct-in-struct-with-value.xml").strpath
await opc.opc.export_xml([outer_struct, inner_struct, valnode], tmp_path, export_values=True)
await opc.opc.delete_nodes([outer_struct, inner_struct, valnode])
new_nodes = await opc.opc.import_xml(tmp_path)
imported_outer_struct = opc.opc.get_node(new_nodes[0])
imported_inner_struct = opc.opc.get_node(new_nodes[1])
imported_valnode = opc.opc.get_node(new_nodes[2])
assert outer_struct == imported_outer_struct
assert inner_struct == imported_inner_struct
await opc.opc.load_data_type_definitions()
value = await valnode.read_value()
imported_value = await imported_valnode.read_value()
assert value == imported_value
async def test_basetype_alias(opc):
idx = 4
# Alias double
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment