Commit d5c5398f authored by ORD's avatar ORD Committed by GitHub

fix and tests for namespaces xml exporter (#326)

* theoretical fix and tests for namespaces xml exporter

* Revert "hack around broken namespace import from xml"

This reverts commit 1a5475f1.

* better testing av exporting and reimporting namespaces
parent 0cbbbe76
......@@ -5,6 +5,7 @@ format is the one from opc-ua specification
import logging
from collections import OrderedDict
import xml.etree.ElementTree as Et
from copy import copy
from opcua import ua
from opcua.ua import object_ids as o_ids
......@@ -28,6 +29,7 @@ class XmlExporter(object):
def build_etree(self, node_list, uris=None):
"""
Create an XML etree object from a list of nodes; custom namespace uris are optional
Namespaces used by nodes are allways exported for consistency.
Args:
node_list: list of Node objects for export
uris: list of namespace uri strings
......@@ -36,6 +38,8 @@ class XmlExporter(object):
"""
self.logger.info('Building XML etree')
self._add_namespaces(node_list, uris)
# add all nodes in the list to the XML etree
for node in node_list:
self.node_to_etree(node)
......@@ -43,9 +47,37 @@ class XmlExporter(object):
# add aliases to the XML etree
self._add_alias_els()
def _add_namespaces(self, nodes, uris):
# first get a list of all indexes used by nodes
idxs = []
for node in nodes:
if node.nodeid.NamespaceIndex != 0 and node.nodeid.NamespaceIndex not in idxs:
idxs.append(node.nodeid.NamespaceIndex)
ns_array = self.server.get_namespace_array()
print("IDXS 1", idxs)
# now add index of provided uris if necessary
if uris:
# add namespace uris to the XML etree
self._add_namespace_uri_els(uris)
for uri in uris:
if uri in ns_array:
i = ns_array.index(uri)
if i not in idxs:
idxs.append(i)
print("IDXS 2", idxs)
# now create a dict of idx_in_address_space to idx_in_exported_file
idxs.sort()
self._addr_idx_to_xml_idx = {0: 0}
ns_to_export = []
for xml_idx, addr_idx in enumerate(idxs):
if addr_idx >= len(ns_array):
break
self._addr_idx_to_xml_idx[addr_idx] = xml_idx + 1
ns_to_export.append(ns_array[addr_idx])
print(" ADDR_TO_ XML", self._addr_idx_to_xml_idx)
# write namespaces to xml
self._add_namespace_uri_els(ns_to_export)
def write_xml(self, xmlpath, pretty=True):
"""
......@@ -112,16 +144,31 @@ class XmlExporter(object):
child_el.text = text
return child_el
def _node_to_string(self, nodeid):
if not isinstance(nodeid, ua.NodeId):
nodeid = nodeid.nodeid
if nodeid.NamespaceIndex in self._addr_idx_to_xml_idx:
nodeid = copy(nodeid)
nodeid.NamespaceIndex = self._addr_idx_to_xml_idx[nodeid.NamespaceIndex]
return nodeid.to_string()
def _bname_to_string(self, bname):
if bname.NamespaceIndex in self._addr_idx_to_xml_idx:
bname = copy(bname)
bname.NamespaceIndex = self._addr_idx_to_xml_idx[bname.NamespaceIndex]
return bname.to_string()
def _add_node_common(self, nodetype, node):
browsename = node.get_browse_name().to_string()
nodeid = node.nodeid.to_string()
browsename = node.get_browse_name()
nodeid = node.nodeid
parent = node.get_parent()
displayname = node.get_display_name().Text.decode('utf-8')
desc = node.get_description().Text
print("NODE COMMON", node)
node_el = Et.SubElement(self.etree.getroot(), nodetype)
node_el.attrib["NodeId"] = nodeid
node_el.attrib["BrowseName"] = browsename
node_el.attrib["NodeId"] = self._node_to_string(nodeid)
node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
if parent is not None:
node_class = node.get_node_class()
if node_class in (ua.NodeClass.Object, ua.NodeClass.Variable, ua.NodeClass.Method):
......
......@@ -92,12 +92,11 @@ class XMLParser(object):
for el in child:
self.aliases[el.attrib["Alias"]] = self._get_node_id(el.text)
elif name == 'NamespaceUris':
s_uris = self.server.get_namespace_array()
for ns_index, ns_element in enumerate(child):
ns_uri = ns_element.text
if ns_uri not in s_uris:
ns_server_index = self.server.register_namespace(ns_uri)
self.namespaces[ns_index + 1] = (ns_server_index, ns_uri)
ns_server_index = self.server.register_namespace(ns_uri)
self.namespaces[ns_index + 1] = (ns_server_index, ns_uri)
print("namespace offset", ns_index + 1, (ns_server_index, ns_uri))
else:
node = self._parse_node(name, child)
nodes.append(node)
......
......@@ -408,8 +408,7 @@ class Server(object):
Export defined nodes to xml
"""
exp = XmlExporter(self)
uris = self.get_namespace_array()[2:]
exp.build_etree(nodes, uris=uris)
exp.build_etree(nodes)
return exp.write_xml(path)
def delete_nodes(self, nodes, recursive=False):
......
from opcua import ua
from opcua import uamethod
from opcua.common import uaerrors
@uamethod
......@@ -94,3 +95,35 @@ class XmlTests(object):
self.assertEqual(arg.DataType, arg2.DataType)
def test_xml_ns(self):
ns_array = self.opc.get_namespace_array()
if len(ns_array) <= 2:
self.opc.register_namespace("dummy_ns")
new_ns = self.opc.register_namespace("my_new_namespace")
o = self.opc.nodes.objects.add_object(0, "xmlns0")
o2 = self.opc.nodes.objects.add_object(2, "xmlns2")
o20 = self.opc.nodes.objects.add_object(20, "xmlns20")
o200 = self.opc.nodes.objects.add_object(200, "xmlns200")
onew = self.opc.nodes.objects.add_object(new_ns, "xmlns_new")
nodes = [o, o2, o20, o200, onew]
self.opc.export_xml(nodes, "export-ns.xml")
# delete node and change index og new_ns before re-importing
self.opc.delete_nodes(nodes)
ns_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
nss = ns_node.get_value()
nss.remove("my_new_namespace")
ns_node.set_value(nss)
new_ns = self.opc.register_namespace("my_new_namespace_offsett")
new_ns = self.opc.register_namespace("my_new_namespace")
self.opc.import_xml("export-ns.xml")
for i in nodes[:-1]:
i.get_browse_name()
with self.assertRaises(uaerrors.BadNodeIdUnknown):
onew.get_browse_name()
onew.nodeid.NamespaceIndex += 1
onew.get_browse_name()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment