Commit af5a31c0 authored by oroulet's avatar oroulet Committed by oroulet

fix use of continuation point in server history

parent 2cf565e2
......@@ -20,6 +20,8 @@ class HistoryStorageInterface:
Interface of a history backend.
Must be implemented by backends
"""
def __init__(self, max_history_data_response_size=10000):
self.max_history_data_response_size = max_history_data_response_size
async def init(self):
"""
......@@ -89,7 +91,8 @@ class HistoryDict(HistoryStorageInterface):
Very minimal history backend storing data in memory using a Python dictionary
"""
def __init__(self):
def __init__(self, max_history_data_response_size=10000):
self.max_history_data_response_size = max_history_data_response_size
self._datachanges = {}
self._datachanges_period = {}
self._events = {}
......@@ -144,9 +147,13 @@ class HistoryDict(HistoryStorageInterface):
results = [
dv for dv in self._datachanges[node_id] if start <= dv.SourceTimestamp <= end
]
if nb_values and len(results) > nb_values:
cont = results[nb_values + 1].SourceTimestamp
results = results[:nb_values]
if len(results) > self.max_history_data_response_size:
cont = results[self.max_history_data_response_size + 1].SourceTimestamp
results = results[:self.max_history_data_response_size]
return results, cont
async def new_historized_event(self, source_id, evtypes, period, count=0):
......@@ -190,9 +197,13 @@ class HistoryDict(HistoryStorageInterface):
else:
results = [ev for ev in self._events[source_id] if start <= ev.Time <= end]
if nb_values and len(results) > nb_values:
cont = results[nb_values + 1].Time
results = results[:nb_values]
if len(results) > self.max_history_data_response_size:
cont = results[self.max_history_data_response_size + 1].Time
results = results[:self.max_history_data_response_size]
return results, cont
async def stop(self):
......
......@@ -4,7 +4,6 @@ import sqlite3
from typing import Iterable
from datetime import timedelta
from datetime import datetime
from asyncio import get_event_loop
from asyncua import ua
from ..ua.ua_binary import variant_from_binary, variant_to_binary
......@@ -21,16 +20,16 @@ class HistorySQLite(HistoryStorageInterface):
note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
"""
def __init__(self, path="history.db", loop=None):
def __init__(self, path="history.db", max_history_data_response_size=10000):
self.max_history_data_response_size = max_history_data_response_size
self.logger = logging.getLogger(__name__)
self._datachanges_period = {}
self._db_file = path
self._event_fields = {}
self._db: aiosqlite.Connection = None
self._loop = loop or get_event_loop()
async def init(self):
self._db = await aiosqlite.connect(self._db_file, loop=self._loop, detect_types=sqlite3.PARSE_DECLTYPES)
self._db = await aiosqlite.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES)
async def stop(self):
await self._db.close()
......@@ -123,10 +122,9 @@ class HistorySQLite(HistoryStorageInterface):
results.append(dv)
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Read Error for %s: %s", node_id, e)
if nb_values:
if len(results) > nb_values:
cont = results[nb_values].SourceTimestamp
results = results[:nb_values]
if len(results) > self.max_history_data_response_size:
cont = results[self.max_history_data_response_size].SourceTimestamp
results = results[:self.max_history_data_response_size]
return results, cont
async def new_historized_event(self, source_id, evtypes, period, count=0):
......@@ -199,10 +197,9 @@ class HistorySQLite(HistoryStorageInterface):
results.append(Event.from_field_dict(fdict))
except aiosqlite.Error as e:
self.logger.error("Historizing SQL Read Error events for node %s: %s", source_id, e)
if nb_values:
if len(results) > nb_values: # start > ua.get_win_epoch() and
cont = cont_timestamps[nb_values]
results = results[:nb_values]
if len(results) > self.max_history_data_response_size: # start > ua.get_win_epoch() and
cont = cont_timestamps[self.max_history_data_response_size]
results = results[:self.max_history_data_response_size]
return results, cont
def _get_table_name(self, node_id):
......@@ -241,7 +238,7 @@ class HistorySQLite(HistoryStorageInterface):
start_time = end.isoformat(" ")
end_time = start.isoformat(" ")
if nb_values:
limit = nb_values + 1 # add 1 to the number of values for retrieving a continuation point
limit = nb_values
else:
limit = -1 # in SQLite a LIMIT of -1 returns all results
return start_time, end_time, order, limit
......
......@@ -3,7 +3,6 @@ import logging
from asyncua import Client
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')
......@@ -64,4 +63,5 @@ async def main():
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment