Commit 3976fb13 authored by olivier R-D's avatar olivier R-D

restructure binary code

parent 1fcfef14
......@@ -201,12 +201,12 @@ class Node(object):
def set_attr_bit(self, attr, bit):
val = self.get_attribute(attr)
val.Value.Value = ua.set_bit(val.Value.Value, bit)
val.Value.Value = ua.ua_binary.set_bit(val.Value.Value, bit)
self.set_attribute(attr, val)
def unset_attr_bit(self, attr, bit):
val = self.get_attribute(attr)
val.Value.Value = ua.unset_bit(val.Value.Value, bit)
val.Value.Value = ua.ua_binary.unset_bit(val.Value.Value, bit)
self.set_attribute(attr, val)
def set_read_only(self):
......@@ -326,7 +326,7 @@ class Node(object):
desc.NodeId = self.nodeid
params = ua.BrowseParameters()
params.View.Timestamp = ua.win_epoch_to_datetime(0)
params.View.Timestamp = ua.get_win_epoch()
params.NodesToBrowse.append(desc)
results = self.server.browse(params)
return results[0].References
......@@ -434,11 +434,11 @@ class Node(object):
if starttime:
details.StartTime = starttime
else:
details.StartTime = ua.DateTimeMinValue
details.StartTime = ua.get_win_epoch()
if endtime:
details.EndTime = endtime
else:
details.EndTime = ua.DateTimeMinValue
details.EndTime = ua.get_win_epoch()
details.NumValuesPerNode = numvalues
details.ReturnBounds = True
result = self.history_read(details)
......@@ -473,11 +473,11 @@ class Node(object):
if starttime:
details.StartTime = starttime
else:
details.StartTime = ua.DateTimeMinValue
details.StartTime = ua.get_win_epoch()
if endtime:
details.EndTime = endtime
else:
details.EndTime = ua.DateTimeMinValue
details.EndTime = ua.get_win_epoch()
details.NumValuesPerNode = numvalues
if not isinstance(evtypes, (list, tuple)):
......
......@@ -60,7 +60,7 @@ class AttributeService(object):
continue
al = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
ual = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
if not ua.test_bit(al.Value.Value, ua.AccessLevel.CurrentWrite) or not ua.test_bit(ual.Value.Value, ua.AccessLevel.CurrentWrite):
if not ua.ua_binary.test_bit(al.Value.Value, ua.AccessLevel.CurrentWrite) or not ua.ua_binary.test_bit(ual.Value.Value, ua.AccessLevel.CurrentWrite):
res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
continue
res.append(self._aspace.set_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
......
......@@ -109,12 +109,12 @@ class HistoryDict(HistoryStorageInterface):
return [], cont
else:
if start is None:
start = ua.DateTimeMinValue
start = ua.get_win_epoch()
if end is None:
end = ua.DateTimeMinValue
if start == ua.DateTimeMinValue:
end = ua.get_win_epoch()
if start == ua.get_win_epoch():
results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
elif end == ua.DateTimeMinValue:
elif end == ua.get_win_epoch():
results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
elif start > end:
results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
......@@ -150,12 +150,12 @@ class HistoryDict(HistoryStorageInterface):
return [], cont
else:
if start is None:
start = ua.DateTimeMinValue
start = ua.get_win_epoch()
if end is None:
end = ua.DateTimeMinValue
if start == ua.DateTimeMinValue:
end = ua.get_win_epoch()
if start == ua.get_win_epoch():
results = [ev for ev in reversed(self._events[source_id]) if start <= ev.Time]
elif end == ua.DateTimeMinValue:
elif end == ua.get_win_epoch():
results = [ev for ev in self._events[source_id] if start <= ev.Time]
elif start > end:
results = [ev for ev in reversed(self._events[source_id]) if end <= ev.Time <= start]
......@@ -305,14 +305,14 @@ class HistoryManager(object):
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
dv, cont = self.storage.read_node_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode)
if cont:
cont = ua.pack_datetime(cont)
cont = ua.ua_binary.Primitives.DateTime.pack(cont)
# rv.IndexRange
# rv.DataEncoding # xml or binary, seems spec say we can ignore that one
return dv, cont
......@@ -324,7 +324,7 @@ class HistoryManager(object):
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
starttime = ua.ua_binary.Primitives.DateTime.unpack(utils.Buffer(rv.ContinuationPoint))
evts, cont = self.storage.read_event_history(rv.NodeId,
starttime,
......@@ -337,7 +337,7 @@ class HistoryManager(object):
field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
results.append(field_list)
if cont:
cont = ua.pack_datetime(cont)
cont = ua.ua_binary.Primitives.DateTime.pack(cont)
return results, cont
def update_history(self, params):
......
......@@ -225,7 +225,7 @@ class HistorySQLite(HistoryStorageInterface):
self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
if nb_values:
if len(results) > nb_values: # start > ua.DateTimeMinValue and
if len(results) > nb_values: # start > ua.get_win_epoch() and
cont = cont_timestamps[nb_values]
results = results[:nb_values]
......@@ -258,11 +258,11 @@ class HistorySQLite(HistoryStorageInterface):
def _get_bounds(start, end, nb_values):
order = "ASC"
if start is None or start == ua.DateTimeMinValue:
if start is None or start == ua.get_win_epoch():
order = "DESC"
start = ua.DateTimeMinValue
start = ua.get_win_epoch()
if end is None or end == ua.DateTimeMinValue:
if end is None or end == ua.get_win_epoch():
end = datetime.utcnow() + timedelta(days=1)
if start < end:
......
......@@ -127,7 +127,7 @@ class MonitoredItemService(object):
ev_notify_byte = self.aspace.get_attribute_value(
params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
if ev_notify_byte is None or not ua.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
if ev_notify_byte is None or not ua.ua_binary.test_bit(ev_notify_byte, ua.EventNotifier.SubscribeToEvents):
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
return result
# result.FilterResult = ua.EventFilterResult() # spec says we can ignore if not error
......
......@@ -141,7 +141,7 @@ class UaProcessor(object):
elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
self.logger.info("Close session request")
deletesubs = ua.unpack_uatype('Boolean', body)
deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
self.session.close_session(deletesubs)
......
"""
Binary protocol specific functions and constants
"""
import sys
import struct
import logging
from datetime import datetime, timedelta, tzinfo, MAXYEAR
from calendar import timegm
from opcua.common.uaerrors import UaError
if sys.version_info.major > 2:
unicode = str
logger = logging.getLogger('__name__')
EPOCH_AS_FILETIME = 116444736000000000 # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000
FILETIME_EPOCH_AS_DATETIME = datetime(1601, 1, 1)
def test_bit(data, offset):
mask = 1 << offset
return data & mask
def set_bit(data, offset):
mask = 1 << offset
return data | mask
def unset_bit(data, offset):
mask = 1 << offset
return data & ~mask
class UTC(tzinfo):
"""UTC"""
def utcoffset(self, dt):
return timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return timedelta(0)
# method copied from David Buxton <david@gasmark6.com> sample code
def datetime_to_win_epoch(dt):
if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
dt = dt.replace(tzinfo=UTC())
ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
return ft + (dt.microsecond * 10)
def win_epoch_to_datetime(epch):
try:
return FILETIME_EPOCH_AS_DATETIME + timedelta(microseconds=epch // 10)
except OverflowError:
# FILETIMEs after 31 Dec 9999 can't be converted to datetime
logger.warning("datetime overflow: %s", epch)
return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
def build_array_format_py2(prefix, length, fmtchar):
return prefix + str(length) + fmtchar
def build_array_format_py3(prefix, length, fmtchar):
return prefix + str(length) + chr(fmtchar)
if sys.version_info.major < 3:
build_array_format = build_array_format_py2
else:
build_array_format = build_array_format_py3
class _Primitive(object):
def pack_array(self, array):
if array is None:
return b'\xff\xff\xff\xff'
length = len(array)
b = [self.pack(val) for val in array]
b.insert(0, Primitives.Int32.pack(length))
def unpack_array(self, data):
length = Primitives.Int32.unpack(data)
if length == -1:
return None
elif length == 0:
return []
else:
return [self.unpack(data) for _ in range(length)]
class _DateTime(_Primitive):
@staticmethod
def pack(dt):
epch = datetime_to_win_epoch(dt)
return Primitives.Int64.pack(epch)
@staticmethod
def unpack(data):
epch = Primitives.Int64.unpack(data)
return win_epoch_to_datetime(epch)
class _String(_Primitive):
@staticmethod
def pack(string):
if string is None:
return Primitives.Int32.pack(-1)
if isinstance(string, unicode):
string = string.encode('utf-8')
length = len(string)
return Primitives.Int32.pack(length) + string
@staticmethod
def unpack(data):
b = _Bytes.unpack(data)
if sys.version_info.major < 3:
return b
else:
if b is None:
return b
return b.decode("utf-8")
class _Bytes(_Primitive):
@staticmethod
def pack(data):
return _String.pack(data)
@staticmethod
def unpack(data):
length = Primitives.Int32.unpack(data)
if length == -1:
return None
return data.read(length)
class _Null(_Primitive):
@staticmethod
def pack(data):
return b""
@staticmethod
def unpack(data):
return None
class _Primitive1(_Primitive):
def __init__(self, fmt):
self.struct = struct.Struct(fmt)
self.size = self.struct.size
self.format = self.struct.format
def pack(self, data):
return struct.pack(self.format, data)
def unpack(self, data):
return struct.unpack(self.format, data.read(self.size))[0]
#def pack_array(self, array):
#"""
#Basically the same as the method in _Primitive but MAYBE a bit more efficitent....
#"""
#if array is None:
#return b'\xff\xff\xff\xff'
#length = len(array)
#if length == 0:
#return b'\x00\x00\x00\x00'
#if length == 1:
#return b'\x01\x00\x00\x00' + self.pack(array[0])
#return struct.pack(build_array_format("<i", length, self.format[1]), length, *array)
class Primitives1(object):
Int8 = _Primitive1("<b")
SByte = Int8
Int16 = _Primitive1("<h")
Int32 = _Primitive1("<i")
Int64 = _Primitive1("<q")
UInt8 = _Primitive1("<B")
Char = UInt8
Byte = UInt8
UInt16 = _Primitive1("<H")
UInt32 = _Primitive1("<I")
UInt64 = _Primitive1("<Q")
Boolean = _Primitive1("<?")
Double = _Primitive1("<d")
Float = _Primitive1("<f")
class Primitives(Primitives1):
Null = _Null()
String = _String()
Bytes = _Bytes()
ByteString = _Bytes()
CharArray = _Bytes()
DateTime = _DateTime()
def pack_uatype_array(vtype, array):
if array is None:
return b'\xff\xff\xff\xff'
length = len(array)
b = [pack_uatype(vtype, val) for val in array]
b.insert(0, Primitives.Int32.pack(length))
return b"".join(b)
def pack_uatype(vtype, value):
if hasattr(Primitives, vtype.name):
return getattr(Primitives, vtype.name).pack(value)
elif vtype.value > 25:
return Primitives.Bytes.pack(value)
elif vtype.name == "ExtensionObject":
# dependency loop: classes in uaprotocol_auto use Variant defined in this file,
# but Variant can contain any object from uaprotocol_auto as ExtensionObject.
# Using local import to avoid import loop
from opcua.ua.uaprotocol_auto import extensionobject_to_binary
return extensionobject_to_binary(value)
else:
try:
return value.to_binary()
except AttributeError:
raise UaError("{} could not be packed with value {}".format(vtype, value))
def unpack_uatype(vtype, data):
if hasattr(Primitives, vtype.name):
st = getattr(Primitives, vtype.name)
return st.unpack(data)
elif vtype.value > 25:
return Primitives.Bytes.unpack(data)
elif vtype.name == "ExtensionObject":
# dependency loop: classes in uaprotocol_auto use Variant defined in this file,
# but Variant can contain any object from uaprotocol_auto as ExtensionObject.
# Using local import to avoid import loop
from opcua.ua.uaprotocol_auto import extensionobject_from_binary
return extensionobject_from_binary(data)
else:
from opcua.ua import uatypes
if hasattr(uatypes, vtype.name):
klass = getattr(uatypes, vtype.name)
return klass.from_binary(data)
else:
raise UaError("can not unpack unknown vtype %s" % vtype)
def unpack_uatype_array(vtype, data):
if hasattr(Primitives, vtype.name):
st = getattr(Primitives, vtype.name)
return st.unpack_array(data)
else:
length = Primitives.Int32.unpack(data)
if length == -1:
return None
else:
return [unpack_uatype(vtype, data) for _ in range(length)]
This diff is collapsed.
......@@ -5,8 +5,8 @@ from datetime import datetime
from opcua.ua import uaprotocol_auto as auto
from opcua.ua import uatypes
from opcua.ua import ua_binary as uabin
from opcua.ua import UaError
from opcua.ua.uatypes import uatype_UInt32
from opcua.common import utils
logger = logging.getLogger('opcua.uaprotocol')
......@@ -27,23 +27,23 @@ class Hello(uatypes.FrozenClass):
def to_binary(self):
b = []
b.append(uatype_UInt32.pack(self.ProtocolVersion))
b.append(uatype_UInt32.pack(self.ReceiveBufferSize))
b.append(uatype_UInt32.pack(self.SendBufferSize))
b.append(uatype_UInt32.pack(self.MaxMessageSize))
b.append(uatype_UInt32.pack(self.MaxChunkCount))
b.append(uatypes.pack_string(self.EndpointUrl))
b.append(uabin.Primitives.UInt32.pack(self.ProtocolVersion))
b.append(uabin.Primitives.UInt32.pack(self.ReceiveBufferSize))
b.append(uabin.Primitives.UInt32.pack(self.SendBufferSize))
b.append(uabin.Primitives.UInt32.pack(self.MaxMessageSize))
b.append(uabin.Primitives.UInt32.pack(self.MaxChunkCount))
b.append(uabin.Primitives.String.pack(self.EndpointUrl))
return b"".join(b)
@staticmethod
def from_binary(data):
hello = Hello()
hello.ProtocolVersion = uatype_UInt32.unpack(data.read(4))[0]
hello.ReceiveBufferSize = uatype_UInt32.unpack(data.read(4))[0]
hello.SendBufferSize = uatype_UInt32.unpack(data.read(4))[0]
hello.MaxMessageSize = uatype_UInt32.unpack(data.read(4))[0]
hello.MaxChunkCount = uatype_UInt32.unpack(data.read(4))[0]
hello.EndpointUrl = uatypes.unpack_string(data)
hello.ProtocolVersion = uabin.Primitives.UInt32.unpack(data)
hello.ReceiveBufferSize = uabin.Primitives.UInt32.unpack(data)
hello.SendBufferSize = uabin.Primitives.UInt32.unpack(data)
hello.MaxMessageSize = uabin.Primitives.UInt32.unpack(data)
hello.MaxChunkCount = uabin.Primitives.UInt32.unpack(data)
hello.EndpointUrl = uabin.Primitives.String.unpack(data)
return hello
......@@ -83,9 +83,9 @@ class Header(uatypes.FrozenClass):
size = self.body_size + 8
if self.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
size += 4
b.append(uatype_UInt32.pack(size))
b.append(uabin.Primitives.UInt32.pack(size))
if self.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
b.append(uatype_UInt32.pack(self.ChannelId))
b.append(uabin.Primitives.UInt32.pack(self.ChannelId))
return b"".join(b)
@staticmethod
......@@ -95,7 +95,7 @@ class Header(uatypes.FrozenClass):
hdr.body_size = hdr.packet_size - 8
if hdr.MessageType in (MessageType.SecureOpen, MessageType.SecureClose, MessageType.SecureMessage):
hdr.body_size -= 4
hdr.ChannelId = uatype_UInt32.unpack(data.read(4))[0]
hdr.ChannelId = uabin.Primitives.UInt32.unpack(data)
return hdr
@staticmethod
......@@ -118,14 +118,14 @@ class ErrorMessage(uatypes.FrozenClass):
def to_binary(self):
b = []
b.append(self.Error.to_binary())
b.append(uatypes.pack_string(self.Reason))
b.append(uabin.Primitives.String.pack(self.Reason))
return b"".join(b)
@staticmethod
def from_binary(data):
ack = ErrorMessage()
ack.Error = uatypes.StatusCode.from_binary(data)
ack.Reason = uatypes.unpack_string(data)
ack.Reason = uabin.Primitives.String.unpack(data)
return ack
def __str__(self):
......@@ -170,17 +170,17 @@ class AsymmetricAlgorithmHeader(uatypes.FrozenClass):
def to_binary(self):
b = []
b.append(uatypes.pack_string(self.SecurityPolicyURI))
b.append(uatypes.pack_string(self.SenderCertificate))
b.append(uatypes.pack_string(self.ReceiverCertificateThumbPrint))
b.append(uabin.Primitives.String.pack(self.SecurityPolicyURI))
b.append(uabin.Primitives.String.pack(self.SenderCertificate))
b.append(uabin.Primitives.String.pack(self.ReceiverCertificateThumbPrint))
return b"".join(b)
@staticmethod
def from_binary(data):
hdr = AsymmetricAlgorithmHeader()
hdr.SecurityPolicyURI = uatypes.unpack_string(data)
hdr.SenderCertificate = uatypes.unpack_bytes(data)
hdr.ReceiverCertificateThumbPrint = uatypes.unpack_bytes(data)
hdr.SecurityPolicyURI = uabin.Primitives.String.unpack(data)
hdr.SenderCertificate = uabin.Primitives.Bytes.unpack(data)
hdr.ReceiverCertificateThumbPrint = uabin.Primitives.Bytes.unpack(data)
return hdr
def __str__(self):
......@@ -199,11 +199,11 @@ class SymmetricAlgorithmHeader(uatypes.FrozenClass):
@staticmethod
def from_binary(data):
obj = SymmetricAlgorithmHeader()
obj.TokenId = uatype_UInt32.unpack(data.read(4))[0]
obj.TokenId = uabin.Primitives.UInt32.unpack(data)
return obj
def to_binary(self):
return uatype_UInt32.pack(self.TokenId)
return uabin.Primitives.UInt32.pack(self.TokenId)
@staticmethod
def max_size():
......@@ -224,14 +224,14 @@ class SequenceHeader(uatypes.FrozenClass):
@staticmethod
def from_binary(data):
obj = SequenceHeader()
obj.SequenceNumber = uatype_UInt32.unpack(data.read(4))[0]
obj.RequestId = uatype_UInt32.unpack(data.read(4))[0]
obj.SequenceNumber = uabin.Primitives.UInt32.unpack(data)
obj.RequestId = uabin.Primitives.UInt32.unpack(data)
return obj
def to_binary(self):
b = []
b.append(uatype_UInt32.pack(self.SequenceNumber))
b.append(uatype_UInt32.pack(self.RequestId))
b.append(uabin.Primitives.UInt32.pack(self.SequenceNumber))
b.append(uabin.Primitives.UInt32.pack(self.RequestId))
return b"".join(b)
@staticmethod
......
This diff is collapsed.
......@@ -3,7 +3,32 @@ import generate_model as gm
IgnoredEnums = ["NodeIdType"]
IgnoredStructs = ["QualifiedName", "NodeId", "ExpandedNodeId", "FilterOperand", "Variant", "DataValue", "LocalizedText", "ExtensionObject", "XmlElement"]
numerics = ("Int8", "UInt8", "Sbyte", "Byte", "Char", "Boolean", "Int16", "UInt16", "Int32", "UInt32", "Float", "Int64", "UInt64", "Double")
class Primitives1(object):
Int8 = 0
SByte = 0
Int16 = 0
Int32 = 0
Int64 = 0
UInt8 = 0
Char = 0
Byte = 0
UInt16 = 0
UInt32 = 0
UInt64 = 0
Boolean = 0
Double = 0
Float = 0
class Primitives(Primitives1):
Null = 0
String = 0
Bytes = 0
ByteString = 0
CharArray = 0
DateTime = 0
class CodeGenerator(object):
......@@ -61,6 +86,7 @@ class CodeGenerator(object):
self.write("from opcua.common.utils import Buffer")
self.write("from opcua.common.uaerrors import UaError")
self.write("from opcua.ua.uatypes import *")
self.write("from opcua.ua import ua_binary as uabin")
self.write("from opcua.ua.object_ids import ObjectIds")
def generate_enum_code(self, enum):
......@@ -161,7 +187,7 @@ class CodeGenerator(object):
self.write("if self.{}: ".format(field.name))
self.iidx += 1
if field.length:
self.write("{}.append(uatype_Int32.pack(len(self.{})))".format(listname, field.name))
self.write("{}.append(uabin.Primitives.Int32.pack(len(self.{})))".format(listname, field.name))
self.write("for fieldname in self.{}:".format(field.name))
fname = "fieldname"
self.iidx += 1
......@@ -199,7 +225,7 @@ class CodeGenerator(object):
for idx, field in enumerate(obj.fields):
self.iidx = iidx
# if field.name == "Body" and idx <= (len(obj.fields)-1):
#self.write("bodylength = struct.unpack('<i', data.read(4))[0]")
#self.write("bodylength = struct.unpack('<i', data)[0]")
# continue
if field.switchfield:
bit = obj.bits[field.switchfield]
......@@ -210,10 +236,9 @@ class CodeGenerator(object):
else:
self.write("if self.{} & (1 << {}):".format(bit.container, bit.idx))
self.iidx += 1
array = False
if field.is_native_type():
if field.length:
self.write("self.{} = unpack_uatype_array('{}', data)".format(field.name, field.uatype))
self.write("self.{} = uabin.Primitives.{}.unpack_array(data)".format(field.name, field.uatype))
else:
self.write_unpack_uatype(field.name, field.uatype)
elif field.uatype in self.model.enum_list:
......@@ -227,7 +252,7 @@ class CodeGenerator(object):
else:
frombinary = "{}.from_binary(data)".format(field.uatype)
if field.length:
self.write("length = uatype_Int32.unpack(data.read(4))[0]")
self.write("length = uabin.Primitives.Int32.unpack(data)")
self.write("array = []")
self.write("if length != -1:")
self.iidx += 1
......@@ -270,7 +295,7 @@ class CodeGenerator(object):
self.iix = 0
def write_unpack_enum(self, name, enum):
self.write("self.{} = {}(uatype_{}.unpack(data.read({}))[0])".format(name, enum.name, enum.uatype, self.get_size_from_uatype(enum.uatype)))
self.write("self.{} = {}(uabin.Primitives.{}.unpack(data))".format(name, enum.name, enum.uatype))
def get_size_from_uatype(self, uatype):
if uatype in ("Int8", "UInt8", "Sbyte", "Byte", "Char", "Boolean"):
......@@ -285,40 +310,19 @@ class CodeGenerator(object):
raise Exception("Cannot get size from type {}".format(uatype))
def write_unpack_uatype(self, name, uatype):
if uatype in numerics:
size = self.get_size_from_uatype(uatype)
self.write("self.{} = uatype_{}.unpack(data.read({}))[0]".format(name, uatype, size))
elif uatype == "String":
self.write("self.{} = unpack_string(data)".format(name))
return
elif uatype in ("CharArray", "ByteString"):
self.write("self.{} = unpack_bytes(data)".format(name))
return
elif uatype == "DateTime":
self.write("self.{} = unpack_datetime(data)".format(name))
return
if hasattr(Primitives, uatype):
self.write("self.{} = uabin.Primitives.{}.unpack(data)".format(name, uatype))
else:
self.write("self.{} = unpack_uatype('{}', data)".format(name, uatype))
return
self.write("self.{} = {}.from_binary(data))".format(name, uatype))
def write_pack_enum(self, listname, name, enum):
self.write("{}.append(uatype_{}.pack({}.value))".format(listname, enum.uatype, name))
self.write("{}.append(uabin.Primitives.{}.pack({}.value))".format(listname, enum.uatype, name))
def write_pack_uatype(self, listname, name, uatype):
if uatype in numerics:
self.write("{}.append(uatype_{}.pack({}))".format(listname, uatype, name))
return
elif uatype == "String":
self.write("{}.append(pack_string({}))".format(listname, name))
return
elif uatype in ("CharArray", "ByteString"):
self.write("{}.append(pack_bytes({}))".format(listname, name))
return
elif uatype == "DateTime":
self.write("{}.append(pack_datetime({}))".format(listname, name))
return
if hasattr(Primitives, uatype):
self.write("{}.append(uabin.Primitives.{}.pack({}))".format(listname, uatype, name))
else:
self.write("{}.append(pack_uatype('{}', {}))".format(listname, uatype, name))
self.write("{}.append({}.to_binary(}))".format(listname, name))
return
def get_default_value(self, field):
......
......@@ -8,7 +8,7 @@ def extensionobject_from_binary(data):
Encoding = ord(data.read(1))
body = None
if Encoding & (1 << 0):
length = uatype_Int32.unpack(data.read(4))[0]
length = uabin.Primitives.Int32.unpack(data)
if length < 1:
body = Buffer(b"")
else:
......@@ -46,7 +46,7 @@ def extensionobject_to_binary(obj):
Body = obj.to_binary()
packet = []
packet.append(TypeId.to_binary())
packet.append(uatype_UInt8.pack(Encoding))
packet.append(uabin.Primitives.UInt8.pack(Encoding))
if Body:
packet.append(pack_bytes(Body))
packet.append(uabin.Primitives.Bytes.pack(Body))
return b''.join(packet)
......@@ -624,14 +624,14 @@ class CommonTests(object):
# create enum type
enums = self.opc.get_root_node().get_child(["0:Types", "0:DataTypes", "0:BaseDataType", "0:Enumeration"])
myenum_type = enums.add_data_type(0, "MyEnum")
es = myenum_type.add_variable(0, "EnumStrings", ["String0", "String1", "String2"], ua.VariantType.LocalizedText)
es = myenum_type.add_variable(0, "EnumStrings", [ua.LocalizedText("String0"), ua.LocalizedText("String1"), ua.LocalizedText("String2")], ua.VariantType.LocalizedText)
#es.set_value_rank(1)
# instantiate
o = self.opc.get_objects_node()
myvar = o.add_variable(2, "MyEnumVar", "String1", ua.VariantType.LocalizedText, datatype=myenum_type.nodeid)
myvar = o.add_variable(2, "MyEnumVar", ua.LocalizedText("String1"), datatype=myenum_type.nodeid)
#myvar.set_writable(True)
# tests
self.assertEqual(myvar.get_data_type(), myenum_type.nodeid)
myvar.set_value("String2", ua.VariantType.LocalizedText)
myvar.set_value(ua.LocalizedText("String2"))
......@@ -237,25 +237,25 @@ class TestUnit(unittest.TestCase):
def test_datetime(self):
now = datetime.utcnow()
epch = ua.datetime_to_win_epoch(now)
dt = ua.win_epoch_to_datetime(epch)
epch = ua.ua_binary.datetime_to_win_epoch(now)
dt = ua.ua_binary.win_epoch_to_datetime(epch)
self.assertEqual(now, dt)
# python's datetime has a range from Jan 1, 0001 to the end of year 9999
# windows' filetime has a range from Jan 1, 1601 to approx. year 30828
# let's test an overlapping range [Jan 1, 1601 - Dec 31, 9999]
dt = datetime(1601, 1, 1)
self.assertEqual(ua.win_epoch_to_datetime(ua.datetime_to_win_epoch(dt)), dt)
self.assertEqual(ua.ua_binary.win_epoch_to_datetime(ua.ua_binary.datetime_to_win_epoch(dt)), dt)
dt = datetime(9999, 12, 31, 23, 59, 59)
self.assertEqual(ua.win_epoch_to_datetime(ua.datetime_to_win_epoch(dt)), dt)
self.assertEqual(ua.ua_binary.win_epoch_to_datetime(ua.ua_binary.datetime_to_win_epoch(dt)), dt)
epch = 128930364000001000
dt = ua.win_epoch_to_datetime(epch)
epch2 = ua.datetime_to_win_epoch(dt)
dt = ua.ua_binary.win_epoch_to_datetime(epch)
epch2 = ua.ua_binary.datetime_to_win_epoch(dt)
self.assertEqual(epch, epch2)
epch = 0
self.assertEqual(ua.datetime_to_win_epoch(ua.win_epoch_to_datetime(epch)), epch)
self.assertEqual(ua.ua_binary.datetime_to_win_epoch(ua.ua_binary.win_epoch_to_datetime(epch)), epch)
def test_equal_nodeid(self):
nid1 = ua.NodeId(999, 2)
......@@ -514,5 +514,4 @@ class TestMaskEnum(unittest.TestCase):
if __name__ == '__main__':
logging.basicConfig(level=logging.WARN)
sclt = SubHandler()
unittest.main(verbosity=3)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment