Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
75bc0049
Commit
75bc0049
authored
May 22, 2022
by
Alexander Schrode
Committed by
oroulet
Jun 05, 2022
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
rework struct1_04 resoultion
parent
59cf26d7
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
106 additions
and
23 deletions
+106
-23
asyncua/common/structures104.py
asyncua/common/structures104.py
+89
-23
asyncua/ua/uatypes.py
asyncua/ua/uatypes.py
+17
-0
No files found.
asyncua/common/structures104.py
View file @
75bc0049
...
...
@@ -154,7 +154,7 @@ def get_default_value(uatype, enums=None):
return f"ua.{uatype}()"
def make_structure_code(data_type, struct_name, sdef):
def make_structure_code(data_type, struct_name, sdef
, log_error=True
):
"""
given a StructureDefinition object, generate Python code
"""
...
...
@@ -193,9 +193,11 @@ class {struct_name}{base_class}:
uatype = ua.extension_objects_by_datatype[sfield.DataType].__name__
elif sfield.DataType in ua.enums_by_datatype:
uatype = ua.enums_by_datatype[sfield.DataType].__name__
elif sfield.DataType in ua.basetype_by_datatype:
uatype = ua.basetype_by_datatype[sfield.DataType]
else:
# FIXME: we are probably missing many custom tyes here based on builtin types
# maybe we can use ua_utils.get_base_data_type(
)
if log_error:
logger.error(f"Unknown datatype for field: {sfield} in structure:{struct_name}, please report"
)
raise RuntimeError(f"Unknown datatype for field: {sfield} in structure:{struct_name}, please report")
if sfield.ValueRank >= 0:
...
...
@@ -238,7 +240,7 @@ class {struct_name}{base_class}:
return code
async def _generate_object(name, sdef, data_type=None, env=None, enum=False, option_set=False):
async def _generate_object(name, sdef, data_type=None, env=None, enum=False, option_set=False
, log_fail=True
):
"""
generate Python code and execute in a new environment
return a dict of structures {name: class}
...
...
@@ -271,12 +273,13 @@ async def _generate_object(name, sdef, data_type=None, env=None, enum=False, opt
if enum:
code = make_enum_code(name, sdef, option_set)
else:
code = make_structure_code(data_type, name, sdef)
code = make_structure_code(data_type, name, sdef
, log_error=log_fail
)
logger.debug("Executing code: %s", code)
try:
exec(code, env)
except Exception:
logger.exception("Failed to execute auto-generated code from UA datatype: %s", code)
if log_fail:
logger.exception("Failed to execute auto-generated code from UA datatype: %s", code)
raise
return env
...
...
@@ -302,16 +305,19 @@ class DataTypeSorter:
async def _recursive_parse(server, base_node, dtypes, parent_sdef=None, add_existing=False):
for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
ch = await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype)
for desc in ch:
sdef = await _read_data_type_definition(server, desc, read_existing=add_existing)
if not sdef:
continue
name = clean_name(desc.BrowseName.Name)
if parent_sdef:
for sfield in reversed(parent_sdef.Fields):
sdef.Fields.insert(0, sfield)
dtypes.append(DataTypeSorter(desc.NodeId, name, desc, sdef))
await _recursive_parse(server, server.get_node(desc.NodeId), dtypes, parent_sdef=sdef, add_existing=add_existing)
if clean_name(desc.BrowseName.Name) == '
Union
':
# Union don'
t
contain
a
type
defintion
but
there
could
be
subtypes
await
_recursive_parse
(
server
,
server
.
get_node
(
desc
.
NodeId
),
dtypes
,
parent_sdef
,
add_existing
=
add_existing
)
elif
sdef
is
not
None
:
name
=
clean_name
(
desc
.
BrowseName
.
Name
)
if
parent_sdef
:
for
sfield
in
reversed
(
parent_sdef
.
Fields
):
sdef
.
Fields
.
insert
(
0
,
sfield
)
dtypes
.
append
(
DataTypeSorter
(
desc
.
NodeId
,
name
,
desc
,
sdef
))
await
_recursive_parse
(
server
,
server
.
get_node
(
desc
.
NodeId
),
dtypes
,
parent_sdef
=
sdef
,
add_existing
=
add_existing
)
async
def
_get_parent_types
(
node
:
Node
):
...
...
@@ -339,25 +345,85 @@ async def load_custom_struct(node: Node) -> Any:
return
env
[
name
]
async
def
_recursive_parse_basedatatypes
(
server
,
base_node
,
parent_datatype
,
new_alias
)
->
Any
:
for
desc
in
await
base_node
.
get_children_descriptions
(
refs
=
ua
.
ObjectIds
.
HasSubtype
):
name
=
clean_name
(
desc
.
BrowseName
.
Name
)
if
parent_datatype
not
in
'Number'
:
# Don't insert Number alias, they should be allready insert because they have to be basetypes allready
if
not
hasattr
(
ua
,
name
):
env
=
make_basetype_code
(
name
,
parent_datatype
)
ua
.
register_basetype
(
name
,
desc
.
NodeId
,
parent_datatype
)
new_alias
[
name
]
=
env
[
name
]
await
_recursive_parse_basedatatypes
(
server
,
server
.
get_node
(
desc
.
NodeId
),
name
,
new_alias
)
def
make_basetype_code
(
name
,
parent_datatype
):
"""
alias basetypes
"""
code
=
f"""
{
name
}
= ua.
{
parent_datatype
}
"""
env
=
{}
env
[
'ua'
]
=
ua
logger
.
debug
(
"Executing code: %s"
,
code
)
try
:
exec
(
code
,
env
)
except
Exception
:
logger
.
exception
(
"Failed to execute auto-generated code from UA datatype: %s"
,
code
)
raise
return
env
async
def
_load_base_datatypes
(
server
:
Union
[
"Server"
,
"Client"
])
->
Any
:
new_alias
=
{}
descriptions
=
await
server
.
nodes
.
base_data_type
.
get_children_descriptions
()
for
desc
in
descriptions
:
name
=
clean_name
(
desc
.
BrowseName
.
Name
)
if
name
not
in
[
'Structure'
,
'Enumeration'
]:
await
_recursive_parse_basedatatypes
(
server
,
server
.
get_node
(
desc
.
NodeId
),
name
,
new_alias
)
return
new_alias
async
def
load_data_type_definitions
(
server
:
Union
[
"Server"
,
"Client"
],
base_node
:
Node
=
None
,
overwrite_existing
=
False
)
->
Dict
:
"""
Read DataTypeDefition attribute on all Structure and Enumeration defined
on server and generate Python objects in ua namespace to be used to talk with server
"""
new_objects = await load_enums(server) # we need all enums to generate structure code
new_objects
=
await
_load_base_datatypes
(
server
)
# we need to load all basedatatypes alias first
new_objects
.
update
(
await
load_enums
(
server
))
# we need all enums to generate structure code
new_objects
.
update
(
await
load_enums
(
server
,
server
.
nodes
.
option_set_type
,
True
))
# also load all optionsets
if
base_node
is
None
:
base_node
=
server
.
nodes
.
base_structure_type
dtypes
=
[]
await
_recursive_parse
(
server
,
base_node
,
dtypes
,
add_existing
=
overwrite_existing
)
dtypes
.
sort
()
for dts in dtypes:
try:
env = await _generate_object(dts.name, dts.sdef, data_type=dts.data_type)
ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.data_type)
new_objects[dts.name] = env[dts.name] # type: ignore
except NotImplementedError:
logger.exception("Structure type %s not implemented", dts.sdef)
retries
=
10
for
cnt
in
range
(
retries
):
# Retry to resolve datatypes
failed_types
=
[]
log_ex
=
retries
==
cnt
+
1
for
dts
in
dtypes
:
try
:
env
=
await
_generate_object
(
dts
.
name
,
dts
.
sdef
,
data_type
=
dts
.
data_type
,
log_fail
=
log_ex
)
ua
.
register_extension_object
(
dts
.
name
,
dts
.
encoding_id
,
env
[
dts
.
name
],
dts
.
data_type
)
new_objects
[
dts
.
name
]
=
env
[
dts
.
name
]
# type: ignore
except
NotImplementedError
:
logger
.
exception
(
"Structure type %s not implemented"
,
dts
.
sdef
)
except
AttributeError
:
# Failed to resolve datatypes
failed_types
.
append
(
dts
)
if
log_ex
:
raise
except
RuntimeError
:
# Failed to resolve datatypes
failed_types
.
append
(
dts
)
if
log_ex
:
raise
if
not
failed_types
:
break
dtypes
=
failed_types
return
new_objects
...
...
asyncua/ua/uatypes.py
View file @
75bc0049
...
...
@@ -1062,6 +1062,23 @@ def get_default_value(vtype):
raise RuntimeError(f"
function
take
a
uatype
as
argument
,
got
:
{
vtype
}
")
basetype_by_datatype = {}
basetype_datatypes = {}
# register of alias of basetypes
def register_basetype(name, nodeid, class_type):
"""
Register a new allias of basetypes for automatic decoding and make them available in ua module
"""
logger.info("
registring
new
basetype
alias
:
%
s
%
s
%
s
", name, nodeid, class_type)
basetype_by_datatype[nodeid] = class_type
basetype_datatypes[class_type] = nodeid
import asyncua.ua
setattr(asyncua.ua, name, class_type)
# register of custom enums (Those loaded with load_enums())
enums_by_datatype = {}
enums_datatypes = {}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment