Commit 777e812f authored by Andrew's avatar Andrew Committed by ORD

Guid ua binary primitive (#338)

* Guid as a primitive

UA Guids based on python UUID type

* Guid primitive fixes, import datetime

* Fix Guid NodeId, typos, tests

* Add guid test, python 2.7 fixes

* Guid fixes for modeler

* Pep fixes for edited files

* Fix pep refactor

* Remove string check for datime/guid

this function assumes correct value is supplied, update doc string
parent 5318bda8
......@@ -75,7 +75,7 @@ class Node(object):
may not be convertible to VariantType
"""
result = self.get_attribute(ua.AttributeIds.DataType)
return ua.DataType_to_VariantType(result.Value.Value)
return ua.datatype_to_varianttype(result.Value.Value)
def get_access_level(self):
"""
......
......@@ -5,6 +5,7 @@ Usefull method and classes not belonging anywhere and depending on opcua library
from dateutil import parser
from datetime import datetime
from enum import Enum, IntEnum
import uuid
from opcua import ua
from opcua.ua.uaerrors import UaError
......@@ -57,7 +58,8 @@ def variant_to_string(var):
def string_to_val(string, vtype):
"""
Convert back a string to a python or python-opcua object
Convert back a string to a python or python-opcua object
Note: no error checking is done here, supplying null strings could raise exceptions (datetime and guid)
"""
string = string.strip()
if string.startswith("["):
......@@ -79,7 +81,7 @@ def string_to_val(string, vtype):
val = float(string)
elif vtype in (ua.VariantType.String, ua.VariantType.XmlElement):
val = string
elif vtype in (ua.VariantType.SByte, ua.VariantType.Guid, ua.VariantType.ByteString):
elif vtype in (ua.VariantType.SByte, ua.VariantType.ByteString):
val = bytes(string)
elif vtype in (ua.VariantType.NodeId, ua.VariantType.ExpandedNodeId):
val = ua.NodeId.from_string(string)
......@@ -91,6 +93,8 @@ def string_to_val(string, vtype):
val = ua.LocalizedText(string)
elif vtype == ua.VariantType.StatusCode:
val = ua.StatusCode(string)
elif vtype == ua.VariantType.Guid:
val = uuid.UUID(string)
else:
# FIXME: Some types are probably missing!
raise NotImplementedError
......@@ -128,7 +132,6 @@ def get_node_subtypes(node, nodes=None):
def get_node_supertypes(node, includeitself=False, skipbase=True):
"""
return get all subtype parents of node recursive
:param server: used in case node is nodeid
:param node: can be a ua.Node or ua.NodeId
:param includeitself: include also node to the list
:param skipbase don't include the toplevel one
......
......@@ -29,7 +29,7 @@ class XmlExporter(object):
def build_etree(self, node_list, uris=None):
"""
Create an XML etree object from a list of nodes; custom namespace uris are optional
Namespaces used by nodes are allways exported for consistency.
Namespaces used by nodes are always exported for consistency.
Args:
node_list: list of Node objects for export
uris: list of namespace uri strings
......@@ -89,7 +89,7 @@ class XmlExporter(object):
"""
# try to write the XML etree to a file
self.logger.info('Exporting XML file to %s', xmlpath)
#from IPython import embed
# from IPython import embed
# embed()
if pretty:
indent(self.etree.getroot())
......@@ -328,6 +328,9 @@ def _val_to_etree(el, dtype, val):
if dtype == ua.NodeId(ua.ObjectIds.NodeId):
id_el = Et.SubElement(el, "uax:Identifier")
id_el.text = val.to_string()
elif dtype == ua.NodeId(ua.ObjectIds.Guid):
id_el = Et.SubElement(el, "uax:String")
id_el.text = str(val)
elif not hasattr(val, "ua_types"):
if type(val) is bytes:
el.text = val.decode("utf-8")
......@@ -376,11 +379,11 @@ def _extobj_to_etree(val_el, name, dtype, val):
def indent(elem, level=0):
'''
"""
copy and paste from http://effbot.org/zone/element-lib.htm#prettyprint
it basically walks your tree and adds spaces and newlines so the tree is
printed in a nice way
'''
"""
i = "\n" + level * " "
if len(elem):
if not elem.text or not elem.text.strip():
......
......@@ -5,6 +5,8 @@ format is the one from opc-ua specification
import logging
import sys
import re
import uuid
import dateutil.parser
from opcua import ua
from opcua.common import xmlparser
......@@ -75,8 +77,7 @@ class XmlImporter(object):
self.namespaces = self._map_namespaces(self.parser.get_used_namespaces(), act_server)
self.aliases = self._map_aliases(self.parser.get_aliases())
# The ordering of nodes currently only works if namespaces are
# defined in XML.
# The ordering of nodes currently only works if namespaces are defined in XML.
# Also, it is recommended not to use node ids without namespace prefix!
nodes_parsed = self._sort_nodes_by_parentid(self.parser)
......@@ -233,7 +234,7 @@ class XmlImporter(object):
if type(v) is str:
setattr(ext, attname, to_python(v, ext, attname))
else:
# so we habve either an object or a list...
# so we have either an object or a list...
obj2 = getattr(ext, attname)
if isinstance(obj2, ua.NodeId):
for attname2, v2 in v:
......@@ -255,7 +256,7 @@ class XmlImporter(object):
def _add_variable_value(self, obj):
"""
Returns the value for a Variable based on the objects valuetype.
Returns the value for a Variable based on the objects value type.
"""
if obj.valuetype == 'ListOfExtensionObject':
values = []
......@@ -269,6 +270,10 @@ class XmlImporter(object):
elif obj.valuetype == 'ExtensionObject':
extobj = self._make_ext_obj(obj.value)
return ua.Variant(extobj, getattr(ua.VariantType, obj.valuetype))
elif obj.valuetype == 'DateTime':
return ua.Variant(dateutil.parser.parse(obj.value), getattr(ua.VariantType, obj.valuetype))
elif obj.valuetype == 'Guid':
return ua.Variant(uuid.UUID(obj.value), getattr(ua.VariantType, obj.valuetype))
else:
return ua.Variant(obj.value, getattr(ua.VariantType, obj.valuetype))
......@@ -358,8 +363,8 @@ class XmlImporter(object):
# FIX: wrong order of node sorting .. need to find out what is wrong
def _sort_nodes_by_parentid(self, nodes):
"""
Sort the list of nodes according theire parent node in order to respect
the depency between nodes.
Sort the list of nodes according their parent node in order to respect
the dependency between nodes.
:param nodes: list of NodeDataObjects
:returns: list of sorted nodes
......@@ -385,13 +390,11 @@ class XmlImporter(object):
# Insert nodes that
# (1) have no parent / parent_ns is None (e.g. namespace 0)
# (2) ns is not in list of relevant namespaces
if (parent_ns is None or
(len(relevant_namespaces) >= 1 and node_ns not in relevant_namespaces) or
parent_id is None):
if (parent_ns is None or (len(relevant_namespaces) >= 1 and node_ns not in relevant_namespaces) or
parent_id is None):
insert = 0
else:
# Check if the nodes parent is already in the list of
# inserted nodes
# Check if the nodes parent is already in the list of inserted nodes
if node.parent in sorted_nodes_ids:
insert = -1
if insert == 0:
......
......@@ -15,7 +15,6 @@ def _to_bool(val):
return False
class NodeData(object):
def __init__(self):
......@@ -211,8 +210,11 @@ class XMLParser(object):
mytext = mytext.replace('\n', '').replace('\r', '')
# obj.value.append('b"{}"'.format(mytext))
obj.value = mytext
elif ntag in ("DateTime"):
obj.value = val.text
elif ntag in ("Guid"):
self._parse_value(val, obj)
obj.valuetype = obj.datatype # override parsed string type to guid
elif ntag == "ListOfExtensionObject":
obj.value = self._parse_list_of_extension_object(el)
elif ntag == "ListOfLocalizedText":
......@@ -238,10 +240,10 @@ class XMLParser(object):
return value
def _parse_list_of_extension_object(self, el):
'''
"""
Parse a uax:ListOfExtensionObject Value
Return an list of ExtObj
'''
"""
value = []
for extension_object_list in el:
for extension_object in extension_object_list:
......
......@@ -7,6 +7,7 @@ import struct
import logging
from datetime import datetime, timedelta, tzinfo, MAXYEAR
from calendar import timegm
import uuid
from opcua.ua.uaerrors import UaError
......@@ -37,8 +38,9 @@ def unset_bit(data, offset):
class UTC(tzinfo):
"""UTC"""
"""
UTC
"""
def utcoffset(self, dt):
return timedelta(0)
......@@ -160,6 +162,36 @@ class _Null(_Primitive):
return None
class _Guid(_Primitive):
@staticmethod
def pack(guid):
# convert python UUID 6 field format to OPC UA 4 field format
f1 = Primitives.UInt32.pack(guid.time_low)
f2 = Primitives.UInt16.pack(guid.time_mid)
f3 = Primitives.UInt16.pack(guid.time_hi_version)
f4a = Primitives.Byte.pack(guid.clock_seq_hi_variant)
f4b = Primitives.Byte.pack(guid.clock_seq_low)
f4c = struct.pack('>Q', guid.node)[2:8] # no primitive .pack available for 6 byte int
f4 = f4a+f4b+f4c
# concat byte fields
b = f1+f2+f3+f4
return b
@staticmethod
def unpack(data):
# convert OPC UA 4 field format to python UUID bytes
f1 = struct.pack('>I', Primitives.UInt32.unpack(data))
f2 = struct.pack('>H', Primitives.UInt16.unpack(data))
f3 = struct.pack('>H', Primitives.UInt16.unpack(data))
f4 = data.read(8)
# concat byte fields
b = f1 + f2 + f3 + f4
return uuid.UUID(bytes=b)
class _Primitive1(_Primitive):
def __init__(self, fmt):
self.struct = struct.Struct(fmt)
......@@ -174,7 +206,7 @@ class _Primitive1(_Primitive):
#def pack_array(self, array):
#"""
#Basically the same as the method in _Primitive but MAYBE a bit more efficitent....
#Basically the same as the method in _Primitive but MAYBE a bit more efficient....
#"""
#if array is None:
#return b'\xff\xff\xff\xff'
......@@ -186,7 +218,6 @@ class _Primitive1(_Primitive):
#return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
class Primitives1(object):
Int8 = _Primitive1("<b")
SByte = Int8
......@@ -211,6 +242,7 @@ class Primitives(Primitives1):
ByteString = _Bytes()
CharArray = _Bytes()
DateTime = _DateTime()
Guid = _Guid()
def pack_uatype_array(vtype, array):
......
......@@ -161,30 +161,7 @@ class EventNotifier(_MaskEnum):
HistoryWrite = 3
class Guid(FrozenClass):
def __init__(self):
self.uuid = uuid.uuid4()
self._freeze = True
def to_binary(self):
return self.uuid.bytes
def __hash__(self):
return hash(self.uuid.bytes)
@staticmethod
def from_binary(data):
g = Guid()
g.uuid = uuid.UUID(bytes=data.read(16))
return g
def __eq__(self, other):
return isinstance(other, Guid) and self.uuid == other.uuid
class StatusCode(FrozenClass):
"""
:ivar value:
:vartype value: int
......@@ -252,7 +229,6 @@ class NodeIdType(Enum):
class NodeId(FrozenClass):
"""
NodeId Object
......@@ -410,6 +386,9 @@ class NodeId(FrozenClass):
elif self.NodeIdType == NodeIdType.ByteString:
return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
uabin.Primitives.Bytes.pack(self.Identifier)
elif self.NodeIdType == NodeIdType.Guid:
return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
uabin.Primitives.Guid.pack(self.Identifier)
else:
return struct.pack("<BH", self.NodeIdType.value, self.NamespaceIndex) + \
self.Identifier.to_binary()
......@@ -435,7 +414,7 @@ class NodeId(FrozenClass):
nid.Identifier = uabin.Primitives.Bytes.unpack(data)
elif nid.NodeIdType == NodeIdType.Guid:
nid.NamespaceIndex = uabin.Primitives.UInt16.unpack(data)
nid.Identifier = Guid.from_binary(data)
nid.Identifier = uabin.Primitives.Guid.unpack(data)
else:
raise UaError("Unknown NodeId encoding: " + str(nid.NodeIdType))
......@@ -487,10 +466,9 @@ ExpandedNodeId = NodeId
class QualifiedName(FrozenClass):
'''
"""
A string qualified with a namespace index.
'''
"""
def __init__(self, name=None, namespaceidx=0):
if not isinstance(namespaceidx, int):
......@@ -549,10 +527,9 @@ class QualifiedName(FrozenClass):
class LocalizedText(FrozenClass):
'''
"""
A string qualified with a namespace index.
'''
"""
ua_types = {
"Text": "ByteString",
......@@ -614,18 +591,14 @@ class LocalizedText(FrozenClass):
class ExtensionObject(FrozenClass):
'''
"""
Any UA object packed as an ExtensionObject
:ivar TypeId:
:vartype TypeId: NodeId
:ivar Body:
:vartype Body: bytes
'''
"""
def __init__(self):
self.TypeId = NodeId()
......@@ -668,8 +641,7 @@ class ExtensionObject(FrozenClass):
class VariantType(Enum):
'''
"""
The possible types of a variant.
:ivar Null:
......@@ -698,10 +670,8 @@ class VariantType(Enum):
:ivar DataValue:
:ivar Variant:
:ivar DiagnosticInfo:
"""
'''
Null = 0
Boolean = 1
SByte = 2
......@@ -753,7 +723,6 @@ class VariantTypeCustom(object):
class Variant(FrozenClass):
"""
Create an OPC-UA Variant object.
if no argument a Null Variant is created.
......@@ -810,6 +779,8 @@ class Variant(FrozenClass):
return VariantType.ByteString
elif isinstance(val, datetime):
return VariantType.DateTime
elif isinstance(val, uuid.UUID):
return VariantType.Guid
else:
if isinstance(val, object):
try:
......@@ -845,7 +816,7 @@ class Variant(FrozenClass):
dimensions = None
encoding = ord(data.read(1))
int_type = encoding & 0b00111111
vtype = DataType_to_VariantType(int_type)
vtype = datatype_to_varianttype(int_type)
if vtype == VariantType.Null:
return Variant(None, vtype, encoding)
if uabin.test_bit(encoding, 7):
......@@ -910,9 +881,9 @@ def get_shape(mylist):
class XmlElement(FrozenClass):
'''
"""
An XML element encoded as an UTF-8 string.
'''
"""
def __init__(self, binary=None):
if binary is not None:
......@@ -939,8 +910,7 @@ class XmlElement(FrozenClass):
class DataValue(FrozenClass):
'''
"""
A value with an associated timestamp, and quality.
Automatically generated from xml , copied and modified here to fix errors in xml spec
......@@ -956,8 +926,7 @@ class DataValue(FrozenClass):
:vartype ServerTimestamp: datetime
:ivar ServerPicoseconds:
:vartype ServerPicoseconds: int
'''
"""
def __init__(self, variant=None, status=None):
self.Encoding = 0
......@@ -1044,11 +1013,11 @@ class DataValue(FrozenClass):
__repr__ = __str__
def DataType_to_VariantType(int_type):
def datatype_to_varianttype(int_type):
"""
Takes a NodeId or int and return a VariantType
This is only supported if int_type < 63 due to VariantType encoding
At low level we do not have access to address space thus decodig is limited
At low level we do not have access to address space thus decoding is limited
a better version of this method can be find in ua_utils.py
"""
if isinstance(int_type, NodeId):
......
......@@ -5,6 +5,7 @@ import io
from datetime import datetime
import unittest
from collections import namedtuple
import uuid
from opcua import ua
from opcua.ua import extensionobject_from_binary
......@@ -128,8 +129,10 @@ class TestUnit(unittest.TestCase):
self.assertEqual(v, v2)
def test_guid(self):
g = ua.Guid()
sc = ua.StatusCode()
v = ua.Variant(uuid.uuid4(), ua.VariantType.Guid)
v2 = ua.Variant.from_binary(ua.utils.Buffer(v.to_binary()))
self.assertEqual(v.VariantType, v2.VariantType)
self.assertEqual(v, v2)
def test_nodeid(self):
nid = ua.NodeId()
......@@ -150,8 +153,8 @@ class TestUnit(unittest.TestCase):
s = ua.StringNodeId(53, 0) # should we raise an exception???
s1 = ua.StringNodeId("53", 0)
bs = ua.ByteStringNodeId(b"53", 0)
gid = ua.Guid()
g = ua.ByteStringNodeId(gid, 0)
gid = uuid.uuid4()
g = ua.ByteStringNodeId(str(gid), 0)
guid = ua.GuidNodeId(gid)
self.assertEqual(tb, fb)
self.assertEqual(tb, n)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment