Commit 34a4ea5d authored by oroulet's avatar oroulet

make it possible to load one custom structure

parent 26fa7d89
......@@ -4,7 +4,7 @@ from datetime import datetime
import uuid
import logging
import re
from typing import Union, List, TYPE_CHECKING, Tuple, Optional
from typing import Union, List, TYPE_CHECKING, Tuple, Optional, Any
from dataclasses import dataclass, field
from asyncua import ua
......@@ -272,12 +272,37 @@ async def _recursive_parse(server, base_node, dtypes, parent_sdef=None, add_exis
await _recursive_parse(server, server.get_node(desc.NodeId), dtypes, parent_sdef=sdef, add_existing=add_existing)
async def _get_parent_types(node: Node):
parents = []
tmp_node = node
for _ in range(10):
refs = await tmp_node.get_references(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse)
if not refs or refs[0].NodeId.NamespaceIndex == 0 and refs[0].NodeId.Identifier == 22:
return parents
tmp_node = ua.Node(tmp_node.server, refs[0])
parents.append(tmp_node)
logger.warning("Went 10 layers up while look of subtype of given node %s, something is wrong: %s", node, parents)
async def load_custom_struct(node: Node) -> Any:
sdef = await node.read_data_type_definition()
name = (await node.read_browse_name()).Name
for parent in await _get_parent_types(node):
parent_sdef = await parent.read_data_type_definition()
for f in reversed(parent_sdef.fields):
sdef.Fields.insert(0, f)
env = await _generate_object(name, sdef, data_type=node.nodeid)
struct = env[name]
ua.register_extension_object(name, sdef.DefaultEncodingId, struct, node.nodeid)
return env[name]
async def load_data_type_definitions(server: Union["Server", "Client"], base_node: Node = None, overwrite_existing=False) -> None:
"""
Read DataTypeDefition attribute on all Structure and Enumeration defined
on server and generate Python objects in ua namespace to be used to talk with server
"""
await load_enums(server) # we need all enums to generate structure code
new_objects = await load_enums(server) # we need all enums to generate structure code
if base_node is None:
base_node = server.nodes.base_structure_type
dtypes = []
......@@ -286,9 +311,11 @@ async def load_data_type_definitions(server: Union["Server", "Client"], base_nod
for dts in dtypes:
try:
env = await _generate_object(dts.name, dts.sdef, data_type=dts.data_type)
ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.desc.NodeId)
ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.data_type)
new_objects[dts.name] = env[dts.name]
except NotImplementedError:
logger.exception("Structure type %s not implemented", dts.sdef)
return new_objects
async def _read_data_type_definition(server, desc: ua.BrowseDescription, read_existing: bool = False):
......@@ -336,6 +363,7 @@ class {name}(IntEnum):
async def load_enums(server: Union["Server", "Client"], base_node: Node = None) -> None:
if base_node is None:
base_node = server.nodes.enum_data_type
new_enums = {}
for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
name = clean_name(desc.BrowseName.Name)
if hasattr(ua, name):
......@@ -346,3 +374,5 @@ async def load_enums(server: Union["Server", "Client"], base_node: Node = None)
continue
env = await _generate_object(name, edef, enum=True)
ua.register_enum(name, desc.NodeId, env[name])
new_enums[name] = env[name]
return new_enums
......@@ -2,6 +2,7 @@ import asyncio
import logging
from asyncua import Client, Node, ua
from asyncua.common.structures104 import load_custom_struct
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')
......@@ -21,5 +22,9 @@ async def main():
my_struct_opt = await client.nodes.objects.get_child(f"{idx}:my_struct_optional")
print("STRUCT WITH OPTIA VALUE", await my_struct_opt.read_value())
# loading one specific custom struct
mystructnode = await client.nodes.base_structure_type.get_child(f"{idx}:MyStruct")
my_type = await load_custom_struct(mystructnode)
if __name__ == '__main__':
asyncio.run(main())
......@@ -33,7 +33,10 @@ async def main():
"tutu",
])
await server.load_data_type_definitions()
custom_objs = await server.load_data_type_definitions()
print("Custom objects on server")
for name, obj in custom_objs.items():
print(" ", obj)
valnode = await server.nodes.objects.add_variable(idx, "my_enum", ua.MyEnum.toto)
await server.nodes.objects.add_variable(idx, "my_struct", ua.Variant(ua.MyStruct(), ua.VariantType.ExtensionObject))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment