Commit e0c690be authored by Christian Bergmiller's avatar Christian Bergmiller

fix sqlite timestamp type

parent 399b1780
......@@ -13,11 +13,11 @@ class UaNodeAlreadyHistorizedError(ua.UaError):
class HistoryStorageInterface:
"""
Interface of a history backend.
Must be implemented by backends
"""
async def init(self):
"""
Async. initialization.
......@@ -185,7 +185,7 @@ class HistoryDict(HistoryStorageInterface):
class SubHandler:
def __init__(self, storage, loop):
self.storage = storage
self.loop = loop
self.loop = loop
def datachange_notification(self, node, val, data):
self.loop.create_task(
......@@ -274,7 +274,7 @@ class HistoryManager:
"""
if node in self._handlers:
await self._sub.unsubscribe(self._handlers[node])
del(self._handlers[node])
del (self._handlers[node])
else:
self.logger.error("History Manager isn't subscribed to %s", node)
......@@ -329,9 +329,9 @@ class HistoryManager:
starttime = ua.ua_binary.Primitives.DateTime.unpack(Buffer(rv.ContinuationPoint))
dv, cont = await self.storage.read_node_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode)
starttime,
details.EndTime,
details.NumValuesPerNode)
if cont:
cont = ua.ua_binary.Primitives.DateTime.pack(cont)
# rv.IndexRange
......@@ -348,10 +348,10 @@ class HistoryManager:
starttime = ua.ua_binary.Primitives.DateTime.unpack(Buffer(rv.ContinuationPoint))
evts, cont = await self.storage.read_event_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode,
details.Filter)
starttime,
details.EndTime,
details.NumValuesPerNode,
details.Filter)
results = []
for ev in evts:
field_list = ua.HistoryEventFieldList()
......
import logging
import aiosqlite
import sqlite3
from typing import Iterable, Optional
from datetime import timedelta
from datetime import datetime
from asyncio import get_event_loop
import sqlite3
from asyncua import ua
from ..ua.ua_binary import variant_from_binary, variant_to_binary
......@@ -30,7 +30,7 @@ class HistorySQLite(HistoryStorageInterface):
self._loop = loop or get_event_loop()
async def init(self):
self._db = await aiosqlite.connect(self._db_file, loop=self._loop)
self._db = await aiosqlite.connect(self._db_file, loop=self._loop, detect_types=sqlite3.PARSE_DECLTYPES)
async def stop(self):
await self._db.close()
......@@ -50,15 +50,14 @@ class HistorySQLite(HistoryStorageInterface):
' VariantType TEXT,'
' VariantBinary BLOB)', None)
await self._db.commit()
except sqlite3.Error as e:
except aiosqlite.Error as e:
self.logger.info("Historizing SQL Table Creation Error for %s: %s", node_id, e)
async def execute_sql_delete(self, condition: str, args: Iterable, table: str, node_id):
try:
await self._db.execute(f'DELETE FROM "{table}" WHERE {condition}', args)
await self._db.commit()
except sqlite3.Error as e:
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Delete Old Data Error for %s: %s", node_id, e)
async def save_node_value(self, node_id, datavalue):
......@@ -74,7 +73,7 @@ class HistorySQLite(HistoryStorageInterface):
sqlite3.Binary(variant_to_binary(datavalue.Value))
))
await self._db.commit()
except sqlite3.Error as e:
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Insert Error for %s: %s", node_id, e)
# get this node's period from the period dict and calculate the limit
period, count = self._datachanges_period[node_id]
......@@ -105,7 +104,7 @@ class HistorySQLite(HistoryStorageInterface):
dv.SourceTimestamp = row[2]
dv.StatusCode = ua.StatusCode(row[3])
results.append(dv)
except sqlite3.Error as e:
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Read Error for %s: %s", node_id, e)
if nb_values:
if len(results) > nb_values:
......@@ -130,7 +129,7 @@ class HistorySQLite(HistoryStorageInterface):
None
)
await self._db.commit()
except sqlite3.Error as e:
except aiosqlite.Error as e:
self.logger.info("Historizing SQL Table Creation Error for events from %s: %s", source_id, e)
async def save_event(self, event):
......@@ -145,7 +144,7 @@ class HistorySQLite(HistoryStorageInterface):
evtup
)
await self._db.commit()
except sqlite3.Error as e:
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Insert Error for events from %s: %s", event.SourceNode, e)
# get this node's period from the period dict and calculate the limit
period = self._datachanges_period[event.SourceNode]
......@@ -155,7 +154,7 @@ class HistorySQLite(HistoryStorageInterface):
try:
await self._db.execute(f'DELETE FROM "{table}" WHERE Time < ?', (date_limit.isoformat(' '),))
await self._db.commit()
except sqlite3.Error as e:
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Delete Old Data Error for events from %s: %s", event.SourceNode, e)
async def read_event_history(self, source_id, start, end, nb_values, evfilter):
......@@ -180,7 +179,7 @@ class HistorySQLite(HistoryStorageInterface):
else:
fdict[clauses[i]] = ua.Variant(None)
results.append(Event.from_field_dict(fdict))
except sqlite3.Error as e:
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Read Error events for node %s: %s", source_id, e)
if nb_values:
if len(results) > nb_values: # start > ua.get_win_epoch() and
......@@ -248,7 +247,7 @@ class HistorySQLite(HistoryStorageInterface):
for name in names:
variant = ev_variant_dict[name]
placeholders.append("?")
ev_variant_binaries.append(sqlite3.Binary(variant_to_binary(variant)))
ev_variant_binaries.append(aiosqlite.Binary(variant_to_binary(variant)))
return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
def _get_event_columns(self, ev_fields):
......
......@@ -36,7 +36,7 @@ class UTC(tzinfo):
return timedelta(0)
def datetime_to_win_epoch(dt):
def datetime_to_win_epoch(dt: datetime):
"""method copied from David Buxton <david@gasmark6.com> sample code"""
if (dt.tzinfo is None) or (dt.tzinfo.utcoffset(dt) is None):
dt = dt.replace(tzinfo=UTC())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment