Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
O
opcua-asyncio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Nikola Balog
opcua-asyncio
Commits
77ec2c1a
Commit
77ec2c1a
authored
Oct 10, 2016
by
olivier R-D
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add export import xml tests and fix et obj related bug
parent
8a1ae358
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
54 additions
and
15 deletions
+54
-15
opcua/common/xmlexporter.py
opcua/common/xmlexporter.py
+4
-1
opcua/common/xmlimporter.py
opcua/common/xmlimporter.py
+17
-6
opcua/common/xmlparser.py
opcua/common/xmlparser.py
+10
-6
tests/tests_xml.py
tests/tests_xml.py
+23
-2
No files found.
opcua/common/xmlexporter.py
View file @
77ec2c1a
...
...
@@ -282,7 +282,10 @@ def _val_to_etree(el, dtype, val):
id_el
=
Et
.
SubElement
(
el
,
"uax:Identifier"
)
id_el
.
text
=
val
.
to_string
()
elif
not
hasattr
(
val
,
"ua_types"
):
el
.
text
=
str
(
val
)
if
type
(
val
)
is
bytes
:
el
.
text
=
val
.
decode
(
"utf-8"
)
else
:
el
.
text
=
str
(
val
)
else
:
for
name
,
vtype
in
val
.
ua_types
.
items
():
member_to_etree
(
el
,
name
,
ua
.
NodeId
(
getattr
(
ua
.
ObjectIds
,
vtype
)),
getattr
(
val
,
name
))
...
...
opcua/common/xmlimporter.py
View file @
77ec2c1a
...
...
@@ -26,6 +26,7 @@ def ua_type_to_python(val, uatype):
def
to_python
(
val
,
obj
,
attname
):
if
isinstance
(
obj
,
ua
.
NodeId
)
and
attname
==
"Identifier"
:
raise
RuntimeError
(
"Error we should parse a NodeId here"
)
return
ua
.
NodeId
.
from_string
(
val
)
else
:
return
ua_type_to_python
(
val
,
obj
.
ua_types
[
attname
])
...
...
@@ -144,11 +145,11 @@ class XmlImporter(object):
def
_make_ext_obj
(
sefl
,
obj
):
ext
=
getattr
(
ua
,
obj
.
objname
)()
for
name
,
val
in
obj
.
body
.
items
()
:
for
name
,
val
in
obj
.
body
:
if
type
(
val
)
is
str
:
raise
Exception
(
"Error val should a dict"
,
name
,
val
)
else
:
for
attname
,
v
in
val
.
items
()
:
for
attname
,
v
in
val
:
# tow possible values:
# either we get value directly
# or a dict if it s an object or a list
...
...
@@ -157,15 +158,22 @@ class XmlImporter(object):
else
:
# so we habve either an object or a list...
obj2
=
getattr
(
ext
,
attname
)
if
not
isinstance
(
obj2
,
ua
.
NodeId
)
and
not
hasattr
(
obj2
,
"ua_types"
):
if
isinstance
(
obj2
,
ua
.
NodeId
):
for
attname2
,
v2
in
v
:
if
attname2
==
"Identifier"
:
obj2
=
ua
.
NodeId
.
from_string
(
v2
)
setattr
(
ext
,
attname
,
obj2
)
break
elif
not
isinstance
(
obj2
,
ua
.
NodeId
)
and
not
hasattr
(
obj2
,
"ua_types"
):
# we probably have a list
my_list
=
[]
for
vtype
,
v2
in
v
.
items
()
:
for
vtype
,
v2
in
v
:
my_list
.
append
(
ua_type_to_python
(
v2
,
vtype
))
setattr
(
obj
,
attname
,
my_list
)
setattr
(
ext
,
attname
,
my_list
)
else
:
for
attname2
,
v2
in
v
.
items
()
:
for
attname2
,
v2
in
v
:
setattr
(
obj2
,
attname2
,
to_python
(
v2
,
obj2
,
attname2
))
setattr
(
ext
,
attname
,
obj2
)
return
ext
def
_add_variable_value
(
self
,
obj
):
...
...
@@ -181,6 +189,9 @@ class XmlImporter(object):
elif
obj
.
valuetype
.
startswith
(
"ListOf"
):
vtype
=
obj
.
valuetype
[
6
:]
return
[
getattr
(
ua
,
vtype
)(
v
)
for
v
in
obj
.
value
]
elif
obj
.
valuetype
==
'ExtensionObject'
:
extobj
=
self
.
_make_ext_obj
(
obj
.
value
)
return
ua
.
Variant
(
extobj
,
getattr
(
ua
.
VariantType
,
obj
.
valuetype
))
else
:
return
ua
.
Variant
(
obj
.
value
,
getattr
(
ua
.
VariantType
,
obj
.
valuetype
))
...
...
opcua/common/xmlparser.py
View file @
77ec2c1a
...
...
@@ -294,8 +294,10 @@ class XMLParser(object):
obj
.
value
=
self
.
_parse_list_of_extension_object
(
el
)
elif
ntag
==
"ListOfLocalizedText"
:
obj
.
value
=
self
.
_parse_list_of_localized_text
(
el
)
elif
ntag
==
"ExtensionObject"
:
obj
.
value
=
self
.
_parse_extension_object
(
el
)
else
:
self
.
logger
.
info
(
"Value type not implemented: '%s', %s
"
,
ntag
)
self
.
logger
.
warning
(
"Value type not implemented: '%s'
"
,
ntag
)
def
_get_text
(
self
,
el
):
txtlist
=
[
txt
.
strip
()
for
txt
in
el
.
itertext
()]
...
...
@@ -324,6 +326,10 @@ class XMLParser(object):
value
.
append
(
ext_obj
)
return
value
def
_parse_extension_object
(
self
,
el
):
for
ext_obj
in
el
:
return
self
.
_parse_ext_obj
(
ext_obj
)
def
_parse_ext_obj
(
self
,
el
):
ext
=
ExtObj
()
for
extension_object_part
in
el
:
...
...
@@ -334,14 +340,12 @@ class XMLParser(object):
elif
ntag
==
'Body'
:
ext
.
objname
=
self
.
_retag
.
match
(
extension_object_part
.
find
(
'*'
).
tag
).
groups
()[
1
]
ext
.
body
=
self
.
_parse_body
(
extension_object_part
)
print
(
"body"
,
ext
.
body
)
else
:
print
(
"Uknoen ndtag"
,
ntag
)
print
(
"ext_obj"
,
ext
.
objname
,
ext
.
typeid
,
ext
.
body
)
print
(
"Uknown ndtag"
,
ntag
)
return
ext
def
_parse_body
(
self
,
el
):
body
=
{}
body
=
[]
for
body_item
in
el
:
otag
=
self
.
_retag
.
match
(
body_item
.
tag
).
groups
()[
1
]
childs
=
[
i
for
i
in
body_item
]
...
...
@@ -350,7 +354,7 @@ class XMLParser(object):
else
:
val
=
self
.
_parse_body
(
body_item
)
if
val
:
body
[
otag
]
=
val
body
.
append
((
otag
,
val
))
return
body
def
_parse_refs
(
self
,
el
,
obj
):
...
...
tests/tests_xml.py
View file @
77ec2c1a
...
...
@@ -59,7 +59,7 @@ class XmlTests(object):
# now see if our nodes are here
val
=
inputs
.
get_value
()
self
.
assertEqual
(
len
(
val
),
2
)
#self.assertEqual(val[0].ArrayDimensions, [2, 2]) # seems broken
self
.
assertEqual
(
val
[
0
].
ArrayDimensions
,
[
2
,
2
])
self
.
assertEqual
(
v
.
get_value
(),
6.78
)
self
.
assertEqual
(
v
.
get_data_type
(),
ua
.
NodeId
(
ua
.
ObjectIds
.
Float
))
...
...
@@ -70,6 +70,27 @@ class XmlTests(object):
self
.
assertEqual
(
a2
.
get_value
(),
[[]])
self
.
assertEqual
(
a2
.
get_data_type
(),
ua
.
NodeId
(
ua
.
ObjectIds
.
ByteString
))
self
.
assertIn
(
a2
.
get_value_rank
(),
(
0
,
2
))
self
.
assertEqual
(
a2
.
get_attribute
(
ua
.
AttributeIds
.
ArrayDimensions
).
Value
.
Value
,
[
1
,
0
])
# seems broken
self
.
assertEqual
(
a2
.
get_attribute
(
ua
.
AttributeIds
.
ArrayDimensions
).
Value
.
Value
,
[
1
,
0
])
def
test_export_import_ext_obj
(
self
):
arg
=
ua
.
Argument
()
arg
.
DataType
=
ua
.
NodeId
(
ua
.
ObjectIds
.
Float
)
arg
.
Description
=
ua
.
LocalizedText
(
b"This is a nice description"
)
arg
.
ArrayDimensions
=
[
1
,
2
,
3
]
arg
.
Name
=
"MyArg"
v
=
self
.
opc
.
nodes
.
objects
.
add_variable
(
2
,
"xmlexportobj2"
,
arg
)
nodes
=
[
v
]
self
.
opc
.
export_xml
(
nodes
,
"export.xml"
)
self
.
opc
.
delete_nodes
(
nodes
)
self
.
opc
.
import_xml
(
"export.xml"
)
arg2
=
v
.
get_value
()
self
.
assertEqual
(
arg
.
Name
,
arg2
.
Name
)
self
.
assertEqual
(
arg
.
ArrayDimensions
,
arg2
.
ArrayDimensions
)
self
.
assertEqual
(
arg
.
Description
,
arg2
.
Description
)
self
.
assertEqual
(
arg
.
DataType
,
arg2
.
DataType
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment