Commit d0d0cb26 authored by ORD's avatar ORD

Merge pull request #158 from FreeOpcUa/sql2

SQL History storage
parents bac760de f7128531
...@@ -16,3 +16,4 @@ dist ...@@ -16,3 +16,4 @@ dist
*.egg-info *.egg-info
*.swp *.swp
newdocs newdocs
examples/history.db
import sys
sys.path.insert(0, "..")
import time
import math
from opcua import ua, Server
from opcua.server.history_sql import HistorySQLite
if __name__ == "__main__":
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
# setup our own namespace, not really necessary but should as spec
uri = "http://examples.freeopcua.github.io"
idx = server.register_namespace(uri)
# get Objects node, this is where we should put our custom stuff
objects = server.get_objects_node()
# populating our address space
myobj = objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", ua.Variant(0, ua.VariantType.Double))
myvar.set_writable() # Set MyVariable to be writable by clients
# Configure server to use sqlite as history database (default is a simple in memory dict)
server.iserver.history_manager.set_storage(HistorySQLite(":memory:"))
# starting!
server.start()
# enable history for this particular node, must be called after start since it uses subscription
server.iserver.enable_history(myvar, period=None, count=100)
try:
count = 0
while True:
time.sleep(1)
count += 0.1
myvar.set_value(math.sin(count))
finally:
#close connection, remove subcsriptions, etc
server.stop()
...@@ -26,6 +26,7 @@ if __name__ == "__main__": ...@@ -26,6 +26,7 @@ if __name__ == "__main__":
# starting! # starting!
server.start() server.start()
try: try:
count = 0 count = 0
while True: while True:
......
...@@ -13,21 +13,21 @@ class HistoryStorageInterface(object): ...@@ -13,21 +13,21 @@ class HistoryStorageInterface(object):
Must be implemented by backends Must be implemented by backends
""" """
def new_historized_node(self, node, period, count=0): def new_historized_node(self, node_id, period, count=0):
""" """
Called when a new node is to be historized Called when a new node is to be historized
Returns None Returns None
""" """
raise NotImplementedError raise NotImplementedError
def save_node_value(self, node, datavalue): def save_node_value(self, node_id, datavalue):
""" """
Called when the value of a historized node has changed and should be saved in history Called when the value of a historized node has changed and should be saved in history
Returns None Returns None
""" """
raise NotImplementedError raise NotImplementedError
def read_node_history(self, node, start, end, nb_values): def read_node_history(self, node_id, start, end, nb_values):
""" """
Called when a client make a history read request for a node Called when a client make a history read request for a node
if start or end is missing then nb_values is used to limit query if start or end is missing then nb_values is used to limit query
...@@ -62,59 +62,74 @@ class HistoryStorageInterface(object): ...@@ -62,59 +62,74 @@ class HistoryStorageInterface(object):
""" """
raise NotImplementedError raise NotImplementedError
def stop(self):
"""
Called when the server shuts down
Can be used to close database connections etc.
"""
raise NotImplementedError
class HistoryDict(HistoryStorageInterface): class HistoryDict(HistoryStorageInterface):
""" """
very minimal history backend storing data in memory using a Python dictionnary very minimal history backend storing data in memory using a Python dictionary
""" """
def __init__(self): def __init__(self):
self._datachanges = {} self._datachanges = {}
self._datachanges_period = {} self._datachanges_period = {}
self._events = {} self._events = {}
def new_historized_node(self, node, period, count=0): def new_historized_node(self, node_id, period, count=0):
self._datachanges[node] = [] self._datachanges[node_id] = []
self._datachanges_period[node] = period, count self._datachanges_period[node_id] = period, count
def new_historized_event(self, event, period):
self._events = []
def save_node_value(self, node, datavalue): def save_node_value(self, node_id, datavalue):
data = self._datachanges[node] data = self._datachanges[node_id]
period, count = self._datachanges_period[node] period, count = self._datachanges_period[node_id]
data.append(datavalue) data.append(datavalue)
now = datetime.now() now = datetime.utcnow()
if period: if period:
while now - data[0].ServerTimestamp > period: while now - data[0].ServerTimestamp > period:
data.pop(0) data.pop(0)
if count and len(data) > count: if count and len(data) > count:
data = data[-count:] data = data[-count:]
def read_node_history(self, node, start, end, nb_values): def read_node_history(self, node_id, start, end, nb_values):
cont = None cont = None
if node not in self._datachanges: if node_id not in self._datachanges:
print("Error attempt to read history for a node which is not historized") print("Error attempt to read history for a node which is not historized")
return [], cont return [], cont
else: else:
if end is None:
end = datetime.now() + timedelta(days=1)
if start is None: if start is None:
start = ua.DateTimeMinValue start = ua.DateTimeMinValue
results = [dv for dv in self._datachanges[node] if start <= dv.ServerTimestamp <= end] if end is None:
if nb_values: end = ua.DateTimeMinValue
if start > ua.DateTimeMinValue and len(results) > nb_values: if start == ua.DateTimeMinValue:
cont = results[nb_values + 1].ServerTimestamp results = [dv for dv in reversed(self._datachanges[node_id]) if start <= dv.ServerTimestamp]
results = results[:nb_values] elif end == ua.DateTimeMinValue:
else: results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp]
results = results[-nb_values:] elif start > end:
results = [dv for dv in reversed(self._datachanges[node_id]) if end <= dv.ServerTimestamp <= start]
else:
results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
if nb_values and len(results) > nb_values:
cont = results[nb_values + 1].ServerTimestamp
results = results[:nb_values]
return results, cont return results, cont
def new_historized_event(self, event, period):
self._events = []
def save_event(self, event): def save_event(self, event):
raise NotImplementedError raise NotImplementedError
def read_event_history(self, start, end, evfilter): def read_event_history(self, start, end, evfilter):
raise NotImplementedError raise NotImplementedError
def stop(self):
pass
class SubHandler(object): class SubHandler(object):
def __init__(self, storage): def __init__(self, storage):
...@@ -123,7 +138,6 @@ class SubHandler(object): ...@@ -123,7 +138,6 @@ class SubHandler(object):
def datachange_notification(self, node, val, data): def datachange_notification(self, node, val, data):
self.storage.save_node_value(node.nodeid, data.monitored_item.Value) self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
def event_notification(self, event): def event_notification(self, event):
self.storage.save_event(event) self.storage.save_event(event)
...@@ -152,7 +166,7 @@ class HistoryManager(object): ...@@ -152,7 +166,7 @@ class HistoryManager(object):
if not self._sub: if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage)) self._sub = self._create_subscription(SubHandler(self.storage))
if node in self._handlers: if node in self._handlers:
raise ua.UaError("Node {} is allready historized".format(node)) raise ua.UaError("Node {} is already historized".format(node))
self.storage.new_historized_node(node.nodeid, period, count) self.storage.new_historized_node(node.nodeid, period, count)
handler = self._sub.subscribe_data_change(node) handler = self._sub.subscribe_data_change(node)
self._handlers[node] = handler self._handlers[node] = handler
...@@ -175,7 +189,8 @@ class HistoryManager(object): ...@@ -175,7 +189,8 @@ class HistoryManager(object):
return results return results
def _read_history(self, details, rv): def _read_history(self, details, rv):
""" read history for a node """
read history for a node
""" """
result = ua.HistoryReadResult() result = ua.HistoryReadResult()
if isinstance(details, ua.ReadRawModifiedDetails): if isinstance(details, ua.ReadRawModifiedDetails):
...@@ -215,9 +230,9 @@ class HistoryManager(object): ...@@ -215,9 +230,9 @@ class HistoryManager(object):
details.EndTime, details.EndTime,
details.NumValuesPerNode) details.NumValuesPerNode)
if cont: if cont:
#cont = datetime_to_bytes(dv[-1].ServerTimestamp) # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
cont = ua.pack_datetime(dv[-1].ServerTimestamp) cont = ua.pack_datetime(dv[-1].ServerTimestamp)
# FIXME, parse index range and filter out if necesary # FIXME, parse index range and filter out if necessary
# rv.IndexRange # rv.IndexRange
# rv.DataEncoding # xml or binary, seems spec say we can ignore that one # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
return dv, cont return dv, cont
...@@ -236,6 +251,5 @@ class HistoryManager(object): ...@@ -236,6 +251,5 @@ class HistoryManager(object):
results.append(results) results.append(results)
return results return results
def stop(self):
self.storage.stop()
import logging
from datetime import timedelta
from datetime import datetime
from threading import Lock
from opcua import ua
from opcua.common.utils import Buffer
from opcua.server.history import HistoryStorageInterface
import sqlite3
class HistorySQLite(HistoryStorageInterface):
"""
very minimal history backend storing data in SQLite database
"""
def __init__(self, path="history.db"):
self.logger = logging.getLogger(__name__)
self._datachanges_period = {}
self._events = {}
self._db_file = path
self._lock = Lock()
self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
def new_historized_node(self, node_id, period, count=0):
with self._lock:
_c_new = self._conn.cursor()
table = self._get_table_name(node_id)
self._datachanges_period[node_id] = period, count
# create a table for the node which will store attributes of the DataValue object
# note: Value/VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
try:
_c_new.execute('CREATE TABLE "{tn}" (Id INTEGER PRIMARY KEY NOT NULL,'
' ServerTimestamp TIMESTAMP,'
' SourceTimestamp TIMESTAMP,'
' StatusCode INTEGER,'
' Value TEXT,'
' VariantType TEXT,'
' VariantBinary BLOB)'.format(tn=table))
except sqlite3.Error as e:
self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
self._conn.commit()
def save_node_value(self, node_id, datavalue):
with self._lock:
_c_sub = self._conn.cursor()
table = self._get_table_name(node_id)
# insert the data change into the database
try:
_c_sub.execute('INSERT INTO "{tn}" VALUES (NULL, ?, ?, ?, ?, ?, ?)'.format(tn=table),
(
datavalue.ServerTimestamp,
datavalue.SourceTimestamp,
datavalue.StatusCode.value,
str(datavalue.Value.Value),
datavalue.Value.VariantType.name,
datavalue.Value.to_binary()
)
)
except sqlite3.Error as e:
self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
self._conn.commit()
# get this node's period from the period dict and calculate the limit
period, count = self._datachanges_period[node_id]
if period:
# after the insert, if a period was specified delete all records older than period
date_limit = datetime.now() - period
try:
_c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
(date_limit.isoformat(' '),))
except sqlite3.Error as e:
self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
self._conn.commit()
def read_node_history(self, node_id, start, end, nb_values):
with self._lock:
_c_read = self._conn.cursor()
order = "ASC"
if start is None or start == ua.DateTimeMinValue:
order = "DESC"
start = ua.DateTimeMinValue
if end is None or end == ua.DateTimeMinValue:
end = datetime.utcnow() + timedelta(days=1)
if start < end:
start_time = start.isoformat(' ')
end_time = end.isoformat(' ')
else:
order = "DESC"
start_time = end.isoformat(' ')
end_time = start.isoformat(' ')
if nb_values:
limit = nb_values + 1 # add 1 to the number of values for retrieving a continuation point
else:
limit = -1 # in SQLite a LIMIT of -1 returns all results
table = self._get_table_name(node_id)
cont = None
results = []
# select values from the database; recreate UA Variant from binary ORDER BY "ServerTimestamp" DESC
try:
for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
'ORDER BY "Id" {dir} LIMIT ?'.format(tn=table, dir=order), (start_time, end_time, limit,)):
dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[6])))
dv.ServerTimestamp = row[1]
dv.SourceTimestamp = row[2]
dv.StatusCode = ua.StatusCode(row[3])
results.append(dv)
except sqlite3.Error as e:
self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
if nb_values:
if start > ua.DateTimeMinValue and len(results) > nb_values:
cont = results[nb_values].ServerTimestamp
results = results[:nb_values]
return results, cont
def new_historized_event(self, event, period):
raise NotImplementedError
def save_event(self, event):
raise NotImplementedError
def read_event_history(self, start, end, evfilter):
raise NotImplementedError
def _get_table_name(self, node_id):
return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
def stop(self):
with self._lock:
self._conn.close()
...@@ -97,6 +97,7 @@ class InternalServer(object): ...@@ -97,6 +97,7 @@ class InternalServer(object):
def stop(self): def stop(self):
self.logger.info("stopping internal server") self.logger.info("stopping internal server")
self.loop.stop() self.loop.stop()
self.history_manager.stop()
def _set_current_time(self): def _set_current_time(self):
self.current_time_node.set_value(datetime.utcnow()) self.current_time_node.set_value(datetime.utcnow())
......
...@@ -338,3 +338,9 @@ class Server(object): ...@@ -338,3 +338,9 @@ class Server(object):
def delete_nodes(self, nodes, recursive=False): def delete_nodes(self, nodes, recursive=False):
return delete_nodes(self.iserver.isession, nodes, recursive) return delete_nodes(self.iserver.isession, nodes, recursive)
def historize_node(self, node):
self.iserver.enable_history(node)
def dehistorize_node(self, node):
self.iserver.disable_history(node)
...@@ -17,6 +17,8 @@ from tests_cmd_lines import TestCmdLines ...@@ -17,6 +17,8 @@ from tests_cmd_lines import TestCmdLines
from tests_server import TestServer from tests_server import TestServer
from tests_client import TestClient from tests_client import TestClient
from tests_unit import TestUnit from tests_unit import TestUnit
from tests_history import TestHistory
from tests_history import TestHistorySQL
if CRYPTOGRAPHY_AVAILABLE: if CRYPTOGRAPHY_AVAILABLE:
from tests_crypto_connect import TestCryptoConnect from tests_crypto_connect import TestCryptoConnect
......
...@@ -867,32 +867,3 @@ class CommonTests(object): ...@@ -867,32 +867,3 @@ class CommonTests(object):
self.assertTrue(len(endpoints) > 0) self.assertTrue(len(endpoints) > 0)
self.assertTrue(endpoints[0].EndpointUrl.startswith("opc.tcp://")) self.assertTrue(endpoints[0].EndpointUrl.startswith("opc.tcp://"))
def test_history_read(self):
o = self.srv.get_objects_node()
vals = [i for i in range(20)]
var = o.add_variable(3, "history_var", 0)
self.srv.iserver.enable_history(var, period=None, count=10)
for i in vals:
var.set_value(i)
time.sleep(1)
now = datetime.now()
old = now - timedelta(days=6)
res = var.read_raw_history(None, now, 2)
self.assertEqual(len(res), 2)
self.assertEqual(res[-1].Value.Value, vals[-1])
res = var.read_raw_history(old, now, 0)
self.assertEqual(len(res), 20)
self.assertEqual(res[-1].Value.Value, vals[-1])
self.assertEqual(res[0].Value.Value, vals[0])
res = var.read_raw_history(old, now, 5)
self.assertEqual(len(res), 5)
self.assertEqual(res[-1].Value.Value, vals[4])
self.assertEqual(res[0].Value.Value, vals[0])
import time
from datetime import datetime, timedelta
import unittest
from opcua import Client
from opcua import Server
from opcua import ua
from opcua.server.history_sql import HistorySQLite
from tests_common import CommonTests, add_server_methods
port_num1 = 48530
port_num2 = 48530
class HistoryCommon(object):
srv = Server
clt = Client
@classmethod
def start_server_and_client(cls):
cls.srv = Server()
cls.srv.set_endpoint('opc.tcp://localhost:%d' % port_num1)
cls.srv.start()
cls.clt = Client('opc.tcp://localhost:%d' % port_num1)
cls.clt.connect()
@classmethod
def stop_server_and_client(cls):
cls.clt.disconnect()
cls.srv.stop()
@classmethod
def create_var(cls):
o = cls.srv.get_objects_node()
cls.values = [i for i in range(20)]
cls.var = o.add_variable(3, "history_var", 0)
cls.srv.iserver.enable_history(cls.var, period=None, count=10)
for i in cls.values:
cls.var.set_value(i)
time.sleep(1)
# no start and no end is not defined by spec, return reverse order
def test_history_read_one(self):
# Spec says that at least two parameters should be provided, so
# this one is out of spec
res = self.var.read_raw_history(None, None, 1)
self.assertEqual(len(res), 1)
self.assertEqual(res[0].Value.Value, self.values[-1])
# no start and no end is not defined by spec, return reverse order
def test_history_read_none(self):
res = self.var.read_raw_history(None, None, 0)
self.assertEqual(len(res), 20)
self.assertEqual(res[0].Value.Value, self.values[-1])
self.assertEqual(res[-1].Value.Value, self.values[0])
# no start and no end is not defined by spec, return reverse order
def test_history_read_last_3(self):
res = self.var.read_raw_history(None, None, 3)
self.assertEqual(len(res), 3)
self.assertEqual(res[-1].Value.Value, self.values[-3])
self.assertEqual(res[0].Value.Value, self.values[-1])
# no start and no end is not defined by spec, return reverse order
def test_history_read_all2(self):
res = self.var.read_raw_history(None, None, 9999)
self.assertEqual(len(res), 20)
self.assertEqual(res[-1].Value.Value, self.values[0])
self.assertEqual(res[0].Value.Value, self.values[-1])
# only has end time, should return reverse order
def test_history_read_2_with_end(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.var.read_raw_history(None, now, 2)
self.assertEqual(len(res), 2)
self.assertEqual(res[-1].Value.Value, self.values[-2])
# both start and endtime, return from start to end
def test_history_read_all(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.var.read_raw_history(old, now, 0)
self.assertEqual(len(res), 20)
self.assertEqual(res[-1].Value.Value, self.values[-1])
self.assertEqual(res[0].Value.Value, self.values[0])
def test_history_read_5_in_timeframe(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.var.read_raw_history(old, now, 5)
self.assertEqual(len(res), 5)
self.assertEqual(res[-1].Value.Value, self.values[4])
self.assertEqual(res[0].Value.Value, self.values[0])
# start time greater than end time, should return reverse order
def test_history_read_5_in_timeframe_start_greater_than_end(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.var.read_raw_history(now, old, 5)
self.assertEqual(len(res), 5)
self.assertEqual(res[-1].Value.Value, self.values[-5])
self.assertEqual(res[0].Value.Value, self.values[-1])
# only start return original order
def test_history_read_6_with_start(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.var.read_raw_history(old, None, 6)
self.assertEqual(len(res), 6)
self.assertEqual(res[-1].Value.Value, self.values[5])
self.assertEqual(res[0].Value.Value, self.values[0])
# only start return original order
def test_history_read_all_with_start(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.var.read_raw_history(old, None, 0)
self.assertEqual(len(res), 20)
self.assertEqual(res[-1].Value.Value, self.values[-1])
self.assertEqual(res[0].Value.Value, self.values[0])
# only end return reversed order
def test_history_read_all_with_end(self):
end = datetime.utcnow() + timedelta(days=6)
res = self.var.read_raw_history(None, end, 0)
self.assertEqual(len(res), 20)
self.assertEqual(res[-1].Value.Value, self.values[0])
self.assertEqual(res[0].Value.Value, self.values[-1])
# only end return reversed order
def test_history_read_3_with_end(self):
end = datetime.utcnow() + timedelta(days=6)
res = self.var.read_raw_history(None, end, 3)
self.assertEqual(len(res), 3)
self.assertEqual(res[2].Value.Value, self.values[-3])
self.assertEqual(res[0].Value.Value, self.values[-1])
class TestHistory(unittest.TestCase, HistoryCommon):
@classmethod
def setUpClass(cls):
cls.start_server_and_client()
cls.create_var()
@classmethod
def tearDownClass(cls):
cls.stop_server_and_client()
class TestHistorySQL(unittest.TestCase, HistoryCommon):
@classmethod
def setUpClass(cls):
cls.start_server_and_client()
cls.srv.iserver.history_manager.set_storage(HistorySQLite(":memory:"))
cls.create_var()
@classmethod
def tearDownClass(cls):
cls.stop_server_and_client()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment