Commit 5d841b35 authored by Georg Schölly's avatar Georg Schölly

improve handling of enums

 * replace int_to_* functions with a smarter enum class
 * consolidate WriteAccessMask enum and WriteAccess enum
parent dc88c9fa
......@@ -106,8 +106,8 @@ def create_variable_type(parent, nodeid, bname, datatype):
attrs.WriteMask = 0
attrs.UserWriteMask = 0
attrs.Historizing = 0
attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
addnode.NodeAttributes = attrs
results = parent.server.add_nodes([addnode])
results[0].StatusCode.check()
......@@ -131,8 +131,8 @@ def create_reference_type(parent, nodeid, bname):
attrs.IsAbstract = False
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
addnode.NodeAttributes = attrs
results = parent.server.add_nodes([addnode])
results[0].StatusCode.check()
......@@ -246,8 +246,8 @@ def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, is
attrs.WriteMask = 0
attrs.UserWriteMask = 0
attrs.Historizing = 0
attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
addnode.NodeAttributes = attrs
results = server.add_nodes([addnode])
results[0].StatusCode.check()
......@@ -276,8 +276,8 @@ def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=N
attrs.WriteMask = 0
attrs.UserWriteMask = 0
attrs.Historizing = 0
attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask
attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask
addnode.NodeAttributes = attrs
results = server.add_nodes([addnode])
results[0].StatusCode.check()
......
......@@ -79,34 +79,33 @@ class Node(object):
def get_access_level(self):
"""
get access level of node as a list of AccessLevel Enum
Get the access level attribute of the node as a set of AccessLevel enum values.
"""
result = self.get_attribute(ua.AttributeIds.AccessLevel)
return ua.int_to_AccessLevel(result.Value.Value)
return ua.AccessLevel.parse_bitfield(result.Value.Value)
def get_user_access_level(self):
"""
get user access level of node as a list of AccessLevel Enum
Get the user access level attribute of the node as a set of AccessLevel enum values.
"""
result = self.get_attribute(ua.AttributeIds.UserAccessLevel)
return ua.int_to_AccessLevel(result.Value.Value)
return ua.AccessLevel.parse_bitfield(result.Value.Value)
def get_event_notifier(self):
"""
get EventNotifier attribute value as a list of EventNotifier Enum
Get the event notifier attribute of the node as a set of EventNotifier enum values.
"""
result = self.get_attribute(ua.AttributeIds.EventNotifier)
return ua.int_to_EventNotifier(result.Value.Value)
return ua.EventNotifier.parse_bitfield(result.Value.Value)
def set_event_notifier(self, enum_list):
def set_event_notifier(self, values):
"""
set event notifier attribute,
arg is a list of EventNotifier Enum
Set the event notifier attribute.
:param values: an iterable of EventNotifier enum values.
"""
res = 0
for en in enum_list:
res = ua.set_bit(res, en.value)
self.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(res, ua.VariantType.Byte)))
event_notifier_bitfield = ua.EventNotifier.to_bitfield(values)
self.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(event_notifier_bitfield, ua.VariantType.Byte)))
def get_node_class(self):
"""
......
......@@ -2,7 +2,7 @@
implement ua datatypes
"""
import logging
from enum import Enum, IntEnum
from enum import Enum, IntEnum, EnumMeta
from datetime import datetime, timedelta, tzinfo, MAXYEAR
from calendar import timegm
import sys
......@@ -10,6 +10,7 @@ import os
import uuid
import struct
import re
import itertools
if sys.version_info.major > 2:
unicode = str
......@@ -302,63 +303,98 @@ class ValueRank(IntEnum):
ThreeDimensions = 3
FourDimensions = 4
class _MaskEnum(IntEnum):
class AccessLevel(IntEnum):
"""
"""
CurrentRead = 0
CurrentWrite = 1
HistoryRead = 2
HistoryWrite = 3
SemanticChange = 4
@classmethod
def parse_bitfield(cls, the_int):
""" Take an integer and interpret it as a set of enum values. """
assert isinstance(the_int, int)
return {cls(b) for b in cls._bits(the_int)}
class AccessLevelMask(IntEnum):
"""
Mask for access level
@classmethod
def to_bitfield(cls, collection):
""" Takes some enum values and creates an integer from them. """
# make sure all elements are of the correct type (use itertools.tee in case we get passed an
# iterator)
iter1, iter2 = itertools.tee(iter(collection))
assert all(isinstance(x, cls) for x in iter1)
return sum(x.mask for x in iter2)
@property
def mask(self):
return 1 << self.value
@staticmethod
def _bits(n):
""" Iterate over the bits in n.
e.g. bits(44) yields at 2, 3, 5
"""
assert n >= 0 # avoid infinite recursion
pos = 0
while n:
if n & 0x1:
yield pos
n = n // 2
pos += 1
class AccessLevel(_MaskEnum):
"""
CurrentRead = 1 << AccessLevel.CurrentRead
CurrentWrite = 1 << AccessLevel.CurrentWrite
HistoryRead = 1 << AccessLevel.HistoryRead
HistoryWrite = 1 << AccessLevel.HistoryWrite
SemanticChange = 1 << AccessLevel.SemanticChange
Bit index to indicate what the access level is.
Spec Part 3, appears multiple times, e.g. paragraph 5.6.2 Variable NodeClass
"""
CurrentRead = 0
CurrentWrite = 1
HistoryRead = 2
HistoryWrite = 3
SemanticChange = 4
StatusWrite = 5
TimestampWrite = 6
class WriteMask(IntEnum):
class WriteMask(_MaskEnum):
"""
Mask to indicate which attribute of a node is writable
Rmq: This is not a mask but bit index....
Bit index to indicate which attribute of a node is writable
Spec Part 3, Paragraph 5.2.7 WriteMask
"""
AccessLevel = 0
ArrayDimensions = 1
BrowseName = 2
ContainsNoLoops = 3
DataType = 4
Description = 5
DisplayName = 6
EventNotifier = 7
Executable = 8
Historizing = 9
InverseName = 10
IsAbstract = 11
AccessLevel = 0
ArrayDimensions = 1
BrowseName = 2
ContainsNoLoops = 3
DataType = 4
Description = 5
DisplayName = 6
EventNotifier = 7
Executable = 8
Historizing = 9
InverseName = 10
IsAbstract = 11
MinimumSamplingInterval = 12
NodeClass = 13
NodeId = 14
Symmetric = 15
UserAccessLevel = 16
UserExecutable = 17
UserWriteMask = 18
ValueRank = 19
WriteMask = 20
ValueForVariableType = 21
class EventNotifier(IntEnum):
NodeClass = 13
NodeId = 14
Symmetric = 15
UserAccessLevel = 16
UserExecutable = 17
UserWriteMask = 18
ValueRank = 19
WriteMask = 20
ValueForVariableType = 21
class EventNotifier(_MaskEnum):
"""
Bit index to indicate how a node can be used for events.
Spec Part 3, appears multiple times, e.g. Paragraph 5.4 View NodeClass
"""
SubscribeToEvents = 0
HistoryRead = 2
HistoryWrite = 3
# Reserved = 1
HistoryRead = 2
HistoryWrite = 3
class Guid(FrozenClass):
......@@ -1239,36 +1275,3 @@ def DataType_to_VariantType(int_type):
return VariantType(int_type)
else:
return VariantTypeCustom(int_type)
def int_to_AccessLevel(level):
"""
take an int and return a list of AccessLevel Enum
"""
res = []
for val in AccessLevel:
if test_bit(level, val.value):
res.append(val)
return res
def int_to_WriteMask(level):
"""
take an int and return a list of WriteMask Enum
"""
res = []
for val in WriteMask:
if test_bit(level, val.value):
res.append(val)
return res
def int_to_EventNotifier(level):
"""
take an int and return a list of EventNotifier Enum
"""
res = []
for val in EventNotifier:
if test_bit(level, val.value):
res.append(val)
return res
......@@ -409,7 +409,12 @@ def check_eventgenerator_SourceServer(test, evgen):
server = test.opc.get_server_node()
test.assertEqual(evgen.event.SourceName, server.get_browse_name().Name)
test.assertEqual(evgen.event.SourceNode, ua.NodeId(ua.ObjectIds.Server))
test.assertEqual(server.get_attribute(ua.AttributeIds.EventNotifier).Value, ua.Variant(5, ua.VariantType.Byte))
test.assertEqual(
server.get_event_notifier(),
{ua.EventNotifier.SubscribeToEvents, ua.EventNotifier.HistoryRead}
)
refs = server.get_referenced_nodes(ua.ObjectIds.GeneratesEvent, ua.BrowseDirection.Forward, ua.NodeClass.ObjectType, False)
test.assertGreaterEqual(len(refs), 1)
......@@ -417,7 +422,12 @@ def check_eventgenerator_SourceServer(test, evgen):
def check_event_generator_object(test, evgen, obj):
test.assertEqual(evgen.event.SourceName, obj.get_browse_name().Name)
test.assertEqual(evgen.event.SourceNode, obj.nodeid)
test.assertEqual(obj.get_attribute(ua.AttributeIds.EventNotifier).Value, ua.Variant(5, ua.VariantType.Byte))
test.assertEqual(
obj.get_event_notifier(),
{ua.EventNotifier.SubscribeToEvents, ua.EventNotifier.HistoryRead}
)
refs = obj.get_referenced_nodes(ua.ObjectIds.GeneratesEvent, ua.BrowseDirection.Forward, ua.NodeClass.ObjectType, False)
test.assertEqual(len(refs), 1)
test.assertEqual(refs[0].nodeid, evgen.event.EventType)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment