Commit bace92d0 authored by Dominik Luntzer's avatar Dominik Luntzer

Correctly handle period and count limits in both history implementations

* Added missing handling of the count parameter to the sqlite implementation
* Add tests for both limits and both history implementations
* Fixed some minor issues
parent ef0cba63
......@@ -91,7 +91,7 @@ class HistoryDict(HistoryStorageInterface):
data.append(datavalue)
now = datetime.utcnow()
if period:
while now - data[0].ServerTimestamp > period:
while len(data) and now - data[0].ServerTimestamp > period:
data.pop(0)
if count and len(data) > count:
data.pop(0)
......@@ -214,12 +214,12 @@ class HistoryManager(object):
since it requires more logic than other attribute service methods
"""
results = []
for rv in params.NodesToRead:
res = self._read_history(params.HistoryReadDetails, rv)
results.append(res)
return results
def _read_history(self, details, rv):
"""
determine if the history read is for a data changes or events; then read the history for that node
......
......@@ -75,18 +75,25 @@ class HistorySQLite(HistoryStorageInterface):
# get this node's period from the period dict and calculate the limit
period, count = self._datachanges_period[node_id]
if period:
# after the insert, if a period was specified delete all records older than period
date_limit = datetime.now() - period
def executeDeleteStatement(condition, args):
query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn = table)
try:
_c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
(date_limit.isoformat(' '),))
_c_sub.execute(query, args)
except sqlite3.Error as e:
self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
self._conn.commit()
if period:
# after the insert, if a period was specified delete all records older than period
date_limit = datetime.utcnow() - period
executeDeleteStatement('ServerTimestamp < ?', (date_limit,))
if count:
#ensure that no more than count records are stored for the specified node
executeDeleteStatement('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
def read_node_history(self, node_id, start, end, nb_values):
with self._lock:
_c_read = self._conn.cursor()
......
......@@ -17,7 +17,7 @@ from tests_cmd_lines import TestCmdLines
from tests_server import TestServer, TestServerCaching
from tests_client import TestClient
from tests_unit import TestUnit
from tests_history import TestHistory
from tests_history import TestHistory, TestHistorySQL, TestHistoryLimits, TestHistorySQLLimits
from tests_history import TestHistorySQL
if CRYPTOGRAPHY_AVAILABLE:
from tests_crypto_connect import TestCryptoConnect
......
......@@ -7,6 +7,7 @@ from opcua import Server
from opcua import ua
from opcua.server.history_sql import HistorySQLite
from opcua.server.history import HistoryDict
from tests_common import CommonTests, add_server_methods
......@@ -114,7 +115,7 @@ class HistoryCommon(object):
res = self.var.read_raw_history(None, now, 2)
self.assertEqual(len(res), 2)
self.assertEqual(res[-1].Value.Value, self.values[-2])
# both start and endtime, return from start to end
def test_history_var_read_all(self):
now = datetime.utcnow()
......@@ -143,7 +144,7 @@ class HistoryCommon(object):
self.assertEqual(len(res), 5)
self.assertEqual(res[-1].Value.Value, self.values[-5])
self.assertEqual(res[0].Value.Value, self.values[-1])
# only start return original order
def test_history_var_read_6_with_start(self):
now = datetime.utcnow()
......@@ -286,12 +287,71 @@ class TestHistorySQL(unittest.TestCase, HistoryCommon):
def tearDownClass(cls):
cls.stop_server_and_client()
class TestHistoryLimitsCommon(unittest.TestCase):
id = ua.NodeId(123)
def setUp(self):
self.history = self.createHistoryInstance()
def createHistoryInstance(self):
assert(False)
def resultCount(self):
results, cont = self.history.read_node_history(self.id, None, None, None)
return len(results)
def addValue(self, age):
value = ua.DataValue()
value.ServerTimestamp = datetime.utcnow() - timedelta(hours = age)
self.history.save_node_value(self.id, value)
def test_count_limit(self):
self.history.new_historized_node(self.id, period = None, count = 3)
self.assertEqual(self.resultCount(), 0)
self.addValue(5)
self.assertEqual(self.resultCount(), 1)
self.addValue(4)
self.assertEqual(self.resultCount(), 2)
self.addValue(3)
self.assertEqual(self.resultCount(), 3)
self.addValue(2)
self.assertEqual(self.resultCount(), 3)
self.addValue(1)
self.assertEqual(self.resultCount(), 3)
def test_period_limit(self):
self.history.new_historized_node(self.id, period = timedelta(hours = 3))
self.assertEqual(self.resultCount(), 0)
self.addValue(5)
self.assertEqual(self.resultCount(), 0)
self.addValue(4)
self.assertEqual(self.resultCount(), 0)
self.addValue(2)
self.assertEqual(self.resultCount(), 1)
self.addValue(1)
self.assertEqual(self.resultCount(), 2)
self.addValue(0)
self.assertEqual(self.resultCount(), 3)
def test_combined_limit(self):
self.history.new_historized_node(self.id, period = timedelta(hours = 3), count = 2)
self.assertEqual(self.resultCount(), 0)
self.addValue(5)
self.assertEqual(self.resultCount(), 0)
self.addValue(4)
self.assertEqual(self.resultCount(), 0)
self.addValue(2)
self.assertEqual(self.resultCount(), 1)
self.addValue(1)
self.assertEqual(self.resultCount(), 2)
self.addValue(0)
self.assertEqual(self.resultCount(), 2)
class TestHistoryLimits(TestHistoryLimitsCommon):
def createHistoryInstance(self):
return HistoryDict()
class TestHistorySQLLimits(TestHistoryLimitsCommon):
def createHistoryInstance(self):
return HistorySQLite(":memory:")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment