Commit 67809d59 authored by Olivier R-D's avatar Olivier R-D

fix filetime <> datetime convertion, crappy stuff

parent 81c10085
...@@ -2,7 +2,9 @@ ...@@ -2,7 +2,9 @@
implement ua datatypes implement ua datatypes
""" """
from enum import Enum from enum import Enum
from datetime import datetime, timedelta from datetime import datetime, timedelta, tzinfo
from calendar import timegm
import uuid import uuid
import struct import struct
...@@ -11,18 +13,38 @@ import opcua.status_code as status_code ...@@ -11,18 +13,38 @@ import opcua.status_code as status_code
#types that will packed and unpacked directly using struct (string, bytes and datetime are handles as special cases #types that will packed and unpacked directly using struct (string, bytes and datetime are handles as special cases
UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double") UaTypes = ("Boolean", "SByte", "Byte", "Int8", "UInt8", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64", "Float", "Double")
EPOCH_AS_FILETIME = 116444736000000000 # January 1, 1970 as MS file time
HUNDREDS_OF_NANOSECONDS = 10000000
class UTC(tzinfo):
"""UTC"""
def utcoffset(self, dt):
return timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return timedelta(0)
#methods copied from David Buxton <david@gasmark6.com> sample code
def datetime_to_win_epoch(dt): def datetime_to_win_epoch(dt):
epch = (dt - datetime(1601, 1, 1, 0, 0)).total_seconds() * 10 ** 7 if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
return int(epch) dt = dt.replace(tzinfo=UTC())
ft = EPOCH_AS_FILETIME + (timegm(dt.timetuple()) * HUNDREDS_OF_NANOSECONDS)
return ft + (dt.microsecond * 10)
def win_epoch_to_datetime(epch): def win_epoch_to_datetime(epch):
return datetime(1601, 1, 1) + timedelta(microseconds=epch/10.0) (s, ns100) = divmod(epch - EPOCH_AS_FILETIME, HUNDREDS_OF_NANOSECONDS)
dt = datetime.utcfromtimestamp(s)
dt = dt.replace(microsecond=(ns100 // 10))
return dt
def uatype_to_fmt(uatype): def uatype_to_fmt(uatype):
#if uatype == "String":
#return "s"
#elif uatype == "CharArray":
#return "s"
if uatype == "Char": if uatype == "Char":
return "B" return "B"
elif uatype == "SByte": elif uatype == "SByte":
......
#! /usr/bin/env python #! /usr/bin/env python
import io import io
import sys import sys
from datetime import datetime from datetime import datetime, timedelta
import unittest import unittest
from threading import Thread, Event from threading import Thread, Event
try: try:
...@@ -93,10 +93,18 @@ class Unit(unittest.TestCase): ...@@ -93,10 +93,18 @@ class Unit(unittest.TestCase):
qn = ua.QualifiedName("Root", 0) qn = ua.QualifiedName("Root", 0)
v = Variant v = Variant
def test_datetime(self):
now = datetime.now()
epch = ua.datetime_to_win_epoch(now)
dt = ua.win_epoch_to_datetime(epch)
self.assertEqual(now, dt)
epch = 128930364000001000
dt = ua.win_epoch_to_datetime(epch)
print("dt", dt)
epch2 = ua.datetime_to_win_epoch(dt)
print("epch2", epch2)
self.assertEqual(epch, epch2)
def test_equal_nodeid(self): def test_equal_nodeid(self):
nid1 = ua.NodeId(999, 2) nid1 = ua.NodeId(999, 2)
...@@ -145,7 +153,6 @@ class Unit(unittest.TestCase): ...@@ -145,7 +153,6 @@ class Unit(unittest.TestCase):
self.assertEqual(dv.Value, ua.Variant('abc')) self.assertEqual(dv.Value, ua.Variant('abc'))
now = datetime.now() now = datetime.now()
dv.source_timestamp = now dv.source_timestamp = now
#self.assertEqual(int(dv.source_timestamp.to_time_t()), tnow)
def test_variant(self): def test_variant(self):
dv = ua.Variant(True, ua.VariantType.Boolean) dv = ua.Variant(True, ua.VariantType.Boolean)
...@@ -156,7 +163,7 @@ class Unit(unittest.TestCase): ...@@ -156,7 +163,7 @@ class Unit(unittest.TestCase):
self.assertEqual(v.Value, now) self.assertEqual(v.Value, now)
self.assertEqual(v.VariantType, ua.VariantType.DateTime) self.assertEqual(v.VariantType, ua.VariantType.DateTime)
v2 = ua.Variant.from_binary(ua.utils.Buffer(v.to_binary())) v2 = ua.Variant.from_binary(ua.utils.Buffer(v.to_binary()))
#self.assertEqual(v.Value, v2.Value) self.assertEqual(v.Value, v2.Value)
self.assertEqual(v.VariantType, v2.VariantType) self.assertEqual(v.VariantType, v2.VariantType)
def test_variant_array(self): def test_variant_array(self):
...@@ -172,7 +179,7 @@ class Unit(unittest.TestCase): ...@@ -172,7 +179,7 @@ class Unit(unittest.TestCase):
self.assertEqual(v.Value[0], now) self.assertEqual(v.Value[0], now)
self.assertEqual(v.VariantType, ua.VariantType.DateTime) self.assertEqual(v.VariantType, ua.VariantType.DateTime)
v2 = ua.Variant.from_binary(ua.utils.Buffer(v.to_binary())) v2 = ua.Variant.from_binary(ua.utils.Buffer(v.to_binary()))
#self.assertEqual(v.Value, v2.Value) self.assertEqual(v.Value, v2.Value)
self.assertEqual(v.VariantType, v2.VariantType) self.assertEqual(v.VariantType, v2.VariantType)
...@@ -268,10 +275,9 @@ class CommonTests(object): ...@@ -268,10 +275,9 @@ class CommonTests(object):
def test_datetime_read(self): def test_datetime_read(self):
time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime)) time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
dt = time_node.get_value() dt = time_node.get_value()
#pydt = dt.to_datetime() utcnow = datetime.utcnow()
#utcnow = datetime.datetime.utcnow() delta = utcnow - dt
#delta = utcnow - pydt self.assertTrue(delta < timedelta(seconds=1))
#self.assertTrue(delta < datetime.timedelta(seconds=1))
def test_datetime_write(self): def test_datetime_write(self):
time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime)) time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
...@@ -279,7 +285,7 @@ class CommonTests(object): ...@@ -279,7 +285,7 @@ class CommonTests(object):
objects = self.opc.get_objects_node() objects = self.opc.get_objects_node()
v1 = objects.add_variable(4, "test_datetime", now) v1 = objects.add_variable(4, "test_datetime", now)
tid = v1.get_value() tid = v1.get_value()
#self.assertEqual(now, tid.to_datetime()) rounding error!! self.assertEqual(now, tid)
def test_add_numeric_variable(self): def test_add_numeric_variable(self):
objects = self.opc.get_objects_node() objects = self.opc.get_objects_node()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment