Commit cf2066d6 authored by olivier R-D's avatar olivier R-D

export extensionobjects to xml. add test for xml export

parent 14e7e0f8
......@@ -242,11 +242,12 @@ def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, is
attrs.DataType = _guess_datatype(var)
attrs.Value = var
if isinstance(var, list) or isinstance(var, tuple):
attrs.ValueRank = ua.ValueRank.OneDimension
else:
if not isinstance(var.Value, (list, tuple)):
attrs.ValueRank = ua.ValueRank.Scalar
#attrs.ArrayDimensions = None
else:
if var.Dimensions:
attrs.ValueRank = len(var.Dimensions)
attrs.ArrayDimensions = var.Dimensions
attrs.WriteMask = 0
attrs.UserWriteMask = 0
attrs.Historizing = 0
......
......@@ -58,19 +58,20 @@ class XmlExporter(object):
# try to write the XML etree to a file
self.logger.info('Exporting XML file to %s', xmlpath)
#from IPython import embed
#embed()
# embed()
if pretty:
indent(self.etree.getroot())
self.etree.write(xmlpath,
short_empty_elements=False,
encoding='utf-8',
self.etree.write(xmlpath,
short_empty_elements=False,
encoding='utf-8',
xml_declaration=True
)
)
else:
self.etree.write(xmlpath,
short_empty_elements=False,
encoding='utf-8',
xml_declaration=True)
self.etree.write(xmlpath,
short_empty_elements=False,
encoding='utf-8',
xml_declaration=True
)
def dump_etree(self):
"""
......@@ -130,7 +131,7 @@ class XmlExporter(object):
self._add_sub_el(node_el, 'DisplayName', displayname)
if desc not in (None, ""):
self._add_sub_el(node_el, 'Description', desc.decode('utf-8'))
#FIXME: add WriteMask and UserWriteMask
# FIXME: add WriteMask and UserWriteMask
return node_el
def add_etree_object(self, node):
......@@ -203,7 +204,7 @@ class XmlExporter(object):
abstract = node.get_attribute(ua.AttributeIds.IsAbstract)
if abstract.Value.Value:
var_el.attrib["IsAbstract"] = "true"
var_el.attrib["IsAbstract"] = "true"
self._add_ref_els(var_el, node)
......@@ -267,6 +268,28 @@ class XmlExporter(object):
self.aliases[ref_name] = ref.ReferenceTypeId.to_string()
def member_to_etree(el, name, dtype, val):
member_el = Et.SubElement(el, "uax:" + name)
if isinstance(val, (list, tuple)):
for v in val:
_value_to_etree(member_el, ua.ObjectIdNames[dtype.Identifier], dtype, v)
else:
_val_to_etree(member_el, dtype, val)
def _val_to_etree(el, dtype, val):
if val is None:
val = ""
if dtype == ua.NodeId(ua.ObjectIds.NodeId):
id_el = Et.SubElement(el, "uax:Identifier")
id_el.text = val.to_string()
elif not hasattr(val, "ua_types"):
el.text = str(val)
else:
for name, vtype in val.ua_types.items():
member_to_etree(el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
def value_to_etree(el, dtype_name, dtype, node):
var = node.get_data_value().Value
if var.Value is not None:
......@@ -274,28 +297,35 @@ def value_to_etree(el, dtype_name, dtype, node):
_value_to_etree(val_el, dtype_name, dtype, var.Value)
def _value_to_etree(el, dtype_name, dtype, val):
def _value_to_etree(el, type_name, dtype, val):
if not val:
return
if isinstance(val, (list, tuple)):
list_el = Et.SubElement(el, "uax:ListOf" + dtype_name)
if dtype.Identifier > 21: # this is an extentionObject:
elname = "uax:ListOfExtensionObject"
else:
elname = "uax:ListOf" + type_name
list_el = Et.SubElement(el, elname)
for nval in val:
_value_to_etree(list_el, dtype_name, dtype, nval)
_value_to_etree(list_el, type_name, dtype, nval)
else:
if dtype.Identifier is int and dtype.Identifier > 21: # this is an extentionObject:
_extobj_to_etree(el, dtype_name, dtype, val)
if dtype.Identifier > 21: # this is an extentionObject:
_extobj_to_etree(el, type_name, dtype, val)
else:
val_el = Et.SubElement(el, "uax:" + dtype_name)
val_el.text = str(val)
val_el = Et.SubElement(el, "uax:" + type_name)
_val_to_etree(val_el, dtype, val)
def _extobj_to_etree(val_el, dtype_name, dtype, val):
def _extobj_to_etree(val_el, name, dtype, val):
obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
type_el = Et.SubElement(obj_el, "uax:TypeId")
id_el = Et.SubElement(type_el, "uax:Identifier")
id_el.text = val.TypeId.to_string()
id_el.text = dtype.to_string()
body_el = Et.SubElement(obj_el, "uax:Body")
struct_el = Et.SubElement(body_el, "uax:" + dtype_name)
# FIXME: finish
struct_el = Et.SubElement(body_el, "uax:" + name)
for name, vtype in val.ua_types.items():
member_to_etree(struct_el, name, ua.NodeId(getattr(ua.ObjectIds, vtype)), getattr(val, name))
def indent(elem, level=0):
'''
......@@ -303,14 +333,14 @@ def indent(elem, level=0):
it basically walks your tree and adds spaces and newlines so the tree is
printed in a nice way
'''
i = "\n" + level*" "
i = "\n" + level * " "
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = i + " "
if not elem.tail or not elem.tail.strip():
elem.tail = i
for elem in elem:
indent(elem, level+1)
indent(elem, level + 1)
if not elem.tail or not elem.tail.strip():
elem.tail = i
else:
......
......@@ -22,6 +22,7 @@ from opcua.client.client import Client
from opcua.crypto import security_policies
from opcua.common.event_objects import BaseEvent
from opcua.common.shortcuts import Shortcuts
from opcua.common.xmlexporter import XmlExporter
use_crypto = True
try:
from opcua.crypto import uacrypto
......@@ -397,11 +398,20 @@ class Server(object):
def import_xml(self, path):
"""
import nodes defined in xml
Import nodes defined in xml
"""
importer = xmlimporter.XmlImporter(self.iserver.node_mgt_service)
return importer.import_xml(path, self)
def export_xml(self, nodes, path):
"""
Export defined nodes to xml
"""
exp = XmlExporter(self)
uris = self.get_namespace_array()[2:]
exp.build_etree(nodes, uris=uris)
return exp.write_xml(path)
def delete_nodes(self, nodes, recursive=False):
return delete_nodes(self.iserver.isession, nodes, recursive)
......
"""
implement ua datatypes
"""
import logging
import struct
from enum import Enum, IntEnum, EnumMeta
from datetime import datetime
......@@ -21,9 +20,6 @@ from opcua.common.uaerrors import UaStatusCodeError
from opcua.common.uaerrors import UaStringParsingError
logger = logging.getLogger('opcua.uaprotocol')
def get_win_epoch():
return uabin.win_epoch_to_datetime(0)
......@@ -275,8 +271,9 @@ class NodeId(FrozenClass):
:ivar ServerIndex:
:vartype ServerIndex: Int
"""
def __init__(self, identifier=None, namespaceidx=0, nodeidtype=None):
self.Identifier = identifier
self.NamespaceIndex = namespaceidx
self.NodeIdType = nodeidtype
......@@ -558,8 +555,8 @@ class LocalizedText(FrozenClass):
'''
ua_types = {
"Text": "Bytes",
"Locale": "Bytes"
"Text": "ByteString",
"Locale": "ByteString"
}
def __init__(self, text=None):
......
......@@ -6,6 +6,7 @@ from opcua import ua
from tests_subscriptions import SubscriptionTests
from tests_common import CommonTests, add_server_methods
from tests_xml import XmlTests
port_num1 = 48510
......
......@@ -5,7 +5,6 @@ from datetime import datetime
from datetime import timedelta
import math
import opcua
from opcua import ua
from opcua import uamethod
from opcua import instantiate
......
......@@ -4,6 +4,7 @@ import shelve
import time
from tests_common import CommonTests, add_server_methods
from tests_xml import XmlTests
from tests_subscriptions import SubscriptionTests
from datetime import timedelta, datetime
from tempfile import NamedTemporaryFile
......@@ -20,7 +21,7 @@ port_num = 48540
port_discovery = 48550
class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
class TestServer(unittest.TestCase, CommonTests, SubscriptionTests, XmlTests):
'''
Run common tests on server side
......@@ -136,31 +137,6 @@ class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
result = o.call_method(v, ua.Variant(2.1))
self.assertEqual(result, 4.2)
def test_xml_import(self):
self.srv.import_xml("tests/custom_nodes.xml")
o = self.opc.get_objects_node()
v = o.get_child(["MyXMLFolder", "MyXMLObject", "MyXMLVariable"])
val = v.get_value()
self.assertEqual(val, "StringValue")
node_path = ["Types", "DataTypes", "BaseDataType", "Enumeration",
"1:MyEnum", "0:EnumStrings"]
o = self.opc.get_root_node().get_child(node_path)
self.assertEqual(len(o.get_value()), 3)
# Check if method is imported
node_path = ["Types", "ObjectTypes", "BaseObjectType",
"1:MyObjectType", "1:MyMethod"]
o = self.opc.get_root_node().get_child(node_path)
self.assertEqual(len(o.get_referenced_nodes()), 4)
# Check if InputArgs are imported and can be read
node_path = ["Types", "ObjectTypes", "BaseObjectType",
"1:MyObjectType", "1:MyMethod", "InputArguments"]
o = self.opc.get_root_node().get_child(node_path)
input_arg = o.get_data_value().Value.Value[0]
self.assertEqual(input_arg.Name, 'Context')
def test_historize_variable(self):
o = self.opc.get_objects_node()
var = o.add_variable(3, "test_hist", 1.0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment