Commit 7542cbbd authored by ORD's avatar ORD Committed by GitHub

Merge pull request #296 from FreeOpcUa/bin

better support for extension object from xml, split binary code
parents 5df77dab 96559351
......@@ -201,12 +201,12 @@ class Node(object):
def set_attr_bit(self, attr, bit):
val = self.get_attribute(attr)
val.Value.Value = ua.set_bit(val.Value.Value, bit)
val.Value.Value = ua.ua_binary.set_bit(val.Value.Value, bit)
self.set_attribute(attr, val)
def unset_attr_bit(self, attr, bit):
val = self.get_attribute(attr)
val.Value.Value = ua.unset_bit(val.Value.Value, bit)
val.Value.Value = ua.ua_binary.unset_bit(val.Value.Value, bit)
self.set_attribute(attr, val)
def set_read_only(self):
......@@ -326,7 +326,7 @@ class Node(object):
desc.NodeId = self.nodeid
params = ua.BrowseParameters()
params.View.Timestamp = ua.win_epoch_to_datetime(0)
params.View.Timestamp = ua.get_win_epoch()
params.NodesToBrowse.append(desc)
results = self.server.browse(params)
return results[0].References
......@@ -434,11 +434,11 @@ class Node(object):
if starttime:
details.StartTime = starttime
else:
details.StartTime = ua.DateTimeMinValue
details.StartTime = ua.get_win_epoch()
if endtime:
details.EndTime = endtime
else:
details.EndTime = ua.DateTimeMinValue
details.EndTime = ua.get_win_epoch()
details.NumValuesPerNode = numvalues
details.ReturnBounds = True
result = self.history_read(details)
......@@ -473,11 +473,11 @@ class Node(object):
if starttime:
details.StartTime = starttime
else:
details.StartTime = ua.DateTimeMinValue
details.StartTime = ua.get_win_epoch()
if endtime:
details.EndTime = endtime
else:
details.EndTime = ua.DateTimeMinValue
details.EndTime = ua.get_win_epoch()
details.NumValuesPerNode = numvalues
if not isinstance(evtypes, (list, tuple)):
......
......@@ -125,7 +125,7 @@ def get_node_subtypes(node, nodes=None):
return nodes
def get_node_supertypes(node, includeitself = False, skipbase = True):
def get_node_supertypes(node, includeitself=False, skipbase=True):
"""
return get all subtype parents of node recursive
:param server: used in case node is nodeid
......@@ -134,12 +134,12 @@ def get_node_supertypes(node, includeitself = False, skipbase = True):
:param skipbase don't include the toplevel one
:returns list of ua.Node, top parent first
"""
parents =[]
parents = []
if includeitself:
parents.append(node)
parents.extend(_get_node_supertypes(node))
if skipbase and len(parents) > 1:
parents = parents [:-1]
parents = parents[:-1]
return parents
......@@ -151,12 +151,13 @@ def _get_node_supertypes(node):
basetypes = []
parents = node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=True)
if len(parents) != 0:
#TODO: Is it possible to have multiple subtypes ? If so extended support for it
basetypes.append(parents[0])
basetypes.extend( _get_node_supertypes(parents[0]) )
# TODO: Is it possible to have multiple subtypes ? If so extended support for it
basetypes.append(parents[0])
basetypes.extend(_get_node_supertypes(parents[0]))
return basetypes
def is_child_present(node, browsename):
"""
return if a browsename is present a child from the provide node
......@@ -170,3 +171,33 @@ def is_child_present(node, browsename):
return True
return False
def dtype_to_vtype(server, dtype_node):
"""
Given a node datatype, find out the variant type to encode
data. This is not exactly straightforward...
"""
# first check if datatype is a simple built in type
identifier = dtype_node.nodeid.Identifier
if isinstance(identifier, int) and identifier <= 22:
return ua.VariantType(identifier)
# now handle some special cases
parents = _get_node_supertypes(dtype_node)
if not parents:
raise ua.UaError("Datatype must be a subtype of builtin types")
parent = parents[0]
if parent.nodeid.Identifier == 29:
# we have an enumeration, we need to llok at child to find type
descs = dtype_node.get_children_descriptions()
bnames = [d.BrowseName.Name for d in descs]
if "EnumStrings" in bnames:
return ua.VariantType.LocalizedText
elif "EnumValues" in bnames:
return ua.VariantType.ExtensionObject
else:
raise ua.UaError("Enumeration must have a child node describing its type and values")
return dtype_to_vtype(server, parents[0])
......@@ -112,9 +112,7 @@ class XmlExporter(object):
parent = node.get_parent()
displayname = node.get_display_name().Text.decode(encoding='UTF8')
desc = node.get_description().Text
if desc is None:
desc = ""
print("NODE COMMON", node, desc)
print("NODE COMMON", node)
node_el = Et.SubElement(self.etree.getroot(),
nodetype,
BrowseName=browsename,
......@@ -122,7 +120,7 @@ class XmlExporter(object):
if parent is not None:
node_el.attrib["ParentNodeId"] = parent.nodeid.to_string()
if desc not in (None, ""):
node_el.attrib["Description"] = desc
node_el.attrib["Description"] = str(desc)
disp_el = Et.SubElement(node_el, 'DisplayName', )
disp_el.text = displayname
return node_el
......@@ -146,14 +144,14 @@ class XmlExporter(object):
def add_variable_common(self, node, el):
dtype = node.get_data_type()
if dtype.Identifier in o_ids.ObjectIdNames:
datatype = o_ids.ObjectIdNames[dtype.Identifier]
self.aliases[datatype] = dtype.to_string()
dtype_name = o_ids.ObjectIdNames[dtype.Identifier]
self.aliases[dtype_name] = dtype.to_string()
else:
datatype = dtype.to_string()
dtype_name = dtype.to_string()
rank = node.get_value_rank()
el.attrib["DataType"] = datatype
el.attrib["ValueRank"] = str(rank)
variant_to_etree(el, node.get_data_value().Value)
el.attrib["DataType"] = dtype_name
el.attrib["ValueRank"] = str(int(rank))
value_to_etree(el, dtype_name, dtype, node)
def add_etree_variable(self, node):
"""
......@@ -238,10 +236,32 @@ class XmlExporter(object):
self.aliases[ref_name] = ref_nodeid
def variant_to_etree(el, var):
val_el = Et.SubElement(el, 'Value')
valx_el = Et.SubElement(val_el, var.VariantType.name)
valx_el.attrib["xmnls"] = "http://opcfoundation.org/UA/2008/02/Types.xsd"
valx_el.text = str(var.Value)
def value_to_etree(el, dtype_name, dtype, node):
var = node.get_data_value().Value
val_el = Et.SubElement(el, 'Value')
_value_to_etree(val_el, dtype_name, dtype, var.Value)
def _value_to_etree(el, dtype_name, dtype, val):
if isinstance(val, (list, tuple)):
list_el = Et.SubElement(el, "uax:ListOf" + dtype_name)
for nval in val:
_value_to_etree(list_el, dtype_name, dtype, nval)
else:
if dtype.Identifier is int and dtype.Identifier > 21: # this is an extentionObject:
_extobj_to_etree(el, dtype_name, dtype)
else:
val_el = Et.SubElement(el, "uax:" + dtype_name)
val_el.text = str(val)
def _extobj_to_etree(val_el, dtype_name, dtype, val):
obj_el = Et.SubElement(val_el, "uax:ExtensionObject")
type_el = Et.SubElement(obj_el, "uax:TypeId")
id_el = Et.SubElement(type_el, "uax:Identifier")
id_el.text = val.TypeId.to_string()
body_el = Et.SubElement(obj_el, "uax:Body")
struct_el = Et.SubElement(body_el, "uax:" + dtype_name)
# FIXME: finish
......@@ -3,12 +3,34 @@ add node defined in XML to address space
format is the one from opc-ua specification
"""
import logging
import sys
from opcua import ua
from opcua.common import xmlparser
def ua_type_to_python(val, uatype):
if uatype.startswith("Int") or uatype.startswith("UInt"):
return int(val)
elif uatype in ("String"):
return val
elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
if sys.version_info.major > 2:
return bytes(val, 'utf8')
else:
return val
else:
raise Exception("uatype nopt handled", uatype, " for val ", val)
def to_python(val, obj, attname):
if isinstance(obj, ua.NodeId) and attname == "Identifier":
return ua.NodeId.from_string(val)
else:
return ua_type_to_python(val, obj.ua_types[attname])
class XmlImporter(object):
def __init__(self, server):
......@@ -119,36 +141,37 @@ class XmlImporter(object):
res = self.server.add_nodes([node])
self._add_refs(obj)
return res[0].AddedNodeId
def _make_ext_obj(sefl, obj):
ext = getattr(ua, obj.objname)()
for name, val in obj.body.items():
if type(val) is str:
raise Exception("Error val should a dict", name, val)
else:
for attname, v in val.items():
if type(v) is str:
setattr(ext, attname, to_python(v, ext, attname))
else:
for attname2, v2 in v.items():
obj2 = getattr(ext, attname)
setattr(obj2, attname2, to_python(v2, obj2, attname2))
return ext
def _add_variable_value(self, obj):
"""
Returns the value for a Variable based on the objects valuetype.
"""
if obj.valuetype == 'ListOfLocalizedText':
return ua.Variant([ua.LocalizedText(txt) for txt in obj.value], None)
elif obj.valuetype == 'EnumValueType':
if obj.valuetype == 'ListOfExtensionObject':
values = []
for ev in obj.value:
enum_value = ua.EnumValueType()
enum_value.DisplayName = ua.LocalizedText(ev['DisplayName'])
enum_value.Description = ua.LocalizedText(ev['Description'])
enum_value.Value = int(ev['Value'])
values.append(enum_value)
for ext in obj.value:
extobj = self._make_ext_obj(ext)
values.append(extobj)
return values
elif obj.valuetype == 'Argument':
values = []
for arg in obj.value:
argument = ua.Argument()
argument.Name = arg['Name']
argument.Description = ua.LocalizedText(arg['Description'])
argument.DataType = self.to_nodeid(arg['DataType'])
argument.ValueRank = int(arg['ValueRank'])
argument.ArrayDimensions = arg['ArrayDimensions']
values.append(argument)
return values
return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
elif obj.valuetype.startswith("ListOf"):
vtype = obj.valuetype[6:]
return [getattr(ua, vtype)(v) for v in obj.value]
else:
return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
def add_variable_type(self, obj):
node = self._get_node(obj)
......
......@@ -8,6 +8,14 @@ import sys
import xml.etree.ElementTree as ET
def _to_bool(val):
if val in ("True", "true", "on", "On", "1"):
return True
else:
return False
class NodeData(object):
def __init__(self):
......@@ -36,8 +44,8 @@ class NodeData(object):
# referencetype
self.inversename = ""
self.abstract = "false"
self.symmetric = "false"
self.abstract = False
self.symmetric = False
# datatype
self.definition = []
......@@ -51,6 +59,15 @@ class RefStruct(object):
self.target = None
class ExtObj(object):
def __init__(self):
self.typeid = None
self.objname = None
self.bodytype = None
self.body = {}
class XMLParser(object):
def __init__(self, xmlpath, server):
......@@ -194,6 +211,7 @@ class XMLParser(object):
obj.nodetype = name
for key, val in child.attrib.items():
self._set_attr(key, val, obj)
self.logger.info("\n Parsing node: %s %s", obj.nodeid, obj.browsename)
obj.displayname = obj.browsename # give a default value to display name
for el in child:
self._parse_tag(el, obj)
......@@ -202,6 +220,7 @@ class XMLParser(object):
def _set_attr(self, key, val, obj):
if key == "NodeId":
obj.nodeid = self._get_node_id(val)
print("PARSING", obj.nodeid)
elif key == "BrowseName":
obj.browsename = val
elif key == "SymbolicName":
......@@ -211,9 +230,11 @@ class XMLParser(object):
elif key == "DataType":
obj.datatype = val
elif key == "IsAbstract":
obj.abstract = val
obj.abstract = _to_bool(val)
elif key == "Executable":
obj.executable = _to_bool(val)
elif key == "EventNotifier":
obj.eventnotifier = 1 if val == "1" else 0
obj.eventnotifier = int(val)
elif key == "ValueRank":
obj.rank = int(val)
elif key == "ArrayDimensions":
......@@ -225,7 +246,7 @@ class XMLParser(object):
elif key == "UserAccessLevel":
obj.useraccesslevel = int(val)
elif key == "Symmetric":
obj.symmetric = True if val == "true" else False
obj.symmetric = _to_bool(val)
else:
self.logger.info("Attribute not implemented: %s:%s", key, val)
......@@ -249,7 +270,9 @@ class XMLParser(object):
self.logger.info("Not implemented tag: %s", el)
def _parse_value(self, el, obj):
self.logger.info("Parsing value")
for val in el:
self.logger.info("tag %s", val.tag)
ntag = self._retag.match(val.tag).groups()[1]
obj.valuetype = ntag
if ntag in ("Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64"):
......@@ -257,10 +280,7 @@ class XMLParser(object):
elif ntag in ("Float", "Double"):
obj.value = float(val.text)
elif ntag in ("Boolean"):
if val.text in ("True", "true", "1", "on", "On"):
obj.value = bool(1)
else:
obj.value = bool(0)
obj.value = _to_bool(val.text)
elif ntag in ("ByteString", "String"):
mytext = val.text
if mytext is None: # support importing null strings
......@@ -271,17 +291,15 @@ class XMLParser(object):
elif ntag in ("Guid"):
self._parse_value(val, obj)
elif ntag == "ListOfExtensionObject":
obj.value, obj.valuetype = self._parse_list_of_extension_object(el)
obj.value = self._parse_list_of_extension_object(el)
elif ntag == "ListOfLocalizedText":
obj.value = self._parse_list_of_localized_text(el)
else:
self.logger.info("Value type not implemented: %s", ntag)
self.logger.info("Value type not implemented: '%s', %s", ntag)
def _get_text(self, el):
txt = ""
for text in el.itertext():
txt += text
return txt
txtlist = [txt.strip() for txt in el.itertext()]
return "".join(txtlist)
def _parse_list_of_localized_text(self, el):
value = []
......@@ -297,31 +315,43 @@ class XMLParser(object):
def _parse_list_of_extension_object(self, el):
'''
Parse a uax:ListOfExtensionObject Value
Return an array with a value of each uax:ExtensionObject/*/* (each element is convert to a netry in a dict.
also the valuetype is returned. The valuetype is uax:ExtensionObject/*/tag()
Return an list of ExtObj
'''
value = []
valuetype = None
for extension_object_list in el:
for extension_object in extension_object_list:
extension_object.find('Body')
for extension_object_part in extension_object:
ntag = self._retag.match(extension_object_part.tag).groups()[1]
if ntag == 'Body':
data = {}
ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
valuetype = ntag
for body_item in extension_object_part.findall('*/*'):
ntag = self._retag.match(body_item.tag).groups()[1]
child = body_item.find('*')
if child is not None:
data[ntag] = self._get_text(child)
else:
data[ntag] = self._get_text(body_item)
value.append(data)
return value, valuetype
ext_obj = self._parse_ext_obj(extension_object)
value.append(ext_obj)
return value
def _parse_ext_obj(self, el):
ext = ExtObj()
for extension_object_part in el:
ntag = self._retag.match(extension_object_part.tag).groups()[1]
if ntag == 'TypeId':
ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
ext.typeid = self._get_text(extension_object_part)
elif ntag == 'Body':
ext.objname = self._retag.match(extension_object_part.find('*').tag).groups()[1]
ext.body = self._parse_body(extension_object_part)
print("body", ext.body)
else:
print("Uknoen ndtag", ntag)
print("ext_obj", ext.objname, ext.typeid, ext.body)
return ext
def _parse_body(self, el):
body = {}
for body_item in el:
otag = self._retag.match(body_item.tag).groups()[1]
childs = [i for i in body_item]
if not childs:
val = self._get_text(body_item)
else:
val = self._parse_body(body_item)
if val:
body[otag] = val
return body
def _parse_refs(self, el, obj):
for ref in el:
......
......@@ -60,7 +60,7 @@ class AttributeService(object):
continue
al = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
ual = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
if not ua.test_bit(al.Value.Value, ua.AccessLevel.CurrentWrite) or not ua.test_bit(ual.Value.Value, ua.AccessLevel.CurrentWrite):
if not ua.ua_binary.test_bit(al.Value.Value, ua.AccessLevel.CurrentWrite) or not ua.ua_binary.test_bit(ual.Value.Value, ua.AccessLevel.CurrentWrite):
res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
continue
res.append(self._aspace.set_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
......
......@@ -109,12 +109,12 @@ class HistoryDict(HistoryStorageInterface):
return [], cont
else:
if start is None:
start = ua.DateTimeMinValue
start = ua.get_win_epoch()
if end is None:
end = ua.DateTimeMinValue
if start == ua.DateTimeMinValue:
end = ua.get_win_epoch()
if start == ua.get_win_epoch():
results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
elif end == ua.DateTimeMinValue:
elif end == ua.get_win_epoch():
results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
elif start > end:
results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
......@@ -150,12 +150,12 @@ class HistoryDict(HistoryStorageInterface):
return [], cont
else:
if start is None:
start = ua.DateTimeMinValue
start = ua.get_win_epoch()
if end is None:
end = ua.DateTimeMinValue
if start == ua.DateTimeMinValue:
end = ua.get_win_epoch()
if start == ua.get_win_epoch():
results = [ev for ev in reversed(self._events[source_id]) if start <= ev.Time]
elif end == ua.DateTimeMinValue:
elif end == ua.get_win_epoch():
results = [ev for ev in self._events[source_id] if start <= ev.Time]
elif start > end:
results = [ev for ev in reversed(self._events[source_id]) if end <= ev.Time <= start]
......@@ -305,14 +305,14 @@ class HistoryManager(object):
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
dv, cont = self.storage.read_node_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode)
if cont:
cont = ua.pack_datetime(cont)
cont = ua.ua_binary.Primitives.DateTime.pack(cont)
# rv.IndexRange
# rv.DataEncoding # xml or binary, seems spec say we can ignore that one
return dv, cont
......@@ -324,7 +324,7 @@ class HistoryManager(object):
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
evts, cont = self.storage.read_event_history(rv.NodeId,
starttime,
......@@ -337,7 +337,7 @@ class HistoryManager(object):
field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
results.append(field_list)
if cont:
cont = ua.pack_datetime(cont)
cont = ua.ua_binary.Primitives.DateTime.pack(cont)
return results, cont
def update_history(self, params):
......
......@@ -225,7 +225,7 @@ class HistorySQLite(HistoryStorageInterface):
self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
if nb_values:
if len(results) > nb_values: # start > ua.DateTimeMinValue and
if len(results) > nb_values: # start > ua.get_win_epoch() and
cont = cont_timestamps[nb_values]
results = results[:nb_values]
......@@ -258,11 +258,11 @@ class HistorySQLite(HistoryStorageInterface):
def _get_bounds(start, end, nb_values):
order = "ASC"
if start is None or start == ua.DateTimeMinValue:
if start is None or start == ua.get_win_epoch():
order = "DESC"
start = ua.DateTimeMinValue
start = ua.get_win_epoch()
if end is None or end == ua.DateTimeMinValue:
if end is None or end == ua.get_win_epoch():
end = datetime.utcnow() + timedelta(days=1)
if start < end:
......
......@@ -127,7 +127,7 @@ class MonitoredItemService(object):
ev_notify_byte = self.aspace.get_attribute_value(
params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
if ev_notify_byte is None or not ua.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
return result
# result.FilterResult = ua.EventFilterResult() # spec says we can ignore if not error
......
......@@ -6,8 +6,6 @@ It is automatically generated from opcfoundation.org schemas.
from opcua import ua
false = False #FIXME
true = True #FIXME
def create_standard_address_space_Part13(server):
......@@ -20,33 +18,33 @@ def create_standard_address_space_Part13(server):
node.ReferenceTypeId = ua.NodeId.from_string("i=45")
attrs = ua.ObjectTypeAttributes()
attrs.DisplayName = ua.LocalizedText("AggregateConfigurationType")
attrs.IsAbstract = false
attrs.IsAbstract = False
node.NodeAttributes = attrs
server.add_nodes([node])
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = true
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=46")
ref.SourceNodeId = ua.NodeId.from_string("i=11187")
ref.TargetNodeClass = ua.NodeClass.DataType
ref.TargetNodeId = ua.NodeId.from_string("i=11188")
refs.append(ref)
ref = ua.AddReferencesItem()
ref.IsForward = true
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=46")
ref.SourceNodeId = ua.NodeId.from_string("i=11187")
ref.TargetNodeClass = ua.NodeClass.DataType
ref.TargetNodeId = ua.NodeId.from_string("i=11189")
refs.append(ref)
ref = ua.AddReferencesItem()
ref.IsForward = true
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=46")
ref.SourceNodeId = ua.NodeId.from_string("i=11187")
ref.TargetNodeClass = ua.NodeClass.DataType
ref.TargetNodeId = ua.NodeId.from_string("i=11190")
refs.append(ref)
ref = ua.AddReferencesItem()
ref.IsForward = true
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=46")
ref.SourceNodeId = ua.NodeId.from_string("i=11187")
ref.TargetNodeClass = ua.NodeClass.DataType
......@@ -69,7 +67,7 @@ def create_standard_address_space_Part13(server):
server.add_nodes([node])
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = true
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=37")
ref.SourceNodeId = ua.NodeId.from_string("i=11188")
ref.TargetNodeClass = ua.NodeClass.DataType
......@@ -92,7 +90,7 @@ def create_standard_address_space_Part13(server):
server.add_nodes([node])
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = true
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=37")
ref.SourceNodeId = ua.NodeId.from_string("i=11189")
ref.TargetNodeClass = ua.NodeClass.DataType
......@@ -115,7 +113,7 @@ def create_standard_address_space_Part13(server):
server.add_nodes([node])
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = true
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=37")
ref.SourceNodeId = ua.NodeId.from_string("i=11190")
ref.TargetNodeClass = ua.NodeClass.DataType
......@@ -138,7 +136,7 @@ def create_standard_address_space_Part13(server):
server.add_nodes([node])
refs = []
ref = ua.AddReferencesItem()
ref.IsForward = true
ref.IsForward = True
ref.ReferenceTypeId = ua.NodeId.from_string("i=37")
ref.SourceNodeId = ua.NodeId.from_string("i=11191")
ref.TargetNodeClass = ua.NodeClass.DataType
......
......@@ -141,7 +141,7 @@ class UaProcessor(object):
elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
self.logger.info("Close session request")
deletesubs = ua.unpack_uatype('Boolean', body)
deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
self.session.close_session(deletesubs)
......
"""
Binary protocol specific functions and constants
"""
import sys
import struct
import logging
from datetime import datetime, timedelta, tzinfo, MAXYEAR
from calendar import timegm
from opcua.common.uaerrors import UaError
if sys.version_info.major > 2:
unicode = str
logger = logging.getLogger('__name__')
EPOCH_AS_FILETIME = 116444736000000000 # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
def test_bit(data, offset):
mask = 1 << offset
return data & mask
def set_bit(data, offset):
mask = 1 << offset
return data | mask
def unset_bit(data, offset):
mask = 1 << offset
return data & ~mask
class UTC(tzinfo):
"""UTC"""
def utcoffset(self, dt):
return timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return timedelta(0)
# method copied from David Buxton <david@gasmark6.com> sample code
def datetime_to_win_epoch(dt):
if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
dt = dt.replace(tzinfo=UTC())
ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
return ft + (dt.microsecond * 10)
def win_epoch_to_datetime(epch):
try:
return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
except OverflowError:
# FILETIMEs after 31 Dec 9999 can't be converted to datetime
logger.warning("datetime overflow: %s", epch)
return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
def build_array_format_py2(prefix, length, fmtchar):
return prefix + str(length) + fmtchar
def build_array_format_py3(prefix, length, fmtchar):
return prefix + str(length) + chr(fmtchar)
if sys.version_info.major < 3:
build_array_format = build_array_format_py2
else:
build_array_format = build_array_format_py3
class _Primitive(object):
def pack_array(self, array):
if array is None:
return b'\xff\xff\xff\xff'
length = len(array)
b = [self.pack(val) for val in array]
b.insert(0, Primitives.Int32.pack(length))
def unpack_array(self, data):
length = Primitives.Int32.unpack(data)
if length == -1:
return None
elif length == 0:
return []
else:
return [self.unpack(data) for _ in range(length)]
class _DateTime(_Primitive):
@staticmethod
def pack(dt):
epch = datetime_to_win_epoch(dt)
return Primitives.Int64.pack(epch)
@staticmethod
def unpack(data):
epch = Primitives.Int64.unpack(data)
return win_epoch_to_datetime(epch)
class _String(_Primitive):
@staticmethod
def pack(string):
if string is None:
return Primitives.Int32.pack(-1)
if isinstance(string, unicode):
string = string.encode('utf-8')
length = len(string)
return Primitives.Int32.pack(length) + string
@staticmethod
def unpack(data):
b = _Bytes.unpack(data)
if sys.version_info.major < 3:
return b
else:
if b is None:
return b
return b.decode("utf-8")
class _Bytes(_Primitive):
@staticmethod
def pack(data):
return _String.pack(data)
@staticmethod
def unpack(data):
length = Primitives.Int32.unpack(data)
if length == -1:
return None
return data.read(length)
class _Null(_Primitive):
@staticmethod
def pack(data):
return b""
@staticmethod
def unpack(data):
return None
class _Primitive1(_Primitive):
def __init__(self, fmt):
self.struct = struct.Struct(fmt)
self.size = self.struct.size
self.format = self.struct.format
def pack(self, data):
return struct.pack(self.format, data)
def unpack(self, data):
return struct.unpack(self.format, data.read(self.size))[0]
#def pack_array(self, array):
#"""
#Basically the same as the method in _Primitive but MAYBE a bit more efficitent....
#"""
#if array is None:
#return b'\xff\xff\xff\xff'
#length = len(array)
#if length == 0:
#return b'\x00\x00\x00\x00'
#if length == 1:
#return b'\x01\x00\x00\x00' + self.pack(array[0])
#return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
class Primitives1(object):
Int8 = _Primitive1("<b")
SByte = Int8
Int16 = _Primitive1("<h")
Int32 = _Primitive1("<i")
Int64 = _Primitive1("<q")
UInt8 = _Primitive1("<B")
Char = UInt8
Byte = UInt8
UInt16 = _Primitive1("<H")
UInt32 = _Primitive1("<I")
UInt64 = _Primitive1("<Q")
Boolean = _Primitive1("<?")
Double = _Primitive1("<d")
Float = _Primitive1("<f")
class Primitives(Primitives1):
Null = _Null()
String = _String()
Bytes = _Bytes()
ByteString = _Bytes()
CharArray = _Bytes()
DateTime = _DateTime()
def pack_uatype_array(vtype, array):
if array is None:
return b'\xff\xff\xff\xff'
length = len(array)
b = [pack_uatype(vtype, val) for val in array]
b.insert(0, Primitives.Int32.pack(length))
return b"".join(b)
def pack_uatype(vtype, value):
if hasattr(Primitives, vtype.name):
return getattr(Primitives, vtype.name).pack(value)
elif vtype.value > 25:
return Primitives.Bytes.pack(value)
elif vtype.name == "ExtensionObject":
# dependency loop: classes in uaprotocol_auto use Variant defined in this file,
# but Variant can contain any object from uaprotocol_auto as ExtensionObject.
# Using local import to avoid import loop
from opcua.ua.uaprotocol_auto import extensionobject_to_binary
return extensionobject_to_binary(value)
else:
try:
return value.to_binary()
except AttributeError:
raise UaError("{} could not be packed with value {}".format(vtype, value))
def unpack_uatype(vtype, data):
if hasattr(Primitives, vtype.name):
st = getattr(Primitives, vtype.name)
return st.unpack(data)
elif vtype.value > 25:
return Primitives.Bytes.unpack(data)
elif vtype.name == "ExtensionObject":
# dependency loop: classes in uaprotocol_auto use Variant defined in this file,
# but Variant can contain any object from uaprotocol_auto as ExtensionObject.
# Using local import to avoid import loop
from opcua.ua.uaprotocol_auto import extensionobject_from_binary
return extensionobject_from_binary(data)
else:
from opcua.ua import uatypes
if hasattr(uatypes, vtype.name):
klass = getattr(uatypes, vtype.name)
return klass.from_binary(data)
else:
raise UaError("can not unpack unknown vtype %s" % vtype)
def unpack_uatype_array(vtype, data):
if hasattr(Primitives, vtype.name):
st = getattr(Primitives, vtype.name)
return st.unpack_array(data)
else:
length = Primitives.Int32.unpack(data)
if length == -1:
return None
else:
return [unpack_uatype(vtype, data) for _ in range(length)]
This diff is collapsed.
......@@ -5,8 +5,8 @@ from datetime import datetime
from opcua.ua import uaprotocol_auto as auto
from opcua.ua import uatypes
from opcua.ua import ua_binary as uabin
from opcua.ua import UaError
from opcua.ua.uatypes import uatype_UInt32
from opcua.common import utils
logger = logging.getLogger('opcua.uaprotocol')
......@@ -27,23 +27,23 @@ class Hello(uatypes.FrozenClass):
def to_binary(self):
b = []
b.append(uatype_UInt32.pack(self.ProtocolVersion))
b.append(uatype_UInt32.pack(self.ReceiveBufferSize))
b.append(uatype_UInt32.pack(self.SendBufferSize))
b.append(uatype_UInt32.pack(self.MaxMessageSize))
b.append(uatype_UInt32.pack(self.MaxChunkCount))
b.append(uatypes.pack_string(self.EndpointUrl))
b.append(uabin.Primitives.UInt32.pack(self.ProtocolVersion))
b.append(uabin.Primitives.UInt32.pack(self.ReceiveBufferSize))
b.append(uabin.Primitives.UInt32.pack(self.SendBufferSize))
b.append(uabin.Primitives.UInt32.pack(self.MaxMessageSize))
b.append(uabin.Primitives.UInt32.pack(self.MaxChunkCount))
b.append(uabin.Primitives.String.pack(self.EndpointUrl))
return b"".join(b)
@staticmethod
def from_binary(data):
hello = Hello()
hello.ProtocolVersion = uatype_UInt32.unpack(data.read(4))[0]
hello.ReceiveBufferSize = uatype_UInt32.unpack(data.read(4))[0]
hello.SendBufferSize = uatype_UInt32.unpack(data.read(4))[0]
hello.MaxMessageSize = uatype_UInt32.unpack(data.read(4))[0]
hello.MaxChunkCount = uatype_UInt32.unpack(data.read(4))[0]
hello.EndpointUrl = uatypes.unpack_string(data)
hello.ProtocolVersion = uabin.Primitives.UInt32.unpack(data)
hello.ReceiveBufferSize = uabin.Primitives.UInt32.unpack(data)
hello.SendBufferSize = uabin.Primitives.UInt32.unpack(data)
hello.MaxMessageSize = uabin.Primitives.UInt32.unpack(data)
hello.MaxChunkCount = uabin.Primitives.UInt32.unpack(data)
hello.EndpointUrl = uabin.Primitives.String.unpack(data)
return hello
......@@ -83,9 +83,9 @@ class Header(uatypes.FrozenClass):
size = self.body_size + 8
if self.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
size += 4
b.append(uatype_UInt32.pack(size))
b.append(uabin.Primitives.UInt32.pack(size))
if self.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
b.append(uatype_UInt32.pack(self.ChannelId))
b.append(uabin.Primitives.UInt32.pack(self.ChannelId))
return b"".join(b)
@staticmethod
......@@ -95,7 +95,7 @@ class Header(uatypes.FrozenClass):
hdr.body_size = hdr.packet_size - 8
if hdr.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
hdr.body_size -= 4
hdr.ChannelId = uatype_UInt32.unpack(data.read(4))[0]
hdr.ChannelId = uabin.Primitives.UInt32.unpack(data)
return hdr
@staticmethod
......@@ -118,14 +118,14 @@ class ErrorMessage(uatypes.FrozenClass):
def to_binary(self):
b = []
b.append(self.Error.to_binary())
b.append(uatypes.pack_string(self.Reason))
b.append(uabin.Primitives.String.pack(self.Reason))
return b"".join(b)
@staticmethod
def from_binary(data):
ack = ErrorMessage()
ack.Error = uatypes.StatusCode.from_binary(data)
ack.Reason = uatypes.unpack_string(data)
ack.Reason = uabin.Primitives.String.unpack(data)
return ack
def __str__(self):
......@@ -170,17 +170,17 @@ class AsymmetricAlgorithmHeader(uatypes.FrozenClass):
def to_binary(self):
b = []
b.append(uatypes.pack_string(self.SecurityPolicyURI))
b.append(uatypes.pack_string(self.SenderCertificate))
b.append(uatypes.pack_string(self.ReceiverCertificateThumbPrint))
b.append(uabin.Primitives.String.pack(self.SecurityPolicyURI))
b.append(uabin.Primitives.String.pack(self.SenderCertificate))
b.append(uabin.Primitives.String.pack(self.ReceiverCertificateThumbPrint))
return b"".join(b)
@staticmethod
def from_binary(data):
hdr = AsymmetricAlgorithmHeader()
hdr.SecurityPolicyURI = uatypes.unpack_string(data)
hdr.SenderCertificate = uatypes.unpack_bytes(data)
hdr.ReceiverCertificateThumbPrint = uatypes.unpack_bytes(data)
hdr.SecurityPolicyURI = uabin.Primitives.String.unpack(data)
hdr.SenderCertificate = uabin.Primitives.Bytes.unpack(data)
hdr.ReceiverCertificateThumbPrint = uabin.Primitives.Bytes.unpack(data)
return hdr
def __str__(self):
......@@ -199,11 +199,11 @@ class SymmetricAlgorithmHeader(uatypes.FrozenClass):
@staticmethod
def from_binary(data):
obj = SymmetricAlgorithmHeader()
obj.TokenId = uatype_UInt32.unpack(data.read(4))[0]
obj.TokenId = uabin.Primitives.UInt32.unpack(data)
return obj
def to_binary(self):
return uatype_UInt32.pack(self.TokenId)
return uabin.Primitives.UInt32.pack(self.TokenId)
@staticmethod
def max_size():
......@@ -224,14 +224,14 @@ class SequenceHeader(uatypes.FrozenClass):
@staticmethod
def from_binary(data):
obj = SequenceHeader()
obj.SequenceNumber = uatype_UInt32.unpack(data.read(4))[0]
obj.RequestId = uatype_UInt32.unpack(data.read(4))[0]
obj.SequenceNumber = uabin.Primitives.UInt32.unpack(data)
obj.RequestId = uabin.Primitives.UInt32.unpack(data)
return obj
def to_binary(self):
b = []
b.append(uatype_UInt32.pack(self.SequenceNumber))
b.append(uatype_UInt32.pack(self.RequestId))
b.append(uabin.Primitives.UInt32.pack(self.SequenceNumber))
b.append(uabin.Primitives.UInt32.pack(self.RequestId))
return b"".join(b)
@staticmethod
......
This diff is collapsed.
......@@ -3,11 +3,32 @@ Generate address space c++ code from xml file specification
xmlparser.py is a requirement. it is in opcua folder but to avoid importing all code, developer can link xmlparser.py in current directory
"""
import sys
import logging
# sys.path.insert(0, "..") # load local freeopcua implementation
#from opcua import xmlparser
import xmlparser
def _to_val(objs, attr, val):
from opcua import ua
print("got ", objs, attr, val)
cls = getattr(ua, objs[0])
for o in objs[1:]:
cls = getattr(ua, cls.ua_types[o])
if cls == ua.NodeId:
return "ua.NodeId.from_string('val')"
return ua_type_to_python(val, cls.ua_types[attr])
def ua_type_to_python(val, uatype):
if uatype in ("String"):
return "'{}'".format(val)
elif uatype in ("Bytes", "Bytes", "ByteString", "ByteArray"):
return "b'{}'".format(val)
else:
return val
class CodeGenerator(object):
def __init__(self, input_path, output_path):
......@@ -21,7 +42,7 @@ class CodeGenerator(object):
sys.stderr.write("Generating Python code {} for XML file {}".format(self.output_path, self.input_path) + "\n")
self.output_file = open(self.output_path, "w")
self.make_header()
self.parser = xmlparser.XMLParser(self.input_path)
self.parser = xmlparser.XMLParser(self.input_path, None)
for node in self.parser:
if node.nodetype == 'UAObject':
self.make_object_code(node)
......@@ -53,8 +74,6 @@ It is automatically generated from opcfoundation.org schemas.
from opcua import ua
false = False #FIXME
true = True #FIXME
def create_standard_address_space_%s(server):
''' % (self.part))
......@@ -110,27 +129,59 @@ def create_standard_address_space_%s(server):
self.writecode(indent, 'server.add_nodes([node])')
self.make_refs_code(obj, indent)
def make_variable_code(self, obj):
indent = " "
self.writecode(indent)
self.make_node_code(obj, indent)
self.writecode(indent, 'attrs = ua.VariableAttributes()')
def make_common_variable_code(self, indent, obj):
if obj.desc:
self.writecode(indent, 'attrs.Description = ua.LocalizedText("{}")'.format(obj.desc))
self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{}")'.format(obj.displayname))
self.writecode(indent, 'attrs.DataType = {}'.format(self.to_data_type(obj.datatype)))
if obj.value:
self.writecode(indent, 'attrs.Value = ua.Variant({}, ua.VariantType.{})'.format(self.to_value(obj.value), obj.valuetype))
if obj.value is not None:
if obj.valuetype == "ListOfExtensionObject":
self.writecode(indent, 'value = []')
for ext in obj.value:
self.make_ext_obj_code(indent, ext)
self.writecode(indent, 'value.append(extobj)')
self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
elif obj.valuetype == "ExtensionObject":
self.make_ext_obj_code(indent, obj.value)
self.writecode(indent, 'value = extobj')
self.writecode(indent, 'attrs.Value = ua.Variant(value, ua.VariantType.ExtensionObject)')
else:
if obj.valuetype.startswith("ListOf"):
obj.valuetype = obj.valuetype[6:]
self.writecode(indent, 'attrs.Value = ua.Variant({}, ua.VariantType.{})'.format(self.to_value(obj.value), obj.valuetype))
if obj.rank:
self.writecode(indent, 'attrs.ValueRank = {}'.format(obj.rank))
if obj.accesslevel:
self.writecode(indent, 'attrs.AccessLevel = {}'.format(obj.accesslevel))
if obj.useraccesslevel:
self.writecode(indent, 'attrs.UserAccessLevel = {}'.format(obj.useraccesslevel))
if obj.dimensions:
self.writecode(indent, 'attrs.ArrayDimensions = {}'.format(obj.dimensions))
def make_ext_obj_code(self, indent, extobj):
print("makeing code for ", extobj.objname)
self.writecode(indent, 'extobj = ua.{}()'.format(extobj.objname))
for name, val in extobj.body.items():
for k, v in val.items():
if type(v) is str:
val = _to_val([extobj.objname], k, v)
self.writecode(indent, 'extobj.{} = {}'.format(k, val))
else:
if k == "DataType": #hack for strange nodeid xml format
self.writecode(indent, 'extobj.{} = ua.NodeId.from_string("{}")'.format(k, v["Identifier"]))
continue
for k2, v2 in v.items():
val2 = _to_val([extobj.objname, k], k2, v2)
self.writecode(indent, 'extobj.{}.{} = {}'.format(k, k2, val2))
def make_variable_code(self, obj):
indent = " "
self.writecode(indent)
self.make_node_code(obj, indent)
self.writecode(indent, 'attrs = ua.VariableAttributes()')
if obj.minsample:
self.writecode(indent, 'attrs.MinimumSamplingInterval = {}'.format(obj.minsample))
if obj.dimensions:
self.writecode(indent, 'attrs.ArrayDimensions = '.format(obj.dimensions))
self.make_common_variable_code(indent, obj)
self.writecode(indent, 'node.NodeAttributes = attrs')
self.writecode(indent, 'server.add_nodes([node])')
self.make_refs_code(obj, indent)
......@@ -143,18 +194,9 @@ def create_standard_address_space_%s(server):
if obj.desc:
self.writecode(indent, 'attrs.Description = ua.LocalizedText("{}")'.format(obj.desc))
self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{}")'.format(obj.displayname))
self.writecode(indent, 'attrs.DataType = {}'.format(self.to_data_type(obj.datatype)))
if obj.value:
self.writecode(indent, 'attrs.Value = {}'.format(self.to_value(obj.value)))
if obj.value:
self.writecode(indent, 'attrs.Value = {}'.format(self.to_value(obj.value)))
if obj.rank:
self.writecode(indent, 'attrs.ValueRank = {}'.format(obj.rank))
if obj.abstract:
self.writecode(indent, 'attrs.IsAbstract = {}'.format(obj.abstract))
print(obj.dimensions)
if obj.dimensions:
self.writecode(indent, 'attrs.ArrayDimensions = {}'.format(obj.dimensions))
self.make_common_variable_code(indent, obj)
self.writecode(indent, 'node.NodeAttributes = attrs')
self.writecode(indent, 'server.add_nodes([node])')
self.make_refs_code(obj, indent)
......@@ -174,14 +216,6 @@ def create_standard_address_space_%s(server):
if obj.desc:
self.writecode(indent, 'attrs.Description = ua.LocalizedText("{}")'.format(obj.desc))
self.writecode(indent, 'attrs.DisplayName = ua.LocalizedText("{}")'.format(obj.displayname))
if obj.accesslevel:
self.writecode(indent, 'attrs.AccessLevel = {}'.format(obj.accesslevel))
if obj.useraccesslevel:
self.writecode(indent, 'attrs.UserAccessLevel = {}'.format(obj.useraccesslevel))
if obj.minsample:
self.writecode(indent, 'attrs.MinimumSamplingInterval = {}'.format(obj.minsample))
if obj.dimensions:
self.writecode(indent, 'attrs.ArrayDimensions = {}'.format(obj.dimensions))
self.writecode(indent, 'node.NodeAttributes = attrs')
self.writecode(indent, 'server.add_nodes([node])')
self.make_refs_code(obj, indent)
......@@ -224,7 +258,7 @@ def create_standard_address_space_%s(server):
self.writecode(indent, "refs = []")
for ref in obj.refs:
self.writecode(indent, 'ref = ua.AddReferencesItem()')
self.writecode(indent, 'ref.IsForward = true')
self.writecode(indent, 'ref.IsForward = True')
self.writecode(indent, 'ref.ReferenceTypeId = {}'.format(self.to_ref_type(ref.reftype)))
self.writecode(indent, 'ref.SourceNodeId = ua.NodeId.from_string("{}")'.format(obj.nodeid))
self.writecode(indent, 'ref.TargetNodeClass = ua.NodeClass.DataType')
......@@ -245,6 +279,7 @@ def save_aspace_to_disk():
aspace.dump(path)
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
for i in (3, 4, 5, 8, 9, 10, 11, 13):
xmlpath = "Opc.Ua.NodeSet2.Part{}.xml".format(str(i))
cpppath = "../opcua/server/standard_address_space/standard_address_space_part{}.py".format(str(i))
......
This diff is collapsed.
......@@ -8,7 +8,7 @@ def extensionobject_from_binary(data):
Encoding = ord(data.read(1))
body = None
if Encoding & (1 << 0):
length = uatype_Int32.unpack(data.read(4))[0]
length = uabin.Primitives.Int32.unpack(data)
if length < 1:
body = Buffer(b"")
else:
......@@ -46,7 +46,7 @@ def extensionobject_to_binary(obj):
Body = obj.to_binary()
packet = []
packet.append(TypeId.to_binary())
packet.append(uatype_UInt8.pack(Encoding))
packet.append(uabin.Primitives.UInt8.pack(Encoding))
if Body:
packet.append(pack_bytes(Body))
packet.append(uabin.Primitives.Bytes.pack(Body))
return b''.join(packet)
This diff is collapsed.
......@@ -624,14 +624,14 @@ class CommonTests(object):
# create enum type
enums = self.opc.get_root_node().get_child(["0:Types", "0:DataTypes", "0:BaseDataType", "0:Enumeration"])
myenum_type = enums.add_data_type(0, "MyEnum")
es = myenum_type.add_variable(0, "EnumStrings", ["String0", "String1", "String2"], ua.VariantType.LocalizedText)
es = myenum_type.add_variable(0, "EnumStrings", [ua.LocalizedText("String0"), ua.LocalizedText("String1"), ua.LocalizedText("String2")], ua.VariantType.LocalizedText)
#es.set_value_rank(1)
# instantiate
o = self.opc.get_objects_node()
myvar = o.add_variable(2, "MyEnumVar", "String1", ua.VariantType.LocalizedText, datatype=myenum_type.nodeid)
myvar = o.add_variable(2, "MyEnumVar", ua.LocalizedText("String1"), datatype=myenum_type.nodeid)
#myvar.set_writable(True)
# tests
self.assertEqual(myvar.get_data_type(), myenum_type.nodeid)
myvar.set_value("String2", ua.VariantType.LocalizedText)
myvar.set_value(ua.LocalizedText("String2"))
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment