Commit deca8fa6 authored by ORD's avatar ORD

Merge pull request #95 from FreeOpcUa/dev

Use IntEnum where possible instead of simple class
parents e6a6db32 555c2d5a
#AUTOGENERATED!!!
from enum import IntEnum
class AttributeIds:
class AttributeIds(IntEnum):
NodeId = 1
NodeClass = 2
BrowseName = 3
......
......@@ -289,6 +289,16 @@ class BinaryClient(object):
data = self._uasocket.send_request(request)
response = ua.ReadResponse.from_binary(data)
response.ResponseHeader.ServiceResult.check()
# cast to Enum attributes that need to
for idx, rv in enumerate(parameters.NodesToRead):
if rv.AttributeId == ua.AttributeIds.NodeClass:
dv = response.Results[idx]
if dv.StatusCode.is_good():
dv.Value.Value = ua.NodeClass(dv.Value.Value)
elif rv.AttributeId == ua.AttributeIds.ValueRank:
dv = response.Results[idx]
if dv.StatusCode.is_good() and dv.Value.Value in (-3, -2, -1, 0, 1, 2, 3, 4):
dv.Value.Value = ua.ValueRank(dv.Value.Value)
return response.Results
def write(self, params):
......
......@@ -169,7 +169,8 @@ class MonitoredItemService(object):
for sattr in evfilter.SelectClauses:
try:
if not sattr.BrowsePath:
val = getattr(event, ua.AttributeIdsInv[sattr.Attribute])
#val = getattr(event, ua.AttributeIdsInv[sattr.Attribute])
val = getattr(event, sattr.Attribute.name)
val = copy.deepcopy(val)
fields.append(ua.Variant(val))
else:
......
......@@ -449,10 +449,10 @@ def _create_variable(server, parentnodeid, nodeid, qname, val, isproperty=False)
attrs.DataType = _guess_uatype(val)
attrs.Value = val
if isinstance(val, list) or isinstance(val, tuple):
attrs.ValueRank = 0
attrs.ValueRank = ua.ValueRank.OneDimension
else:
attrs.ValueRank = -1
# attrs.ArrayDimensions = 0
attrs.ValueRank = ua.ValueRank.Scalar
#attrs.ArrayDimensions = None
attrs.WriteMask = ua.OpenFileMode.Read
attrs.UserWriteMask = ua.OpenFileMode.Read
attrs.Historizing = 0
......
#AUTOGENERATED!!!
from enum import IntEnum
class ObjectIds:
class ObjectIds(object):
Boolean = 1
SByte = 2
Byte = 3
......
......@@ -162,7 +162,8 @@ class Subscription(object):
if len(sattr.BrowsePath) == 0:
#fields[ua.AttributeIdsInv[sattr.AttributeId]] = event.EventFields[idx].Value
setattr(result, ua.AttributeIdsInv[sattr.AttributeId], event.EventFields[idx].Value)
#setattr(result, ua.AttributeIdsInv[sattr.AttributeId], event.EventFields[idx].Value)
setattr(result, sattr.AttributeId.name, event.EventFields[idx].Value)
else:
setattr(result, sattr.BrowsePath[0].Name, event.EventFields[idx].Value)
if hasattr(self._handler, "event_notification"):
......
......@@ -159,7 +159,6 @@ def _val_to_variant(val, args):
if args.datatype == "guess":
if val in ("true", "True", "false", "False"):
return _arg_to_variant(val, array, _arg_to_bool)
# FIXME: guess bool value
try:
return _arg_to_variant(val, array, int)
except ValueError:
......@@ -387,25 +386,13 @@ def uasubscribe():
print(args)
# converts numeric value to its enum name.
def enum_to_string(klass, value):
if isinstance(value, Enum):
return value.name
# if value is not a subtype of Enum, try to find a constant
# with this value in this class
for k, v in vars(klass).items():
if not k.startswith('__') and v == value:
return k
return 'Unknown {} ({})'.format(klass.__name__, value)
def application_to_strings(app):
result = []
result.append(('Application URI', app.ApplicationUri))
optionals = [
('Product URI', app.ProductUri),
('Application Name', app.ApplicationName.to_string()),
('Application Type', enum_to_string(ua.ApplicationType, app.ApplicationType)),
('Application Type', str(app.ApplicationType)),
('Gateway Server URI', app.GatewayServerUri),
('Discovery Profile URI', app.DiscoveryProfileUri),
]
......@@ -422,12 +409,12 @@ def endpoint_to_strings(ep):
result += application_to_strings(ep.Server)
result += [
('Server Certificate', len(ep.ServerCertificate)),
('Security Mode', enum_to_string(ua.MessageSecurityMode, ep.SecurityMode)),
('Security Mode', str(ep.SecurityMode)),
('Security Policy URI', ep.SecurityPolicyUri)]
for tok in ep.UserIdentityTokens:
result += [
('User policy', tok.PolicyId),
(' Token type', enum_to_string(ua.UserTokenType, tok.TokenType))]
(' Token type', str(tok.TokenType))]
if tok.IssuedTokenType or tok.IssuerEndpointUrl:
result += [
(' Issued Token type', tok.IssuedTokenType),
......@@ -593,7 +580,7 @@ def print_history(o):
def str_to_datetime(s):
if not s:
return datetime.utcnow()
# try different datetime formats
# FIXME: try different datetime formats
for fmt in ["%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"]:
try:
return datetime.strptime(s, fmt)
......
This diff is collapsed.
import struct
import logging
import hashlib
from enum import IntEnum
import opcua.uaprotocol_auto as auto
import opcua.uatypes as uatypes
......@@ -13,9 +14,27 @@ logger = logging.getLogger('opcua.uaprotocol')
OPC_TCP_SCHEME = 'opc.tcp'
class ValueRank(IntEnum):
"""
Defines dimensions of a variable.
This enum does not support all cases since ValueRank support any n>0
but since it is an IntEnum it can be replace by a normal int
"""
ScalarOrOneDimension = -3
Any = -2
Scalar = -1
OneOrMoreDimensions = 0
OneDimension = 1
# the next names are not in spec but so common we express them here
TwoDimensions = 2
ThreeDimensions = 3
FourDimensions = 3
class AccessLevelMask(object):
"""
used by AccessLevel and UserAccessLevel
Used by AccessLevel and UserAccessLevel
"""
CurrentRead = 0
CurrentWrite = 1
......@@ -528,4 +547,4 @@ class Argument(auto.Argument):
self.ValueRank = -2
AttributeIdsInv = {v: k for k, v in AttributeIds.__dict__.items()}
#AttributeIdsInv = {v: k for k, v in AttributeIds.__dict__.items()}
......@@ -765,7 +765,7 @@ class Variant(FrozenClass):
raise Exception("Could not guess UA type of {} with type {}, specify UA type".format(val, type(val)))
def __str__(self):
return "Variant(val:{},type:{})".format(self.Value, self.VariantType)
return "Variant(val:{!s},type:{})".format(self.Value, self.VariantType)
__repr__ = __str__
def to_binary(self):
......
......@@ -3,8 +3,9 @@ if __name__ == "__main__":
outputfile = open("../opcua/object_ids.py", "w")
outputfile.write("#AUTOGENERATED!!!\n")
outputfile.write("\n")
outputfile.write("from enum import IntEnum\n")
outputfile.write("\n")
outputfile.write("class ObjectIds:\n")
outputfile.write("class ObjectIds(object):\n")
for line in inputfile:
name, nb, datatype = line.split(",")
outputfile.write(" {} = {}\n".format(name, nb))
......@@ -13,8 +14,9 @@ if __name__ == "__main__":
outputfile = open("../opcua/attribute_ids.py", "w")
outputfile.write("#AUTOGENERATED!!!\n")
outputfile.write("\n")
outputfile.write("from enum import IntEnum\n")
outputfile.write("\n")
outputfile.write("class AttributeIds:\n")
outputfile.write("class AttributeIds(IntEnum):\n")
for line in inputfile:
name, nb = line.split(",")
outputfile.write(" {} = {}\n".format(name.strip(), nb.strip()))
......
......@@ -3,6 +3,7 @@ import generate_model as gm
IgnoredEnums = ["NodeIdType"]
IgnoredStructs = ["QualifiedName", "NodeId", "ExpandedNodeId", "FilterOperand", "Variant", "DataValue", "LocalizedText", "ExtensionObject"]
numerics = ("Int8", "UInt8", "Sbyte", "Byte", "Char", "Boolean", "Int16", "UInt16", "Int32", "UInt32", "Float", "Int64", "UInt64", "Double")
class CodeGenerator(object):
......@@ -55,6 +56,7 @@ class CodeGenerator(object):
self.write("'''")
self.write("")
self.write("from datetime import datetime")
self.write("from enum import Enum, IntEnum")
self.write("")
self.write("from opcua.utils import Buffer")
self.write("from opcua.uatypes import *")
......@@ -63,7 +65,7 @@ class CodeGenerator(object):
def generate_enum_code(self, enum):
self.write("")
self.write("")
self.write("class {}(object):".format(enum.name))
self.write("class {}(IntEnum):".format(enum.name))
self.iidx = 1
self.write("'''")
if enum.doc:
......@@ -165,8 +167,8 @@ class CodeGenerator(object):
if field.is_native_type():
self.write_pack_uatype(listname, fname, field.uatype)
elif field.uatype in self.model.enum_list:
uatype = self.model.get_enum(field.uatype).uatype
self.write_pack_uatype(listname, fname, uatype)
enum = self.model.get_enum(field.uatype)
self.write_pack_enum(listname, fname, enum)
elif field.uatype in ("ExtensionObject"):
self.write("{}.append(extensionobject_to_binary({}))".format(listname, fname))
else:
......@@ -214,8 +216,10 @@ class CodeGenerator(object):
else:
self.write_unpack_uatype(field.name, field.uatype)
elif field.uatype in self.model.enum_list:
uatype = self.model.get_enum(field.uatype).uatype
self.write_unpack_uatype(field.name, uatype)
#uatype = self.model.get_enum(field.uatype).uatype
#self.write_unpack_uatype(field.name, uatype)
enum = self.model.get_enum(field.uatype)
self.write_unpack_enum(field.name, enum)
else:
if field.uatype in ("ExtensionObject"):
frombinary = "extensionobject_from_binary(data)"
......@@ -264,15 +268,25 @@ class CodeGenerator(object):
self.iix = 0
def write_unpack_uatype(self, name, uatype):
def write_unpack_enum(self, name, enum):
self.write("self.{} = {}(uatype_{}.unpack(data.read({}))[0])".format(name, enum.name, enum.uatype, self.get_size_from_uatype(enum.uatype)))
def get_size_from_uatype(self, uatype):
if uatype in ("Int8", "UInt8", "Sbyte", "Byte", "Char", "Boolean"):
size = 1
return 1
elif uatype in ("Int16", "UInt16"):
size = 2
return 2
elif uatype in ("Int32", "UInt32", "Float"):
size = 4
return 4
elif uatype in ("Int64", "UInt64", "Double"):
size = 8
return 8
else:
raise Exception("Cannot get size from type {}".format(uatype))
def write_unpack_uatype(self, name, uatype):
if uatype in numerics:
size = self.get_size_from_uatype(uatype)
self.write("self.{} = uatype_{}.unpack(data.read({}))[0]".format(name, uatype, size))
elif uatype == "String":
self.write("self.{} = unpack_string(data)".format(name))
return
......@@ -285,15 +299,12 @@ class CodeGenerator(object):
else:
self.write("self.{} = unpack_uatype('{}', data)".format(name, uatype))
return
self.write("self.{} = uatype_{}.unpack(data.read({}))[0]".format(name, uatype, size))
def write_pack_enum(self, listname, name, enum):
self.write("{}.append(uatype_{}.pack({}.value))".format(listname, enum.uatype, name))
def write_pack_uatype(self, listname, name, uatype):
if uatype in (
"Int8", "UInt8", "Sbyte", "Byte", "Char", "Boolean",
"Int16", "UInt16",
"Int32", "UInt32", "Float",
"Int64", "UInt64", "Double"
):
if uatype in numerics:
self.write("{}.append(uatype_{}.pack({}))".format(listname, uatype, name))
return
elif uatype == "String":
......@@ -311,7 +322,8 @@ class CodeGenerator(object):
def get_default_value(self, field):
if field.uatype in self.model.enum_list:
return 0
enum = self.model.get_enum(field.uatype)
return enum.name + "(0)"
if field.uatype in ("String"):
return "''"
elif field.uatype in ("ByteString", "CharArray", "Char"):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment