Commit 2df2392e authored by olivier R-D's avatar olivier R-D

better support for extension object import

parent 4fb4eaa0
......@@ -9,6 +9,25 @@ from opcua import ua
from opcua.common import xmlparser
def ua_type_to_python(val, uatype):
if uatype.startswith("Int") or uatype.startswith("UInt"):
return int(val)
elif uatype in ("String"):
return val
elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
return bytes(val, 'utf8')
else:
raise Exception("uatype nopt handled", uatype, " for val ", val)
def to_python(val, obj, attname):
print(val, obj, attname)
if isinstance(obj, ua.NodeId) and attname == "Identifier":
return ua.NodeId.from_string(val)
else:
return ua_type_to_python(val, obj.ua_types[attname])
class XmlImporter(object):
def __init__(self, server):
......@@ -119,36 +138,63 @@ class XmlImporter(object):
res = self.server.add_nodes([node])
self._add_refs(obj)
return res[0].AddedNodeId
def _make_ext_obj(sefl, obj):
ext = getattr(ua, obj.objname)()
for name, val in obj.body.items():
if type(val) is str:
raise Exception("Error val should a dict", name, val)
else:
for attname, v in val.items():
if type(v) is str:
setattr(ext, attname, to_python(v, ext, attname))
else:
print("v is not string, it is", v, type(v))
for attname2, v2 in v.items():
obj2 = getattr(ext, attname)
setattr(obj2, attname2, to_python(v2, obj2, attname2))
return ext
def _add_variable_value(self, obj):
"""
Returns the value for a Variable based on the objects valuetype.
"""
if obj.valuetype == 'ListOfLocalizedText':
return ua.Variant([ua.LocalizedText(txt) for txt in obj.value], None)
elif obj.valuetype == 'EnumValueType':
if obj.valuetype == 'ListOfExtensionObject':
values = []
for ev in obj.value:
enum_value = ua.EnumValueType()
enum_value.DisplayName = ua.LocalizedText(ev['DisplayName'])
enum_value.Description = ua.LocalizedText(ev['Description'])
enum_value.Value = int(ev['Value'])
values.append(enum_value)
for ext in obj.value:
extobj = self._make_ext_obj(ext)
values.append(extobj)
return values
elif obj.valuetype == 'Argument':
values = []
for arg in obj.value:
argument = ua.Argument()
argument.Name = arg['Name']
argument.Description = ua.LocalizedText(arg['Description'])
argument.DataType = self.to_nodeid(arg['DataType'])
argument.ValueRank = int(arg['ValueRank'])
argument.ArrayDimensions = arg['ArrayDimensions']
values.append(argument)
return values
return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
elif obj.valuetype.startswith("ListOf"):
vtype = obj.valuetype[6:]
return [getattr(ua, vtype)(v) for v in obj.value]
else:
return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
#if obj.valuetype == 'ListOfLocalizedText':
#return ua.Variant([ua.LocalizedText(txt) for txt in obj.value], None)
#elif obj.valuetype == 'ListOfExtension':
#elif obj.valuetype == 'EnumValueType':
#values = []
#for ev in obj.value:
#enum_value = ua.EnumValueType()
#enum_value.DisplayName = ua.LocalizedText(ev['DisplayName'])
#enum_value.Description = ua.LocalizedText(ev['Description'])
#enum_value.Value = int(ev['Value'])
#values.append(enum_value)
#return values
#elif obj.valuetype == 'Argument':
#values = []
#for arg in obj.value:
#argument = ua.Argument()
#argument.Name = arg['Name']
#argument.Description = ua.LocalizedText(arg['Description'])
#argument.DataType = self.to_nodeid(arg['DataType'])
#argument.ValueRank = int(arg['ValueRank'])
#argument.ArrayDimensions = arg['ArrayDimensions']
#values.append(argument)
#return values
#return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
def add_variable_type(self, obj):
node = self._get_node(obj)
......
......@@ -8,6 +8,14 @@ import sys
import xml.etree.ElementTree as ET
def _to_bool(val):
if val in ("True", "true", "on", "On", "1"):
return True
else:
return False
class NodeData(object):
def __init__(self):
......@@ -222,11 +230,11 @@ class XMLParser(object):
elif key == "DataType":
obj.datatype = val
elif key == "IsAbstract":
obj.abstract = self._to_bool(val)
obj.abstract = _to_bool(val)
elif key == "Executable":
obj.executable = self._to_bool(val)
obj.executable = _to_bool(val)
elif key == "EventNotifier":
obj.eventnotifier = 1 if val == "1" else 0
obj.eventnotifier = int(val)
elif key == "ValueRank":
obj.rank = int(val)
elif key == "ArrayDimensions":
......@@ -238,15 +246,10 @@ class XMLParser(object):
elif key == "UserAccessLevel":
obj.useraccesslevel = int(val)
elif key == "Symmetric":
obj.symmetric = self._to_bool(val)
obj.symmetric = _to_bool(val)
else:
self.logger.info("Attribute not implemented: %s:%s", key, val)
def _to_bool(self, val):
if val in ("False, false"):
return False
else:
return True
def _parse_tag(self, el, obj):
tag = self._retag.match(el.tag).groups()[1]
......@@ -277,10 +280,7 @@ class XMLParser(object):
elif ntag in ("Float", "Double"):
obj.value = float(val.text)
elif ntag in ("Boolean"):
if val.text in ("True", "true", "1", "on", "On"):
obj.value = bool(1)
else:
obj.value = bool(0)
obj.value = _to_bool(val.text)
elif ntag in ("ByteString", "String"):
mytext = val.text
if mytext is None: # support importing null strings
......@@ -291,20 +291,15 @@ class XMLParser(object):
elif ntag in ("Guid"):
self._parse_value(val, obj)
elif ntag == "ListOfExtensionObject":
obj.valuetype = "ExtensionObject"
obj.value = self._parse_list_of_extension_object(el)
elif ntag == "ListOfLocalizedText":
obj.valuetype = "LocalizedText"
obj.value = self._parse_list_of_localized_text(el)
else:
self.logger.info("Value type not implemented: '%s', %s", ntag)
def _get_text(self, el):
txt = ""
for text in el.itertext():
txt += text
txt = txt.strip()
return txt
txtlist = [txt.strip() for txt in el.itertext()]
return "".join(txtlist)
def _parse_list_of_localized_text(self, el):
value = []
......@@ -320,9 +315,7 @@ class XMLParser(object):
def _parse_list_of_extension_object(self, el):
'''
Parse a uax:ListOfExtensionObject Value
Return an array with a value of each uax:ExtensionObject/*/* (each element is convert to a netry in a dict.
also the valuetype is returned. The valuetype is uax:ExtensionObject/*/tag()
Return an list of ExtObj
'''
value = []
for extension_object_list in el:
......@@ -349,15 +342,11 @@ class XMLParser(object):
def _parse_body(self, el):
body = {}
#ntag = self._retag.match(el.find('*').tag).groups()[1]
#body[ntag] = {}
for body_item in el:
otag = self._retag.match(body_item.tag).groups()[1]
childs = [i for i in body_item]
if not childs:
val = self._get_text(body_item)
elif len(childs) == 1:
val = self._get_text(childs[0])
else:
val = self._parse_body(body_item)
if val:
......
......@@ -555,6 +555,11 @@ class LocalizedText(FrozenClass):
A string qualified with a namespace index.
'''
ua_types = {
"Text": "Bytes",
"Locale": "Bytes"
}
def __init__(self, text=None):
self.Encoding = 0
self.Text = text
......
......@@ -9,6 +9,26 @@ import logging
import xmlparser
def _to_val(objs, attr, val):
from opcua import ua
print("got ", objs, attr, val)
cls = getattr(ua, objs[0])
for o in objs[1:]:
cls = getattr(ua, cls.ua_types[o])
if cls == ua.NodeId:
return "ua.NodeId.from_string('val')"
return ua_type_to_python(val, cls.ua_types[attr])
def ua_type_to_python(val, uatype):
if uatype in ("String"):
return "'{}'".format(val)
elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
return "b'{}'".format(val)
else:
return val
class CodeGenerator(object):
def __init__(self, input_path, output_path):
......@@ -115,17 +135,19 @@ def create_standard_address_space_%s(server):
self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{}")'.format(obj.displayname))
self.writecode(indent, 'attrs.DataType = {}'.format(self.to_data_type(obj.datatype)))
if obj.value is not None:
if obj.valuetype == "ExtensionObject":
if isinstance(obj.value, (list, tuple)):
self.writecode(indent, 'value = []')
for ext in obj.value:
self.make_ext_obj_code(indent, ext)
self.writecode(indent, 'value.append(extobj)')
else:
self.make_ext_obj_code(indent, obj.value)
self.writecode(indent, 'value = extobj')
if obj.valuetype == "ListOfExtensionObject":
self.writecode(indent, 'value = []')
for ext in obj.value:
self.make_ext_obj_code(indent, ext)
self.writecode(indent, 'value.append(extobj)')
self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
elif obj.valuetype == "ExtensionObject":
self.make_ext_obj_code(indent, obj.value)
self.writecode(indent, 'value = extobj')
self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
else:
if obj.valuetype.startswith("ListOf"):
obj.valuetype = obj.valuetype[6:]
self.writecode(indent, 'attrs.Value = ua.Variant({}, ua.VariantType.{})'.format(self.to_value(obj.value), obj.valuetype))
if obj.rank:
self.writecode(indent, 'attrs.ValueRank = {}'.format(obj.rank))
......@@ -146,10 +168,16 @@ def create_standard_address_space_%s(server):
else:
for k, v in val.items():
if type(v) is str:
self.writecode(indent, 'extobj.{} = "{}"'.format(k, v))
val = _to_val([extobj.objname], k, v)
self.writecode(indent, 'extobj.{} = "{}"'.format(k, val))
else:
if k == "DataType": #hack for strange nodeid xml format
self.writecode(indent, 'extobj.{} = ua.NodeId.from_string("{}")'.format(k, v["Identifier"]))
continue
for k2, v2 in v.items():
self.writecode(indent, 'extobj.{}.{} = "{}"'.format(k, k2, v2))
val2 = _to_val([extobj.objname, k], k2, v2)
self.writecode(indent, 'extobj.{}.{} = "{}"'.format(k, k2, val2))
def make_variable_code(self, obj):
indent = " "
......
......@@ -121,6 +121,13 @@ class CodeGenerator(object):
self.write(":vartype {}: {}".format(field.name, field.uatype))
self.write("'''")
self.write("")
self.write("ua_types = {")
for field in obj.fields:
self.write(" '{}': '{}',".format(field.name, field.uatype))
self.write(" }")
self.write("")
self.write("def __init__(self, binary=None):")
self.iidx += 1
self.write("if binary is not None:")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment