Commit 136b6fd0 authored by ORD's avatar ORD Committed by GitHub

also export namespace of referenced nodes (#327)

* also export namespace of referenced nodes

* xmlexport: also export ns used in browsename

* __str__ for NodeData

* heavy cleanup of xmlimporter

* more xml import fixes

* fix importing lists of primitives

* export and import array dimensions

* fix by-ref error

* remove debug print

* fix xml test

* do not store ns uri when not necessary

* sdfsdfsd
parent 777e812f
......@@ -17,6 +17,7 @@ class XmlExporter(object):
self.logger = logging.getLogger(__name__)
self.server = server
self.aliases = {}
self._addr_idx_to_xml_idx = {}
node_set_attributes = OrderedDict()
node_set_attributes['xmlns:xsi'] = 'http://www.w3.org/2001/XMLSchema-instance'
......@@ -48,36 +49,52 @@ class XmlExporter(object):
self._add_alias_els()
def _add_namespaces(self, nodes, uris):
# first get a list of all indexes used by nodes
idxs = []
for node in nodes:
if node.nodeid.NamespaceIndex != 0 and node.nodeid.NamespaceIndex not in idxs:
idxs.append(node.nodeid.NamespaceIndex)
idxs = self._get_ns_idxs_of_nodes(nodes)
ns_array = self.server.get_namespace_array()
print("IDXS 1", idxs)
# now add index of provided uris if necessary
if uris:
for uri in uris:
if uri in ns_array:
i = ns_array.index(uri)
if i not in idxs:
idxs.append(i)
print("IDXS 2", idxs)
self._add_idxs_from_uris(idxs, uris, ns_array)
# now create a dict of idx_in_address_space to idx_in_exported_file
self._addr_idx_to_xml_idx = self._make_idx_dict(idxs, ns_array)
ns_to_export = [ns_array[i] for i in sorted(list(self._addr_idx_to_xml_idx.keys())) if i != 0]
# write namespaces to xml
self._add_namespace_uri_els(ns_to_export)
def _make_idx_dict(self, idxs, ns_array):
idxs.sort()
self._addr_idx_to_xml_idx = {0: 0}
ns_to_export = []
addr_idx_to_xml_idx = {0: 0}
for xml_idx, addr_idx in enumerate(idxs):
if addr_idx >= len(ns_array):
break
self._addr_idx_to_xml_idx[addr_idx] = xml_idx + 1
ns_to_export.append(ns_array[addr_idx])
print(" ADDR_TO_ XML", self._addr_idx_to_xml_idx)
addr_idx_to_xml_idx[addr_idx] = xml_idx + 1
return addr_idx_to_xml_idx
def _get_ns_idxs_of_nodes(self, nodes):
"""
get a list of all indexes used or references by nodes
"""
idxs = []
for node in nodes:
node_idxs = [node.nodeid.NamespaceIndex]
node_idxs.append(node.get_browse_name().NamespaceIndex)
node_idxs.extend(ref.NodeId.NamespaceIndex for ref in node.get_references())
node_idxs = list(set(node_idxs)) # remove duplicates
for i in node_idxs:
if i != 0 and i not in idxs:
idxs.append(i)
return idxs
def _add_idxs_from_uris(self, idxs, uris, ns_array):
for uri in uris:
if uri in ns_array:
i = ns_array.index(uri)
if i not in idxs:
idxs.append(i)
# write namespaces to xml
self._add_namespace_uri_els(ns_to_export)
def write_xml(self, xmlpath, pretty=True):
"""
......@@ -96,12 +113,12 @@ class XmlExporter(object):
self.etree.write(xmlpath,
encoding='utf-8',
xml_declaration=True
)
)
else:
self.etree.write(xmlpath,
encoding='utf-8',
xml_declaration=True
)
)
def dump_etree(self):
"""
......@@ -120,7 +137,6 @@ class XmlExporter(object):
Returns:
"""
node_class = node.get_node_class()
print("Exporting: ", node)
if node_class is ua.NodeClass.Object:
self.add_etree_object(node)
......@@ -165,7 +181,6 @@ class XmlExporter(object):
parent = node.get_parent()
displayname = node.get_display_name().Text.decode('utf-8')
desc = node.get_description().Text
print("NODE COMMON", node)
node_el = Et.SubElement(self.etree.getroot(), nodetype)
node_el.attrib["NodeId"] = self._node_to_string(nodeid)
node_el.attrib["BrowseName"] = self._bname_to_string(browsename)
......@@ -209,8 +224,9 @@ class XmlExporter(object):
rank = node.get_value_rank()
if rank != -1:
el.attrib["ValueRank"] = str(rank)
#var = node.get_attribute(ua.AttributeIds.ArrayDimensions())
#self._addobj_el.attrib["ArrayDimensions"] = str(var.Value.Value)
dim = node.get_attribute(ua.AttributeIds.ArrayDimensions)
if dim.Value.Value:
el.attrib["ArrayDimensions"] = ",".join([str(i) for i in dim.Value.Value])
el.attrib["DataType"] = dtype_name
value_to_etree(el, dtype_name, dtype, node)
......@@ -332,7 +348,7 @@ def _val_to_etree(el, dtype, val):
id_el = Et.SubElement(el, "uax:String")
id_el.text = str(val)
elif not hasattr(val, "ua_types"):
if type(val) is bytes:
if isinstance(val, bytes):
el.text = val.decode("utf-8")
else:
el.text = str(val)
......
......@@ -7,33 +7,18 @@ import sys
import re
import uuid
import dateutil.parser
from copy import copy
from opcua import ua
from opcua.common import xmlparser
def ua_type_to_python(val, uatype):
if uatype.startswith("Int") or uatype.startswith("UInt"):
return int(val)
elif uatype in ("Double", "Float"):
return float(val)
elif uatype in ("String"):
return val
elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
if sys.version_info.major > 2:
return bytes(val, 'utf8')
else:
return val
else:
raise Exception("uatype nopt handled", uatype, " for val ", val)
def to_python(val, obj, attname):
if isinstance(obj, ua.NodeId) and attname == "Identifier":
raise RuntimeError("Error we should parse a NodeId here")
return ua.NodeId.from_string(val)
else:
return ua_type_to_python(val, obj.ua_types[attname])
return xmlparser.ua_type_to_python(val, obj.ua_types[attname])
class XmlImporter(object):
......@@ -44,18 +29,16 @@ class XmlImporter(object):
self.server = server
self.namespaces = {}
self.aliases = {}
self._re_nodeid = re.compile(r"^ns=(?P<ns>\d+[^;]*);i=(?P<i>\d+)")
def _map_namespaces(self, namespaces_uris, act_server):
def _map_namespaces(self, namespaces_uris):
"""
creates a mapping between the namespaces in the xml file and in the server.
if not present the namespace is registered.
"""
namespaces = {}
for ns_index, ns_uri in enumerate(namespaces_uris):
ns_server_index = act_server.register_namespace(ns_uri)
namespaces[ns_index + 1] = (ns_server_index, ns_uri)
self.logger.info("namespace offset", ns_index + 1, (ns_server_index, ns_uri))
ns_server_index = self.server.register_namespace(ns_uri)
namespaces[ns_index + 1] = ns_server_index
return namespaces
def _map_aliases(self, aliases):
......@@ -64,22 +47,23 @@ class XmlImporter(object):
"""
aliases_mapped = {}
for alias, node_id in aliases.items():
aliases_mapped[alias] = self._get_node_id(node_id)
aliases_mapped[alias] = self._migrate_ns(self.to_nodeid(node_id))
return aliases_mapped
def import_xml(self, xmlpath, act_server):
def import_xml(self, xmlpath):
"""
import xml and return added nodes
"""
self.logger.info("Importing XML file %s", xmlpath)
self.parser = xmlparser.XMLParser(xmlpath, act_server)
self.parser = xmlparser.XMLParser(xmlpath)
dnodes = self.parser.get_node_datas()
dnodes = self.make_objects(dnodes)
self.namespaces = self._map_namespaces(self.parser.get_used_namespaces(), act_server)
self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
self.aliases = self._map_aliases(self.parser.get_aliases())
# The ordering of nodes currently only works if namespaces are defined in XML.
# Also, it is recommended not to use node ids without namespace prefix!
nodes_parsed = self._sort_nodes_by_parentid(self.parser)
nodes_parsed = self._sort_nodes_by_parentid(dnodes)
nodes = []
for nodedata in nodes_parsed: # self.parser:
......@@ -102,64 +86,51 @@ class XmlImporter(object):
continue
nodes.append(node)
return nodes
def _split_node_id(self, value):
"""
Split the fq node id into namespace and id part.
:returns: (namespace, id)
"""
if not value:
return (None, value)
r_match = self._re_nodeid.search(value)
if r_match:
return r_match.groups()
return (None, value)
def _parse_bname(self, bname):
"""
Parse a browsename and correct the namespace index.
"""
if bname.find(':') != -1:
browse_ns, browse_name = bname.split(':')
if browse_ns:
ns_server = self.namespaces.get(int(browse_ns), None)
if ns_server:
return '%d:%s' % (ns_server[0], browse_name)
return bname
def _get_node_id(self, value):
def make_objects(self, node_datas):
new_nodes = []
for ndata in node_datas:
ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
if ndata.parent:
ndata.parent = ua.NodeId.from_string(ndata.parent)
if ndata.parentlink:
ndata.parentlink = self.to_nodeid(ndata.parentlink)
if ndata.typedef:
ndata.typedef = self.to_nodeid(ndata.typedef)
new_nodes.append(ndata)
return new_nodes
def _migrate_ns(self, nodeid):
"""
Check if the nodeid given in the xml model file must be converted
to a already existing namespace id based on the files namespace uri
Check if the index of nodeid or browsename given in the xml model file
must be converted to a already existing namespace id based on the files
namespace uri
:returns: NodeId (str)
"""
result = value
node_ns, node_id = self._split_node_id(value)
if node_ns:
ns_server = self.namespaces.get(int(node_ns), None)
if ns_server:
result = "ns={};i={}".format(ns_server[0], node_id)
return result
if nodeid.NamespaceIndex in self.namespaces:
nodeid = copy(nodeid)
nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
return nodeid
def _get_node(self, obj):
node = ua.AddNodesItem()
node.RequestedNewNodeId = ua.NodeId.from_string(self._get_node_id(obj.nodeid))
node.BrowseName = ua.QualifiedName.from_string(self._parse_bname(obj.browsename))
node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
node.BrowseName = self._migrate_ns(obj.browsename)
node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
if obj.parent:
node.ParentNodeId = ua.NodeId.from_string(self._get_node_id(obj.parent))
node.ParentNodeId = self._migrate_ns(obj.parent)
if obj.parentlink:
node.ReferenceTypeId = self.to_nodeid(self._get_node_id(obj.parentlink))
node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
if obj.typedef:
node.TypeDefinition = ua.NodeId.from_string(self._get_node_id(obj.typedef))
node.TypeDefinition = self._migrate_ns(obj.typedef)
return node
def to_nodeid(self, nodeid):
if not nodeid:
if isinstance(nodeid, ua.NodeId):
return nodeid
elif not nodeid:
return ua.NodeId(ua.ObjectIds.String)
elif "=" in nodeid:
return ua.NodeId.from_string(nodeid)
......@@ -167,10 +138,9 @@ class XmlImporter(object):
return ua.NodeId(getattr(ua.ObjectIds, nodeid))
else:
if nodeid in self.aliases:
nodeid = self.aliases[nodeid]
return self.aliases[nodeid]
else:
nodeid = "i={}".format(getattr(ua.ObjectIds, nodeid))
return ua.NodeId.from_string(nodeid)
return ua.NodeId(getattr(ua.ObjectIds, nodeid))
def add_object(self, obj):
node = self._get_node(obj)
......@@ -180,8 +150,9 @@ class XmlImporter(object):
attrs.DisplayName = ua.LocalizedText(obj.displayname)
attrs.EventNotifier = obj.eventnotifier
node.NodeAttributes = attrs
res = self.server.add_nodes([node])
res = self.server.iserver.isession.add_nodes([node])
self._add_refs(obj)
res[0].StatusCode.check()
return res[0].AddedNodeId
def add_object_type(self, obj):
......@@ -192,8 +163,9 @@ class XmlImporter(object):
attrs.DisplayName = ua.LocalizedText(obj.displayname)
attrs.IsAbstract = obj.abstract
node.NodeAttributes = attrs
res = self.server.add_nodes([node])
res = self.server.iserver.isession.add_nodes([node])
self._add_refs(obj)
res[0].StatusCode.check()
return res[0].AddedNodeId
def add_variable(self, obj):
......@@ -203,7 +175,6 @@ class XmlImporter(object):
attrs.Description = ua.LocalizedText(obj.desc)
attrs.DisplayName = ua.LocalizedText(obj.displayname)
attrs.DataType = self.to_nodeid(obj.datatype)
# if obj.value and len(obj.value) == 1:
if obj.value is not None:
attrs.Value = self._add_variable_value(obj,)
if obj.rank:
......@@ -217,8 +188,9 @@ class XmlImporter(object):
if obj.dimensions:
attrs.ArrayDimensions = obj.dimensions
node.NodeAttributes = attrs
res = self.server.add_nodes([node])
res = self.server.iserver.isession.add_nodes([node])
self._add_refs(obj)
res[0].StatusCode.check()
return res[0].AddedNodeId
def _make_ext_obj(sefl, obj):
......@@ -246,7 +218,7 @@ class XmlImporter(object):
# we probably have a list
my_list = []
for vtype, v2 in v:
my_list.append(ua_type_to_python(v2, vtype))
my_list.append(xmlparser.ua_type_to_python(v2, vtype))
setattr(ext, attname, my_list)
else:
for attname2, v2 in v:
......@@ -266,7 +238,10 @@ class XmlImporter(object):
return values
elif obj.valuetype.startswith("ListOf"):
vtype = obj.valuetype[6:]
return [getattr(ua, vtype)(v) for v in obj.value]
if hasattr(ua.ua_binary.Primitives, vtype):
return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
else:
return [getattr(ua, vtype)(v) for v in obj.value]
elif obj.valuetype == 'ExtensionObject':
extobj = self._make_ext_obj(obj.value)
return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
......@@ -293,8 +268,9 @@ class XmlImporter(object):
if obj.dimensions:
attrs.ArrayDimensions = obj.dimensions
node.NodeAttributes = attrs
res = self.server.add_nodes([node])
res = self.server.iserver.isession.add_nodes([node])
self._add_refs(obj)
res[0].StatusCode.check()
return res[0].AddedNodeId
def add_method(self, obj):
......@@ -312,8 +288,9 @@ class XmlImporter(object):
if obj.dimensions:
attrs.ArrayDimensions = obj.dimensions
node.NodeAttributes = attrs
res = self.server.add_nodes([node])
res = self.server.iserver.isession.add_nodes([node])
self._add_refs(obj)
res[0].StatusCode.check()
return res[0].AddedNodeId
def add_reference_type(self, obj):
......@@ -329,8 +306,9 @@ class XmlImporter(object):
if obj.symmetric:
attrs.Symmetric = obj.symmetric
node.NodeAttributes = attrs
res = self.server.add_nodes([node])
res = self.server.iserver.isession.add_nodes([node])
self._add_refs(obj)
res[0].StatusCode.check()
return res[0].AddedNodeId
def add_datatype(self, obj):
......@@ -342,8 +320,9 @@ class XmlImporter(object):
if obj.abstract:
attrs.IsAbstract = obj.abstract
node.NodeAttributes = attrs
res = self.server.add_nodes([node])
res = self.server.iserver.isession.add_nodes([node])
self._add_refs(obj)
res[0].StatusCode.check()
return res[0].AddedNodeId
def _add_refs(self, obj):
......@@ -354,14 +333,13 @@ class XmlImporter(object):
ref = ua.AddReferencesItem()
ref.IsForward = True
ref.ReferenceTypeId = self.to_nodeid(data.reftype)
ref.SourceNodeId = ua.NodeId.from_string(self._get_node_id(obj.nodeid))
ref.SourceNodeId = self._migrate_ns(obj.nodeid)
ref.TargetNodeClass = ua.NodeClass.DataType
ref.TargetNodeId = ua.NodeId.from_string(self._get_node_id(data.target))
ref.TargetNodeId = self._migrate_ns(self.to_nodeid(data.target))
refs.append(ref)
self.server.add_references(refs)
self.server.iserver.isession.add_references(refs)
# FIX: wrong order of node sorting .. need to find out what is wrong
def _sort_nodes_by_parentid(self, nodes):
def _sort_nodes_by_parentid(self, ndatas):
"""
Sort the list of nodes according their parent node in order to respect
the dependency between nodes.
......@@ -369,44 +347,37 @@ class XmlImporter(object):
:param nodes: list of NodeDataObjects
:returns: list of sorted nodes
"""
_nodes = list(nodes)
_ndatas = list(ndatas)
# list of node ids that are already sorted / inserted
sorted_nodes_ids = []
# list of sorted nodes (i.e. XML Elements)
sorted_nodes = []
sorted_ndatas = []
all_node_ids = [data.nodeid for data in ndatas]
# list of namespace indexes that are relevant for this import
# we can only respect ordering nodes for namespaces indexes that
# are defined in the xml file itself. Thus we assume that all other
# references namespaces are already known to the server and should
# not create any dependency problems (like "NodeNotFound")
relevant_namespaces = [str(ns) for ns in self.namespaces.keys()]
while len(_nodes) > 0:
while len(_ndatas) > 0:
pop_nodes = []
for node in _nodes:
insert = None
# Get the node and parent node namespace and id parts
node_ns, node_id = self._split_node_id(node.nodeid)
parent_ns, parent_id = self._split_node_id(node.parent)
for ndata in _ndatas:
# Insert nodes that
# (1) have no parent / parent_ns is None (e.g. namespace 0)
# (2) ns is not in list of relevant namespaces
if (parent_ns is None or (len(relevant_namespaces) >= 1 and node_ns not in relevant_namespaces) or
parent_id is None):
insert = 0
if ndata.nodeid.NamespaceIndex not in self.namespaces or \
ndata.parent is None or \
ndata.parent not in all_node_ids:
sorted_ndatas.append(ndata)
sorted_nodes_ids.append(ndata.nodeid)
pop_nodes.append(ndata)
else:
# Check if the nodes parent is already in the list of inserted nodes
if node.parent in sorted_nodes_ids:
insert = -1
if insert == 0:
sorted_nodes.insert(insert, node)
sorted_nodes_ids.insert(insert, node.nodeid)
pop_nodes.append(node)
elif insert == -1:
sorted_nodes.append(node)
sorted_nodes_ids.append(node.nodeid)
pop_nodes.append(node)
# Check if the nodes parent is already in the list of
# inserted nodes
if ndata.parent in sorted_nodes_ids:
sorted_ndatas.append(ndata)
sorted_nodes_ids.append(ndata.nodeid)
pop_nodes.append(ndata)
# Remove inserted nodes from the list
for node in pop_nodes:
_nodes.pop(_nodes.index(node))
return sorted_nodes
for ndata in pop_nodes:
_ndatas.pop(_ndatas.index(ndata))
return sorted_ndatas
......@@ -15,6 +15,24 @@ def _to_bool(val):
return False
def ua_type_to_python(val, uatype):
if uatype.startswith("Int") or uatype.startswith("UInt"):
return int(val)
elif uatype.lower().startswith("bool"):
return _to_bool(val)
elif uatype in ("Double", "Float"):
return float(val)
elif uatype in ("String"):
return val
elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
if sys.version_info.major > 2:
return bytes(val, 'utf8')
else:
return val
else:
raise Exception("uatype nopt handled", uatype, " for val ", val)
class NodeData(object):
def __init__(self):
......@@ -49,6 +67,10 @@ class NodeData(object):
# datatype
self.definition = []
def __str__(self):
return "NodeData(nodeid:{})".format(self.nodeid)
__repr__ = __str__
class RefStruct(object):
......@@ -69,8 +91,7 @@ class ExtObj(object):
class XMLParser(object):
def __init__(self, xmlpath, server):
self.server = server # POC
def __init__(self, xmlpath):
self.logger = logging.getLogger(__name__)
self._retag = re.compile(r"(\{.*\})(.*)")
self.path = xmlpath
......@@ -104,16 +125,15 @@ class XMLParser(object):
break
return aliases
def __iter__(self):
def get_node_datas(self):
nodes = []
for child in self.root:
name = self._retag.match(child.tag).groups()[1]
if name not in ["Aliases", "NamespaceUris"]:
node = self._parse_node(name, child)
nodes.append(node)
self.it = iter(nodes)
return self
return nodes
def __next__(self):
while True:
......@@ -219,6 +239,8 @@ class XMLParser(object):
obj.value = self._parse_list_of_extension_object(el)
elif ntag == "ListOfLocalizedText":
obj.value = self._parse_list_of_localized_text(el)
elif ntag.startswith("ListOf"):
obj.value = self._parse_list(el[0])
elif ntag == "ExtensionObject":
obj.value = self._parse_extension_object(el)
else:
......@@ -228,6 +250,17 @@ class XMLParser(object):
txtlist = [txt.strip() for txt in el.itertext()]
return "".join(txtlist)
def _parse_list(self, el):
value = []
for val_el in el:
ntag = self._retag.match(val_el.tag).groups()[1]
if ntag.startswith("ListOf"):
val = self._parse_list(val_el)
else:
val = ua_type_to_python(val_el.text, ntag)
value.append(val)
return value
def _parse_list_of_localized_text(self, el):
value = []
for localized_text_list in el:
......
......@@ -400,8 +400,8 @@ class Server(object):
"""
Import nodes defined in xml
"""
importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
return importer.import_xml(path, self)
importer = xmlimporter.XmlImporter(self)
return importer.import_xml(path)
def export_xml(self, nodes, path):
"""
......
......@@ -443,49 +443,6 @@ class TestUnit(unittest.TestCase):
self.assertTrue(wce.eval(ev))
def test_xmlparser_get_node_id(self):
server = None
importer = XmlImporter(server)
res1 = importer._get_node_id('i=1001')
self.assertEqual(res1, 'i=1001')
res2 = importer._get_node_id('ns=1;i=1001')
self.assertEqual(res2, 'ns=1;i=1001')
importer.namespaces = {1: [3, 'http://someuri.com']}
res3 = importer._get_node_id('ns=1;i=1001')
self.assertEqual(res3, 'ns=3;i=1001')
importer.namespaces = {1: [3, 'http://someuri.com']}
res4 = importer._get_node_id('ns=2;i=1001')
self.assertEqual(res4, 'ns=2;i=1001')
def test_xmlparser_sort_nodes_by_parentid(self):
NodeMock = namedtuple('NodeMock', 'nodeid parent')
server = None
unordered_nodes = [
NodeMock('ns=1;i=1001', None),
NodeMock('ns=1;i=1002', 'ns=1;i=1003'),
NodeMock('ns=1;i=1003', 'ns=1;i=1001'),
NodeMock('ns=1;i=1004', 'ns=1;i=1002')
]
ordered_nodes = [
unordered_nodes[0],
unordered_nodes[2],
unordered_nodes[1],
unordered_nodes[3],
]
namespaces = {'1': (1, 'http://someuri.com')}
importer = XmlImporter(server)
importer.namespaces = namespaces
res = importer._sort_nodes_by_parentid(unordered_nodes)
self.assertEqual(res, ordered_nodes)
class TestMaskEnum(unittest.TestCase):
class MyEnum(_MaskEnum):
......
......@@ -10,6 +10,10 @@ def func(parent, value, string):
class XmlTests(object):
srv = None
opc = None # just to remove pylint warnings
assertEqual = dir
def test_xml_import(self):
self.srv.import_xml("tests/custom_nodes.xml")
o = self.opc.get_objects_node()
......@@ -58,12 +62,10 @@ class XmlTests(object):
self.assertEqual(r3.NodeId.NamespaceIndex, ns)
def test_xml_method(self):
o = self.opc.nodes.objects.add_object(2, "xmlexportobj")
self.opc.register_namespace("tititi")
self.opc.register_namespace("whatthefuck")
o = self.opc.nodes.objects.add_object(2, "xmlexportmethod")
m = o.add_method(2, "callme", func, [ua.VariantType.Double, ua.VariantType.String], [ua.VariantType.Float])
v = o.add_variable(3, "myxmlvar", 6.78, ua.VariantType.Float)
a = o.add_variable(3, "myxmlvar", [6, 1], ua.VariantType.UInt16)
a2 = o.add_variable(3, "myxmlvar", [[]], ua.VariantType.ByteString)
print(a.get_value_rank)
# set an arg dimension to a list to test list export
inputs = m.get_child("InputArguments")
val = inputs.get_value()
......@@ -78,7 +80,6 @@ class XmlTests(object):
self.opc.export_xml(nodes, "export.xml")
self.opc.delete_nodes(nodes)
self.opc.import_xml("export.xml")
# now see if our nodes are here
......@@ -87,19 +88,38 @@ class XmlTests(object):
self.assertEqual(val[0].ArrayDimensions, [2, 2])
self.assertEqual(val[0].Description.Text, desc)
def test_xml_vars(self):
self.opc.register_namespace("tititi")
self.opc.register_namespace("whatthefuck")
o = self.opc.nodes.objects.add_object(2, "xmlexportobj")
v = o.add_variable(3, "myxmlvar", 6.78, ua.VariantType.Float)
a = o.add_variable(3, "myxmlvar-array", [6, 1], ua.VariantType.UInt16)
a2 = o.add_variable(3, "myxmlvar-2dim", [[1, 2], [3,4]], ua.VariantType.UInt32)
a3 = o.add_variable(3, "myxmlvar-2dim", [[]], ua.VariantType.ByteString)
nodes = [o, v, a, a2, a3]
self.opc.export_xml(nodes, "export-vars.xml")
self.opc.delete_nodes(nodes)
self.opc.import_xml("export-vars.xml")
self.assertEqual(v.get_value(), 6.78)
self.assertEqual(v.get_data_type(), ua.NodeId(ua.ObjectIds.Float))
self.assertEqual(a.get_value(), [6, 1])
self.assertEqual(a.get_data_type(), ua.NodeId(ua.ObjectIds.UInt16))
self.assertIn(a.get_value_rank(), (0, 1))
self.assertEqual(a.get_value(), [6, 1])
self.assertEqual(a2.get_value(), [[]])
self.assertEqual(a2.get_data_type(), ua.NodeId(ua.ObjectIds.ByteString))
self.assertEqual(a2.get_value(), [[1, 2],[3, 4]])
self.assertEqual(a2.get_data_type(), ua.NodeId(ua.ObjectIds.UInt32))
self.assertIn(a2.get_value_rank(), (0, 2))
self.assertEqual(a2.get_attribute(ua.AttributeIds.ArrayDimensions).Value.Value, [1, 0])
self.assertEqual(a2.get_attribute(ua.AttributeIds.ArrayDimensions).Value.Value, [2, 2])
#self.assertEqual(a3.get_value(), [[]]) # would require special code ...
self.assertEqual(a3.get_data_type(), ua.NodeId(ua.ObjectIds.ByteString))
self.assertIn(a3.get_value_rank(), (0, 2))
self.assertEqual(a3.get_attribute(ua.AttributeIds.ArrayDimensions).Value.Value, [1, 0])
def test_export_import_ext_obj(self):
def test_xml_ext_obj(self):
arg = ua.Argument()
arg.DataType = ua.NodeId(ua.ObjectIds.Float)
arg.Description = ua.LocalizedText(b"This is a nice description")
......@@ -123,37 +143,57 @@ class XmlTests(object):
def test_xml_ns(self):
ns_array = self.opc.get_namespace_array()
if len(ns_array) <= 2:
if len(ns_array) < 3:
self.opc.register_namespace("dummy_ns")
print("ARRAY", self.opc.get_namespace_array())
ref_ns = self.opc.register_namespace("ref_namespace")
new_ns = self.opc.register_namespace("my_new_namespace")
bname_ns = self.opc.register_namespace("bname_namespace")
o = self.opc.nodes.objects.add_object(0, "xmlns0")
o2 = self.opc.nodes.objects.add_object(2, "xmlns2")
o20 = self.opc.nodes.objects.add_object(20, "xmlns20")
o50 = self.opc.nodes.objects.add_object(50, "xmlns20")
o200 = self.opc.nodes.objects.add_object(200, "xmlns200")
onew = self.opc.nodes.objects.add_object(new_ns, "xmlns_new")
vnew = onew.add_variable(new_ns, "xmlns_new_var", 9.99)
o_no_export = self.opc.nodes.objects.add_object(ref_ns, "xmlns_parent")
v_no_parent = o_no_export.add_variable(new_ns, "xmlns_new_var_no_parent", 9.99)
o_bname = onew.add_object("ns={};i=4000".format(new_ns), "{}:BNAME".format(bname_ns))
nodes = [o, o2, o20, o200, onew, vnew]
nodes = [o, o50, o200, onew, vnew, v_no_parent, o_bname]
print("CREATED", nodes, o_no_export)
self.opc.export_xml(nodes, "export-ns.xml")
# delete node and change index og new_ns before re-importing
self.opc.delete_nodes(nodes)
ns_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
nss = ns_node.get_value()
nss.remove("my_new_namespace")
#nss.remove("ref_namespace")
nss.remove("bname_namespace")
ns_node.set_value(nss)
new_ns = self.opc.register_namespace("my_new_namespace_offsett")
new_ns = self.opc.register_namespace("my_new_namespace")
self.opc.import_xml("export-ns.xml")
new_nodes = self.opc.import_xml("export-ns.xml")
print("NEW NODES", new_nodes)
for i in nodes[:-2]:
for i in [o, o50, o200]:
print(i)
i.get_browse_name()
with self.assertRaises(uaerrors.BadNodeIdUnknown):
onew.get_browse_name()
onew.nodeid.NamespaceIndex += 1
# since my_new_namesspace2 is referenced byt a node it should have been reimported
nss = self.opc.get_namespace_array()
self.assertIn("bname_namespace", nss)
# get index of namespaces after import
new_ns = self.opc.register_namespace("my_new_namespace")
bname_ns = self.opc.register_namespace("bname_namespace")
print("ARRAY 2", self.opc.get_namespace_array())
print("NEW NS", new_ns, onew)
onew.nodeid.NamespaceIndex = new_ns
print("OENE", onew)
onew.get_browse_name()
vnew2 = onew.get_children()[0]
self.assertEqual(vnew.nodeid.NamespaceIndex + 1, vnew2.nodeid.NamespaceIndex)
vnew.nodeid.NamespaceIndex += 1
self.assertEqual(new_ns, vnew2.nodeid.NamespaceIndex)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment