Commit bf32fdbc authored by oroulet's avatar oroulet Committed by oroulet

start work on savng structures using definition from 1.04 spec

parent f874f7db
......@@ -10,6 +10,59 @@ from asyncua import ua
logger = logging.getLogger(__name__)
def new_struct_field(name, dtype, array=False, optional=False):
"""
simple way to create a StructureField
"""
field = ua.StructureField()
field.Name = name
field.IsOptional = optional
if isinstance(dtype, ua.VariantType):
field.DataType = ua.NodeId(dtype.value, 0)
elif isinstance(dtype, ua.NodeId):
field.DataType = dtype
elif isinstance(dtype, ua.Node):
field.DataType = dtype.nodeid
else:
raise ValueError(f"Datatype of a field must be a NodeId, not {dtype} of type {type(dtype)}")
if array:
field.ValueRand = ua.ValueRank.OneOrMoreDimensions
field.ArrayDimensions = [1]
return field
async def new_struct(server, idx, name, *fields):
"""
simple way to create a new structure
"""
sdef = ua.StructureDefinition()
sdef.StructureType = ua.StructureType.Structure
sdef.fields = fields
sdef.BaseDatatype = ??
sdef.DefaultEncodingId = ??
dtype = await server.nodes.base_structure_type.add_data_type(idx, name)
await dtype.write_data_type_definition(sdef)
return dtype
async def new_enum(server, idx, name, *args):
edef = ua.EnumDefinition()
edef.Name = name
counter = 0
for val_name in args:
field = ua.EnumField()
field.DisplayName = ua.LocalizedText(text=val_name)
field.Name = val_name
field.Value = counter
counter += 1
edef.Fields.append(field)
dtype = await server.nodes.enum_data_type.add_data_type(idx, name)
await dtype.write_data_type_definition(edef)
return dtype
def clean_name(name):
"""
Remove characters that might be present in OPC UA structures
......
import logging
import asyncio
from asyncua import ua, Server
from asyncua.common.structures104 import new_struct, new_enum, new_struct_field
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')
async def main():
# setup our server
server = Server()
await server.init()
server.set_endpoint('opc.tcp://0.0.0.0:4840/freeopcua/server/')
# setup our own namespace, not really necessary but should as spec
uri = 'http://examples.freeopcua.github.io'
idx = await server.register_namespace(uri)
mystruct = await server.nodes.base_structure_type.add_data_type(idx, "MyStruct")
sdef = ua.StructureDefinition()
sdef.StructureType = ua.StructureType.Structure
sdef.Fields = [new_struct_field("MyBool", ua.VariantType.Boolean), new_struct_field("MyUInt32", ua.VariantType.UInt32)]
await mystruct.write_data_type_definition(sdef)
await new_struct(server, idx, "MySimpleStruct", [new_struct_field("MyBool", ua.VariantType.Boolean), new_struct_field("MyUInt32", ua.VariantType.UInt32)])
await new_enum(server, idx, "MyEnum", "titi", "toto", "tutu")
async with server:
while True:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment