Commit cf4943c6 authored by Alexander Schrode's avatar Alexander Schrode Committed by oroulet

Loading of OptionSets

Allow using OptionSets from Addresspace und dynamic creation of OptionSets
parent 05ba8ffa
......@@ -16,6 +16,7 @@ class Shortcuts:
self.base_variable_type = Node(server, ObjectIds.BaseVariableType)
self.folder_type = Node(server, ObjectIds.FolderType)
self.enum_data_type = Node(server, ObjectIds.Enumeration)
self.option_set_type = Node(server, ObjectIds.OptionSet)
self.types = Node(server, ObjectIds.TypesFolder)
self.data_types = Node(server, ObjectIds.DataTypesFolder)
self.event_types = Node(server, ObjectIds.EventTypesFolder)
......
from enum import Enum
from enum import IntEnum
from enum import IntEnum, IntFlag
from datetime import datetime
import uuid
import logging
......@@ -88,6 +88,7 @@ async def new_enum(
idx: Union[int, ua.NodeId],
name: Union[int, ua.QualifiedName],
values: List[str],
option_set: bool = False
) -> Node:
edef = ua.EnumDefinition()
counter = 0
......@@ -98,7 +99,9 @@ async def new_enum(
field.Value = counter
counter += 1
edef.Fields.append(field)
if option_set:
dtype = await server.nodes.option_set_type.add_data_type(idx, name)
else:
dtype = await server.nodes.enum_data_type.add_data_type(idx, name)
await dtype.write_data_type_definition(edef)
return dtype
......@@ -205,7 +208,7 @@ class {struct_name}:
return code
async def _generate_object(name, sdef, data_type=None, env=None, enum=False):
async def _generate_object(name, sdef, data_type=None, env=None, enum=False, option_set=False):
"""
generate Python code and execute in a new environment
return a dict of structures {name: class}
......@@ -223,6 +226,7 @@ async def _generate_object(name, sdef, data_type=None, env=None, enum=False):
env['uuid'] = uuid
if "enum" not in env:
env['IntEnum'] = IntEnum
env['IntFlag'] = IntFlag
if "dataclass" not in env:
env['dataclass'] = dataclass
if "Optional" not in env:
......@@ -233,7 +237,7 @@ async def _generate_object(name, sdef, data_type=None, env=None, enum=False):
env['field'] = field
# generate classe add it to env dict
if enum:
code = make_enum_code(name, sdef)
code = make_enum_code(name, sdef, option_set)
else:
code = make_structure_code(data_type, name, sdef)
logger.debug("Executing code: %s", code)
......@@ -309,6 +313,7 @@ async def load_data_type_definitions(server: Union["Server", "Client"], base_nod
on server and generate Python objects in ua namespace to be used to talk with server
"""
new_objects = await load_enums(server) # we need all enums to generate structure code
new_objects.update(await load_enums(server, server.nodes.option_set_type, True)) # also load all optionsets
if base_node is None:
base_node = server.nodes.base_structure_type
dtypes = []
......@@ -344,13 +349,14 @@ async def _read_data_type_definition(server, desc: ua.BrowseDescription, read_ex
return sdef
def make_enum_code(name, edef):
def make_enum_code(name, edef, option_set):
"""
if node has a DataTypeDefinition attribute, generate enum code
"""
enum_type = "IntEnum" if not option_set else "IntFlag"
code = f"""
class {name}(IntEnum):
class {name}({enum_type}):
'''
{name} EnumInt autogenerated from EnumDefinition
......@@ -360,13 +366,13 @@ class {name}(IntEnum):
for sfield in edef.Fields:
name = clean_name(sfield.Name)
value = sfield.Value
value = sfield.Value if not option_set else (1 << sfield.Value)
code += f" {name} = {value}\n"
logger.error(f"{name} - {sfield} {option_set} {code}")
return code
async def load_enums(server: Union["Server", "Client"], base_node: Node = None) -> None:
async def load_enums(server: Union["Server", "Client"], base_node: Node = None, option_set: bool = False) -> None:
if base_node is None:
base_node = server.nodes.enum_data_type
new_enums = {}
......@@ -374,11 +380,11 @@ async def load_enums(server: Union["Server", "Client"], base_node: Node = None)
name = clean_name(desc.BrowseName.Name)
if hasattr(ua, name):
continue
logger.info("Registring Enum %s %s", desc.NodeId, name)
logger.info("Registring Enum %s %s OptionSet=%s", desc.NodeId, name, option_set)
edef = await _read_data_type_definition(server, desc)
if not edef:
continue
env = await _generate_object(name, edef, enum=True)
env = await _generate_object(name, edef, enum=True, option_set=option_set)
ua.register_enum(name, desc.NodeId, env[name])
new_enums[name] = env[name]
return new_enums
......@@ -646,6 +646,7 @@ def new_enum(
idx: Union[int, ua.NodeId],
name: Union[int, ua.QualifiedName],
values: List[str],
optional: bool = False
) -> SyncNode:
pass
......
......@@ -1184,6 +1184,16 @@ async def test_custom_enum_x(opc):
assert val == 1
async def test_custom_option_set(opc):
idx = 4
await new_enum(opc.opc, idx, "MyOptionSet", ["tata", "titi", "toto", "None"], True)
await opc.opc.load_data_type_definitions()
assert ua.MyOptionSet.toto | ua.MyOptionSet.titi == ua.MyOptionSet((1 << 2) | (1 << 1))
var = await opc.opc.nodes.objects.add_variable(idx, "my_option", ua.MyOptionSet.toto | ua.MyOptionSet.titi)
val = await var.read_value()
assert val == (1 << 2) | (1 << 1)
async def test_custom_struct_(opc):
idx = 4
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment