Commit 295d810c authored by Alexander Schrode's avatar Alexander Schrode Committed by oroulet

Add support for pre 1.04 bitfields

parent 9d5a5f60
......@@ -10,7 +10,7 @@ import logging
from datetime import datetime
from enum import IntEnum, EnumMeta
from dataclasses import dataclass, field
from typing import List
from typing import List, Tuple, Union
from xml.etree import ElementTree as ET
......@@ -65,6 +65,7 @@ class Struct:
self.name = clean_name(name)
self.fields = []
self.typeid = None
self.bit_mapping = {}
def __str__(self):
return f"Struct(name={self.name}, fields={self.fields}"
......@@ -92,6 +93,22 @@ class {self.name}:
if isinstance(uavalue, str) and uavalue.startswith("ua."):
uavalue = f"field(default_factory=lambda: {uavalue})"
code += f" {sfield.name}:{uatype} = {uavalue}\n"
for name, encode_tuple in self.bit_mapping.items():
src_field, bit_no, _ = encode_tuple
# Skip Reserved fields they are only for padding
if not name.startswith('Reserved'):
code += f"""
@property
def {name}(self) -> bool:
return self.{src_field} >> {bit_no} != 0
@{name}.setter
def {name}(self, value: bool) -> None:
if value:
self.{src_field} |= (1<<{bit_no})
else:
self.{src_field} &= ~(1<<{bit_no})
"""
return code
......@@ -108,6 +125,38 @@ class Field(object):
__repr__ = __str__
class BitFieldState:
def __init__(self):
self.encoding_field: Union[Field, None] = None
self.bit_size = 0
self.bit_offset = 0
self.encoding_field_counter = 0
def add_bit(self, length: int) -> Union[Field, None]:
""" Returns field if a new one was added. Else None """
if not self.encoding_field:
return self.reset_encoding_field()
else:
if self.bit_size + length > 32:
return self.reset_encoding_field()
else:
self.bit_size += length
self.bit_offset += 1
return None
def reset_encoding_field(self) -> Field:
field = Field(f"BitEncoding{self.encoding_field_counter}")
field.uatype = "UInt32"
self.encoding_field = field
self.bit_offset = 0
self.encoding_field_counter += 1
return field
def get_bit_info(self) -> Tuple[str, int]:
""" With the field name and bit offset, we can extract the bit later."""
return self.encoding_field.name, self.bit_offset
class StructGenerator(object):
def __init__(self):
self.model = []
......@@ -137,26 +186,37 @@ class StructGenerator(object):
for child in root:
if child.tag.endswith("StructuredType"):
bit_state = BitFieldState()
struct = Struct(child.get("Name"))
array = False
# these lines can be reduced in >= Python3.8 with root.iterfind("{*}Field") and similar
for xmlfield in child:
if xmlfield.tag.endswith("Field"):
name = xmlfield.get("Name")
_clean_name = clean_name(name)
if name.startswith("NoOf"):
array = True
continue
field = Field(clean_name(name))
field.uatype = xmlfield.get("TypeName")
if ":" in field.uatype:
field.uatype = field.uatype.split(":")[1]
field.uatype = clean_name(field.uatype)
field.value = get_default_value(field.uatype, enums)
if array:
field.array = True
field.value = "field(default_factory=list)"
array = False
struct.fields.append(field)
_type = xmlfield.get("TypeName")
if ":" in _type:
_type = _type.split(":")[1]
if _type == 'Bit':
# Bit is smaller than 1Byte so we need to chain mutlitple bit fields together
# as one byte
bit_length = int(xmlfield.get("Length", 1))
field = bit_state.add_bit(bit_length)
# Whether or not a new encoding field was added, we want to store the current one.
struct.bit_mapping[name] = (bit_state.encoding_field.name, bit_state.bit_offset, bit_length)
else:
field = Field(_clean_name)
field.uatype = clean_name(_type)
if field:
field.value = get_default_value(field.uatype, enums)
if array:
field.array = True
field.value = "field(default_factory=list)"
array = False
struct.fields.append(field)
self.model.append(struct)
def save_to_file(self, path, register=False):
......
<?xml version="1.0" encoding="UTF-8"?>
<opc:TypeDictionary xmlns:opc="http://opcfoundation.org/BinarySchema/" xmlns:tns="http://example.com/example" xmlns:ua="http://opcfoundation.org/UA/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" DefaultByteOrder="LittleEndian" TargetNamespace="http://example.com/example">
<opc:Import Namespace="http://opcfoundation.org/UA/" />
<opc:StructuredType BaseType="ua:ExtensionObject" Name="ChannelIdDataType">
<opc:Field TypeName="opc:Int16" Name="Id" />
<opc:Field TypeName="tns:ChannelType" Name="Type" />
</opc:StructuredType>
<opc:StructuredType BaseType="ua:ExtensionObject" Name="CurveDataType">
<opc:Field TypeName="opc:Bit" Name="DescriptionSpecified" />
<opc:Field Length="31" TypeName="opc:Bit" Name="Reserved1" />
<opc:Field TypeName="tns:ChannelIdDataType" Name="ChannelId" />
<opc:Field TypeName="opc:Int32" Name="NoOfData" />
<opc:Field LengthField="NoOfData" TypeName="tns:CurvePointDataType" Name="Data" />
<opc:Field SwitchField="DescriptionSpecified" TypeName="opc:CharArray" Name="Description" />
</opc:StructuredType>
<opc:StructuredType BaseType="ua:ExtensionObject" Name="CurvePointDataType">
<opc:Field TypeName="opc:Double" Name="Time" />
<opc:Field TypeName="opc:Double" Name="Data" />
</opc:StructuredType>
<opc:StructuredType BaseType="ua:ExtensionObject" Name="DigitalSignalChangeDataType">
<opc:Field TypeName="tns:InputSignalIdDataType" Name="Id" />
<opc:Field TypeName="opc:Double" Name="Time" />
</opc:StructuredType>
<opc:StructuredType BaseType="ua:ExtensionObject" Name="InputSignalIdDataType">
<opc:Field TypeName="tns:InputSignalCategory" Name="Category" />
<opc:Field TypeName="tns:SignalSubIdDataType" Name="SubId" />
</opc:StructuredType>
<opc:StructuredType BaseType="ua:ExtensionObject" Name="ProcessValueType">
<opc:Field TypeName="opc:Bit" Name="cavityIdSpecified" />
<opc:Field TypeName="opc:Bit" Name="descriptionSpecified" />
<opc:Field Length="30" TypeName="opc:Bit" Name="Reserved1" />
<opc:Field TypeName="opc:CharArray" Name="name" />
<opc:Field TypeName="opc:Double" Name="value" />
<opc:Field TypeName="opc:UInt32" Name="assignment" />
<opc:Field TypeName="opc:UInt32" Name="source" />
<opc:Field SwitchField="cavityIdSpecified" TypeName="opc:UInt32" Name="cavityId" />
<opc:Field TypeName="opc:CharArray" Name="id" />
<opc:Field SwitchField="descriptionSpecified" TypeName="opc:CharArray" Name="description" />
</opc:StructuredType>
<opc:StructuredType BaseType="ua:ExtensionObject" Name="SignalSubIdDataType">
<opc:Field TypeName="opc:Byte" Name="Number" />
<opc:Field TypeName="opc:Byte" Name="Number2" />
<opc:Field TypeName="opc:Byte" Name="Letter" />
</opc:StructuredType>
<opc:EnumeratedType LengthInBits="32" Name="ChannelType">
<opc:EnumeratedValue Name="Universal" Value="0" />
<opc:EnumeratedValue Name="Pressure" Value="1" />
<opc:EnumeratedValue Name="Customizable" Value="2" />
</opc:EnumeratedType>
<opc:EnumeratedType LengthInBits="32" Name="InputSignalCategory">
<opc:EnumeratedValue Name="Unassigned" Value="0" />
<opc:EnumeratedValue Name="MeasuringStart" Value="1" />
<opc:EnumeratedValue Name="MeasuringStop" Value="2" />
<opc:EnumeratedValue Name="CycleEnd" Value="3" />
<opc:EnumeratedValue Name="SwitchOverPoint" Value="4" />
<opc:EnumeratedValue Name="CycleEvent" Value="5" />
</opc:EnumeratedType>
<opc:EnumeratedType LengthInBits="32" Name="MoldTypeEnumeration">
<opc:Documentation>Mold type</opc:Documentation>
<opc:EnumeratedValue Name="NormalMold" Value="0" />
<opc:EnumeratedValue Name="MultiComponentMold" Value="1" />
<opc:EnumeratedValue Name="RTMMold" Value="2" />
</opc:EnumeratedType>
<opc:EnumeratedType LengthInBits="32" Name="SetupChangeType">
<opc:EnumeratedValue Name="SetupLoaded" Value="0" />
<opc:EnumeratedValue Name="SetupChanged" Value="1" />
<opc:EnumeratedValue Name="SetupCreated" Value="2" />
<opc:EnumeratedValue Name="SetupUnloaded" Value="3" />
<opc:EnumeratedValue Name="SetupImported" Value="4" />
</opc:EnumeratedType>
<opc:EnumeratedType LengthInBits="32" Name="StopConditionType">
<opc:Documentation>Type of measurement stop condition</opc:Documentation>
<opc:EnumeratedValue Name="Time" Value="0" />
<opc:EnumeratedValue Name="StartSignal" Value="1" />
<opc:EnumeratedValue Name="StopSignal" Value="2" />
</opc:EnumeratedType>
</opc:TypeDictionary>
\ No newline at end of file
......@@ -3,6 +3,7 @@ import xml.etree.ElementTree as Et
import pytest
from asyncua import ua
from asyncua.common.structures import EnumType, StructGenerator, Struct
import asyncua.common.type_dictionary_builder
from asyncua.common.type_dictionary_builder import OPCTypeDictionaryBuilder, DataTypeDictionaryBuilder
from asyncua.common.type_dictionary_builder import get_ua_class, StructNode
......@@ -389,3 +390,33 @@ async def test_functional_advance(srv):
nested_result = await nested_var.read_value()
assert nested_result == nested_msg
await srv.srv.delete_nodes([basic_var, nested_var])
async def test_bitfields(srv):
# We use a bsd file from a server dict, because we only provide bitsets for backwards compatibility
xmlpath = "tests/custom_extension_with_optional_fields.xml"
structs_dict = {}
c = StructGenerator()
c.make_model_from_file(xmlpath)
c.get_python_classes(structs_dict)
for m in c.model:
if type(m) in (Struct, EnumType):
m.typeid = ua.NodeId(m.name, 1)
ua.register_extension_object(m.name, m.typeid, structs_dict[m.name])
c.set_typeid(m.name, m.typeid.to_string())
await srv.dict_builder.set_dict_byte_string()
await srv.srv.load_type_definitions()
v = ua.ProcessValueType(name='XXX')
v.cavityId = False
v.description = True
bitfield_var = await srv.srv.nodes.objects.add_variable(
ua.NodeId(NamespaceIndex=srv.idx), 'BitFieldSetsTest',
ua.Variant(None, ua.VariantType.ExtensionObject),
datatype=ua.NodeId('ProcessValueType', 1)
)
await bitfield_var.write_value(v)
bit_res = await bitfield_var.read_value()
assert v.name == 'XXX'
assert not v.cavityId
assert v.description
assert v == bit_res
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment