Commit b9b1be18 authored by maljac's avatar maljac Committed by ORD

Use namespaces in XML import (#253)

* Use server namespaces uri to identify namepsace index for node imports (2)

* Fixes test failures / Add unittest

* Add unittest (2)
parent 77907973
......@@ -16,9 +16,9 @@ class XmlImporter(object):
self.parser = None
self.server = server
def import_xml(self, xmlpath):
def import_xml(self, xmlpath, act_server):
self.logger.info("Importing XML file %s", xmlpath)
self.parser = xmlparser.XMLParser(xmlpath)
self.parser = xmlparser.XMLParser(xmlpath, act_server)
for node in self.parser:
if node.nodetype == 'UAObject':
self.add_object(node)
......
......@@ -53,7 +53,8 @@ class RefStruct(object):
class XMLParser(object):
def __init__(self, xmlpath):
def __init__(self, xmlpath, server):
self.server = server # POC
self.logger = logging.getLogger(__name__)
self._retag = re.compile(r"(\{.*\})(.*)")
self.path = xmlpath
......@@ -63,6 +64,9 @@ class XMLParser(object):
self.root = self.tree.getroot()
self.it = None
self.namespaces = {}
self._re_nodeid = re.compile(r"^ns=(?P<ns>\d+[^;]*);i=(?P<i>\d+)")
def __iter__(self):
self.it = iter(self.root)
return self
......@@ -77,6 +81,11 @@ class XMLParser(object):
if name == "Aliases":
for el in child:
self.aliases[el.attrib["Alias"]] = el.text
elif name == 'NamespaceUris':
for ns_index, ns_element in enumerate(child):
ns_uri = ns_element.text
ns_server_index = self.server.register_namespace(ns_uri)
self.namespaces[ns_index + 1] = (ns_server_index, ns_uri)
else:
node = self._parse_node(name, child)
return node
......@@ -84,6 +93,22 @@ class XMLParser(object):
def next(self): # support for python2
return self.__next__()
def _get_node_id(self, value):
"""
Check if the nodeid given in the xml model file must be converted
to a already existing namespace id based on the files namespace uri
:returns: NodeId (str)
"""
result = value
r_match = self._re_nodeid.search(value)
if r_match:
node_ns, node_id = r_match.groups()
ns_server = self.namespaces.get(int(node_ns), None)
if ns_server:
result = "ns={};i={}".format(ns_server[0], node_id)
return result
def _parse_node(self, name, child):
obj = NodeData()
obj.nodetype = name
......@@ -96,19 +121,18 @@ class XMLParser(object):
def _set_attr(self, key, val, obj):
if key == "NodeId":
obj.nodeid = val
obj.nodeid = self._get_node_id(val)
elif key == "BrowseName":
obj.browsename = val
elif key == "SymbolicName":
obj.symname = val
elif key == "ParentNodeId":
obj.parent = val
obj.parent = self._get_node_id(val)
elif key == "DataType":
obj.datatype = val
elif key == "IsAbstract":
obj.abstract = val
elif key == "EventNotifier":
print("Notfiier", key, val)
obj.eventnotifier = 1 if val == "1" else 0
elif key == "ValueRank":
obj.rank = int(val)
......@@ -173,7 +197,7 @@ class XMLParser(object):
def _get_text(self, el):
txt = ""
for text in el.itertext():
for text in el.itertext():
txt += text
return txt
......@@ -220,16 +244,16 @@ class XMLParser(object):
def _parse_refs(self, el, obj):
for ref in el:
if ref.attrib["ReferenceType"] == "HasTypeDefinition":
obj.typedef = ref.text
obj.typedef = self._get_node_id(ref.text)
elif "IsForward" in ref.attrib and ref.attrib["IsForward"] == "false":
# if obj.parent:
# sys.stderr.write("Parent is already set with: "+ obj.parent + " " + ref.text + "\n")
obj.parent = ref.text
obj.parent = self._get_node_id(ref.text)
obj.parentlink = ref.attrib["ReferenceType"]
else:
struct = RefStruct()
if "IsForward" in ref.attrib:
struct.forward = ref.attrib["IsForward"]
struct.target = ref.text
struct.target = self._get_node_id(ref.text)
struct.reftype = ref.attrib["ReferenceType"]
obj.refs.append(struct)
......@@ -334,6 +334,8 @@ class Server(object):
"""
ns_node = self.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
uries = ns_node.get_value()
if uri in uries:
return uries.index(uri)
uries.append(uri)
ns_node.set_value(uries)
return len(uries) - 1
......@@ -398,7 +400,7 @@ class Server(object):
import nodes defined in xml
"""
importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
importer.import_xml(path)
importer.import_xml(path, self)
def delete_nodes(self, nodes, recursive=False):
return delete_nodes(self.iserver.isession, nodes, recursive)
......
......@@ -111,6 +111,14 @@ class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
idx2 = self.opc.get_namespace_index(uri)
self.assertEqual(idx1, idx2)
def test_register_existing_namespace(self):
uri = 'http://mycustom.Namespace.com'
idx1 = self.opc.register_namespace(uri)
idx2 = self.opc.register_namespace(uri)
idx3 = self.opc.get_namespace_index(uri)
self.assertEqual(idx1, idx2)
self.assertEqual(idx1, idx3)
def test_register_use_namespace(self):
uri = 'http://my_very_custom.Namespace.com'
idx = self.opc.register_namespace(uri)
......
......@@ -12,7 +12,7 @@ from opcua.ua.uatypes import flatten, get_shape, reshape
from opcua.server.internal_subscription import WhereClauseEvaluator
from opcua.common.event_objects import BaseEvent
from opcua.common.ua_utils import string_to_variant, variant_to_string, string_to_val, val_to_string
from opcua.common.xmlparser import XMLParser
class TestUnit(unittest.TestCase):
......@@ -438,6 +438,26 @@ class TestUnit(unittest.TestCase):
self.assertTrue(wce.eval(ev))
def test_xmlparser_get_node_id(self):
server = None
xmlpath = 'tests/custom_nodes.xml'
parser = XMLParser(xmlpath, server)
res1 = parser._get_node_id('i=1001')
self.assertEqual(res1, 'i=1001')
res2 = parser._get_node_id('ns=1;i=1001')
self.assertEqual(res2, 'ns=1;i=1001')
parser.namespaces = {1: [3, 'http://someuri.com']}
res3 = parser._get_node_id('ns=1;i=1001')
self.assertEqual(res3, 'ns=3;i=1001')
parser.namespaces = {1: [3, 'http://someuri.com']}
res4 = parser._get_node_id('ns=2;i=1001')
self.assertEqual(res4, 'ns=2;i=1001')
if __name__ == '__main__':
logging.basicConfig(level=logging.WARN)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment