Commit d6fa8481 authored by olivier R-D's avatar olivier R-D

cleanup of xml import and test code

parent eb2e16e6
......@@ -3,8 +3,6 @@ add node defined in XML to address space
format is the one from opc-ua specification
"""
import logging
import sys
import re
import uuid
import dateutil.parser
from copy import copy
......@@ -253,7 +251,7 @@ class XmlImporter(object):
return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
elif obj.valuetype == 'LocalizedText':
ltext = ua.LocalizedText()
for name, val in obj.value[0][1]:
for name, val in obj.value:
if name == "Text":
ltext.Text = val.encode("utf-8")
else:
......
......@@ -98,7 +98,7 @@ class XMLParser(object):
self.tree = ET.parse(xmlpath)
self.root = self.tree.getroot()
# FIXME: get these namespaces from XML!!
# FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
self.ns = {
'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
......@@ -112,8 +112,8 @@ class XMLParser(object):
"""
namespaces_uris = []
for child in self.root:
name = self._retag.match(child.tag).groups()[1]
if name == 'NamespaceUris':
tag = self._retag.match(child.tag).groups()[1]
if tag == 'NamespaceUris':
namespaces_uris = [ns_element.text for ns_element in child]
break
return namespaces_uris
......@@ -124,8 +124,8 @@ class XMLParser(object):
"""
aliases = {}
for child in self.root:
name = self._retag.match(child.tag).groups()[1]
if name == 'Aliases':
tag = self._retag.match(child.tag).groups()[1]
if tag == 'Aliases':
for el in child:
aliases[el.attrib["Alias"]] = el.text
break
......@@ -134,25 +134,24 @@ class XMLParser(object):
def get_node_datas(self):
nodes = []
for child in self.root:
name = self._retag.match(child.tag).groups()[1]
if name not in ["Aliases", "NamespaceUris"]:
node = self._parse_node(name, child)
tag = self._retag.match(child.tag).groups()[1]
if tag not in ["Aliases", "NamespaceUris"]:
node = self._parse_node(tag, child)
nodes.append(node)
return nodes
def _parse_node(self, name, child):
def _parse_node(self, nodetype, child):
"""
Parse a XML node and create a NodeData object.
"""
obj = NodeData()
obj.nodetype = name
obj.nodetype = nodetype
for key, val in child.attrib.items():
self._set_attr(key, val, obj)
self.logger.info("\n Parsing node: %s %s", obj.nodeid, obj.browsename)
self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
obj.displayname = obj.browsename # give a default value to display name
for el in child:
self._parse_tag(el, obj)
self._parse_attr(el, obj)
return obj
def _set_attr(self, key, val, obj):
......@@ -187,7 +186,7 @@ class XMLParser(object):
else:
self.logger.info("Attribute not implemented: %s:%s", key, val)
def _parse_tag(self, el, obj):
def _parse_attr(self, el, obj):
tag = self._retag.match(el.tag).groups()[1]
if tag == "DisplayName":
......@@ -206,46 +205,44 @@ class XMLParser(object):
else:
self.logger.info("Not implemented tag: %s", el)
def _parse_value(self, el, obj):
for val in el:
ntag = self._retag.match(val.tag).groups()[1]
obj.valuetype = ntag
if ntag in ("Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64"):
obj.value = int(val.text)
elif ntag in ("Float", "Double"):
obj.value = float(val.text)
elif ntag in ("Boolean"):
obj.value = _to_bool(val.text)
elif ntag in ("ByteString", "String"):
mytext = val.text
if mytext is None: # support importing null strings
mytext = ""
mytext = mytext.replace('\n', '').replace('\r', '')
# obj.value.append('b"{}"'.format(mytext))
obj.value = mytext
elif ntag in ("DateTime"):
obj.value = val.text
elif ntag in ("Guid"):
self._parse_value(val, obj)
obj.valuetype = obj.datatype # override parsed string type to guid
elif ntag == "LocalizedText":
from IPython import embed
embed()
obj.value = self._parse_body(el)
elif ntag == "NodeId":
id_el_list = el.findall('uax:NodeId/uax:Identifier', self.ns)
if id_el_list:
obj.value = id_el_list[0].text
elif ntag == "ListOfExtensionObject":
obj.value = self._parse_list_of_extension_object(el)
elif ntag == "ListOfLocalizedText":
obj.value = self._parse_list_of_localized_text(el)
elif ntag.startswith("ListOf"):
obj.value = self._parse_list(el[0])
elif ntag == "ExtensionObject":
obj.value = self._parse_extension_object(el)
else:
self.logger.warning("Value type not implemented: '%s'", ntag)
def _parse_value(self, val_el, obj):
child_el = val_el.find(".//") # should be only one child
ntag = self._retag.match(child_el.tag).groups()[1]
obj.valuetype = ntag
if ntag in ("Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64"):
obj.value = int(child_el.text)
elif ntag in ("Float", "Double"):
obj.value = float(child_el.text)
elif ntag in ("Boolean"):
obj.value = _to_bool(child_el.text)
elif ntag in ("ByteString", "String"):
mytext = child_el.text
if mytext is None: # support importing null strings
mytext = ""
mytext = mytext.replace('\n', '').replace('\r', '')
obj.value = mytext
elif ntag in ("DateTime"):
obj.value = child_el.text
elif ntag in ("Guid"):
self._parse_value(child_el, obj)
obj.valuetype = obj.datatype # override parsed string type to guid
elif ntag == "LocalizedText":
obj.value = self._parse_body(child_el)
elif ntag == "NodeId":
id_el = child_el.find("uax:Identifier", self.ns)
if id_el is not None:
obj.value = id_el.text
elif ntag == "ListOfExtensionObject":
obj.value = self._parse_list_of_extension_object(child_el)
elif ntag == "ListOfLocalizedText":
obj.value = self._parse_list_of_localized_text(child_el)
elif ntag.startswith("ListOf"):
obj.value = self._parse_list(child_el)
elif ntag == "ExtensionObject":
obj.value = self._parse_ext_obj(child_el)
else:
self.logger.warning("Parsing value of type '%s' not implemented", ntag)
def _get_text(self, el):
txtlist = [txt.strip() for txt in el.itertext()]
......@@ -264,13 +261,12 @@ class XMLParser(object):
def _parse_list_of_localized_text(self, el):
value = []
for localized_text_list in el:
for localized_text in localized_text_list:
ntag = self._retag.match(localized_text.tag).groups()[1]
for child in localized_text:
ntag = self._retag.match(child.tag).groups()[1]
if ntag == 'Text':
value.append(self._get_text(child))
for localized_text in el:
ntag = self._retag.match(localized_text.tag).groups()[1]
for child in localized_text:
ntag = self._retag.match(child.tag).groups()[1]
if ntag == 'Text':
value.append(self._get_text(child))
return value
def _parse_list_of_extension_object(self, el):
......@@ -279,16 +275,11 @@ class XMLParser(object):
Return an list of ExtObj
"""
value = []
for extension_object_list in el:
for extension_object in extension_object_list:
ext_obj = self._parse_ext_obj(extension_object)
value.append(ext_obj)
for extension_object in el:
ext_obj = self._parse_ext_obj(extension_object)
value.append(ext_obj)
return value
def _parse_extension_object(self, el):
for ext_obj in el:
return self._parse_ext_obj(ext_obj)
def _parse_ext_obj(self, el):
ext = ExtObj()
for extension_object_part in el:
......
......@@ -99,7 +99,7 @@ class XmlTests(object):
def test_xml_vars(self):
self.opc.register_namespace("tititi")
self.opc.register_namespace("whatthefuck")
self.opc.register_namespace("whatthexxx")
o = self.opc.nodes.objects.add_object(2, "xmlexportobj")
v = o.add_variable(3, "myxmlvar", 6.78, ua.VariantType.Float)
a = o.add_variable(3, "myxmlvar-array", [6, 1], ua.VariantType.UInt16)
......@@ -127,29 +127,10 @@ class XmlTests(object):
self.assertIn(a3.get_value_rank(), (0, 2))
self.assertEqual(a3.get_attribute(ua.AttributeIds.ArrayDimensions).Value.Value, [1, 0])
def test_xml_ext_obj(self):
arg = ua.Argument()
arg.DataType = ua.NodeId(ua.ObjectIds.Float)
arg.Description = ua.LocalizedText(b"This is a nice description")
arg.ArrayDimensions = [1, 2, 3]
arg.Name = "MyArg"
v = self.opc.nodes.objects.add_variable(2, "xmlexportobj2", arg)
nodes = [v]
self.opc.export_xml(nodes, "export.xml")
self.opc.delete_nodes(nodes)
self.opc.import_xml("export.xml")
arg2 = v.get_value()
self.assertEqual(arg.Name, arg2.Name)
self.assertEqual(arg.ArrayDimensions, arg2.ArrayDimensions)
self.assertEqual(arg.Description, arg2.Description)
self.assertEqual(arg.DataType, arg2.DataType)
def test_xml_ns(self):
"""
This test is far too complicated but catches a lot of things...
"""
ns_array = self.opc.get_namespace_array()
if len(ns_array) < 3:
self.opc.register_namespace("dummy_ns")
......@@ -249,7 +230,23 @@ class XmlTests(object):
o = self.opc.nodes.objects.add_variable(2, "xmlnodeid", ua.NodeId("mytext", 1))
self._test_xml_var_type(o, "nodeid")
def _test_xml_var_type(self, node, typename):
def test_xml_ext_obj(self):
arg = ua.Argument()
arg.DataType = ua.NodeId(ua.ObjectIds.Float)
arg.Description = ua.LocalizedText(b"Nice description")
arg.ArrayDimensions = [1, 2, 3]
arg.Name = "MyArg"
node = self.opc.nodes.objects.add_variable(2, "xmlexportobj2", arg)
node2 = self._test_xml_var_type(node, "ext_obj", test_equality=False)
arg2 = node2.get_value()
self.assertEqual(arg.Name, arg2.Name)
self.assertEqual(arg.ArrayDimensions, arg2.ArrayDimensions)
self.assertEqual(arg.Description, arg2.Description)
self.assertEqual(arg.DataType, arg2.DataType)
def _test_xml_var_type(self, node, typename, test_equality=True):
dtype = node.get_data_type()
dv = node.get_data_value()
rank = node.get_value_rank()
......@@ -264,7 +261,8 @@ class XmlTests(object):
self.assertEqual(node, node2)
self.assertEqual(dtype, node2.get_data_type())
self.assertEqual(dv.Value, node2.get_data_value().Value)
if test_equality:
self.assertEqual(dv.Value, node2.get_data_value().Value)
self.assertEqual(rank, node2.get_value_rank())
self.assertEqual(dim, node2.get_array_dimensions())
self.assertEqual(nclass, node2.get_node_class())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment