Commit 6d5ca772 authored by olivier R-D's avatar olivier R-D

Merge branch 'sql'

parents 6a68a039 a8353760
...@@ -16,3 +16,4 @@ dist ...@@ -16,3 +16,4 @@ dist
*.egg-info *.egg-info
*.swp *.swp
newdocs newdocs
examples/history.db
import sys
sys.path.insert(0, "..")
import time
from opcua import ua, Server
if __name__ == "__main__":
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
# setup our own namespace, not really necessary but should as spec
uri = "http://examples.freeopcua.github.io"
idx = server.register_namespace(uri)
# get Objects node, this is where we should put our custom stuff
objects = server.get_objects_node()
# populating our address space
myobj = objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", 6.7)
myvarA = myobj.add_variable(idx, "MyBool", True)
myvar.set_writable() # Set MyVariable to be writable by clients
# starting!
server.start()
# historize must be called after the server is started!
server.historize_node(myvar)
server.historize_node(myvarA)
try:
count = 0
while True:
time.sleep(1)
count += 0.1
myvar.set_value(count)
my_val = myvarA.get_value()
if my_val:
myvarA.set_value(False)
else:
myvarA.set_value(True)
finally:
# close connection, remove subscriptions, etc
server.stop()
...@@ -26,6 +26,7 @@ if __name__ == "__main__": ...@@ -26,6 +26,7 @@ if __name__ == "__main__":
# starting! # starting!
server.start() server.start()
try: try:
count = 0 count = 0
while True: while True:
......
...@@ -13,21 +13,21 @@ class HistoryStorageInterface(object): ...@@ -13,21 +13,21 @@ class HistoryStorageInterface(object):
Must be implemented by backends Must be implemented by backends
""" """
def new_historized_node(self, node, period, count=0): def new_historized_node(self, node_id, period, count=0):
""" """
Called when a new node is to be historized Called when a new node is to be historized
Returns None Returns None
""" """
raise NotImplementedError raise NotImplementedError
def save_node_value(self, node, datavalue): def save_node_value(self, node_id, datavalue):
""" """
Called when the value of a historized node has changed and should be saved in history Called when the value of a historized node has changed and should be saved in history
Returns None Returns None
""" """
raise NotImplementedError raise NotImplementedError
def read_node_history(self, node, start, end, nb_values): def read_node_history(self, node_id, start, end, nb_values):
""" """
Called when a client make a history read request for a node Called when a client make a history read request for a node
if start or end is missing then nb_values is used to limit query if start or end is missing then nb_values is used to limit query
...@@ -62,26 +62,34 @@ class HistoryStorageInterface(object): ...@@ -62,26 +62,34 @@ class HistoryStorageInterface(object):
""" """
raise NotImplementedError raise NotImplementedError
def stop(self):
"""
Called when the server shuts down
Can be used to close database connections etc.
"""
raise NotImplementedError
# if you want to use an SQL based history uncomment this import and change the storage type of the history manager
# from opcua.server.history_sql import HistorySQLite
class HistoryDict(HistoryStorageInterface): class HistoryDict(HistoryStorageInterface):
""" """
very minimal history backend storing data in memory using a Python dictionnary very minimal history backend storing data in memory using a Python dictionary
""" """
def __init__(self): def __init__(self):
self._datachanges = {} self._datachanges = {}
self._datachanges_period = {} self._datachanges_period = {}
self._events = {} self._events = {}
def new_historized_node(self, node, period, count=0): def new_historized_node(self, node_id, period, count=0):
self._datachanges[node] = [] self._datachanges[node_id] = []
self._datachanges_period[node] = period, count self._datachanges_period[node_id] = period, count
def new_historized_event(self, event, period):
self._events = []
def save_node_value(self, node, datavalue): def save_node_value(self, node_id, datavalue):
data = self._datachanges[node] data = self._datachanges[node_id]
period, count = self._datachanges_period[node] period, count = self._datachanges_period[node_id]
data.append(datavalue) data.append(datavalue)
now = datetime.now() now = datetime.now()
if period: if period:
...@@ -90,9 +98,9 @@ class HistoryDict(HistoryStorageInterface): ...@@ -90,9 +98,9 @@ class HistoryDict(HistoryStorageInterface):
if count and len(data) > count: if count and len(data) > count:
data = data[-count:] data = data[-count:]
def read_node_history(self, node, start, end, nb_values): def read_node_history(self, node_id, start, end, nb_values):
cont = None cont = None
if node not in self._datachanges: if node_id not in self._datachanges:
print("Error attempt to read history for a node which is not historized") print("Error attempt to read history for a node which is not historized")
return [], cont return [], cont
else: else:
...@@ -100,7 +108,7 @@ class HistoryDict(HistoryStorageInterface): ...@@ -100,7 +108,7 @@ class HistoryDict(HistoryStorageInterface):
end = datetime.now() + timedelta(days=1) end = datetime.now() + timedelta(days=1)
if start is None: if start is None:
start = ua.DateTimeMinValue start = ua.DateTimeMinValue
results = [dv for dv in self._datachanges[node] if start <= dv.ServerTimestamp <= end] results = [dv for dv in self._datachanges[node_id] if start <= dv.ServerTimestamp <= end]
if nb_values: if nb_values:
if start > ua.DateTimeMinValue and len(results) > nb_values: if start > ua.DateTimeMinValue and len(results) > nb_values:
cont = results[nb_values + 1].ServerTimestamp cont = results[nb_values + 1].ServerTimestamp
...@@ -109,12 +117,18 @@ class HistoryDict(HistoryStorageInterface): ...@@ -109,12 +117,18 @@ class HistoryDict(HistoryStorageInterface):
results = results[-nb_values:] results = results[-nb_values:]
return results, cont return results, cont
def new_historized_event(self, event, period):
self._events = []
def save_event(self, event): def save_event(self, event):
raise NotImplementedError raise NotImplementedError
def read_event_history(self, start, end, evfilter): def read_event_history(self, start, end, evfilter):
raise NotImplementedError raise NotImplementedError
def stop(self):
pass
class SubHandler(object): class SubHandler(object):
def __init__(self, storage): def __init__(self, storage):
...@@ -123,7 +137,6 @@ class SubHandler(object): ...@@ -123,7 +137,6 @@ class SubHandler(object):
def datachange_notification(self, node, val, data): def datachange_notification(self, node, val, data):
self.storage.save_node_value(node.nodeid, data.monitored_item.Value) self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
def event_notification(self, event): def event_notification(self, event):
self.storage.save_event(event) self.storage.save_event(event)
...@@ -131,7 +144,7 @@ class SubHandler(object): ...@@ -131,7 +144,7 @@ class SubHandler(object):
class HistoryManager(object): class HistoryManager(object):
def __init__(self, iserver): def __init__(self, iserver):
self.iserver = iserver self.iserver = iserver
self.storage = HistoryDict() self.storage = HistoryDict() # Change to HistorySQLite() for file based history
self._sub = None self._sub = None
self._handlers = {} self._handlers = {}
...@@ -152,7 +165,7 @@ class HistoryManager(object): ...@@ -152,7 +165,7 @@ class HistoryManager(object):
if not self._sub: if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage)) self._sub = self._create_subscription(SubHandler(self.storage))
if node in self._handlers: if node in self._handlers:
raise ua.UaError("Node {} is allready historized".format(node)) raise ua.UaError("Node {} is already historized".format(node))
self.storage.new_historized_node(node.nodeid, period, count) self.storage.new_historized_node(node.nodeid, period, count)
handler = self._sub.subscribe_data_change(node) handler = self._sub.subscribe_data_change(node)
self._handlers[node] = handler self._handlers[node] = handler
...@@ -175,7 +188,8 @@ class HistoryManager(object): ...@@ -175,7 +188,8 @@ class HistoryManager(object):
return results return results
def _read_history(self, details, rv): def _read_history(self, details, rv):
""" read history for a node """
read history for a node
""" """
result = ua.HistoryReadResult() result = ua.HistoryReadResult()
if isinstance(details, ua.ReadRawModifiedDetails): if isinstance(details, ua.ReadRawModifiedDetails):
...@@ -215,9 +229,9 @@ class HistoryManager(object): ...@@ -215,9 +229,9 @@ class HistoryManager(object):
details.EndTime, details.EndTime,
details.NumValuesPerNode) details.NumValuesPerNode)
if cont: if cont:
#cont = datetime_to_bytes(dv[-1].ServerTimestamp) # cont = datetime_to_bytes(dv[-1].ServerTimestamp)
cont = ua.pack_datetime(dv[-1].ServerTimestamp) cont = ua.pack_datetime(dv[-1].ServerTimestamp)
# FIXME, parse index range and filter out if necesary # FIXME, parse index range and filter out if necessary
# rv.IndexRange # rv.IndexRange
# rv.DataEncoding # xml or binary, seems spec say we can ignore that one # rv.DataEncoding # xml or binary, seems spec say we can ignore that one
return dv, cont return dv, cont
...@@ -236,6 +250,5 @@ class HistoryManager(object): ...@@ -236,6 +250,5 @@ class HistoryManager(object):
results.append(results) results.append(results)
return results return results
def stop(self):
self.storage.stop()
import logging
from datetime import timedelta
from datetime import datetime
from opcua import ua
from opcua.common.utils import Buffer
from opcua.server.history import HistoryStorageInterface
import sqlite3
class HistorySQLite(HistoryStorageInterface):
"""
very minimal history backend storing data in SQLite database
"""
def __init__(self):
self.logger = logging.getLogger('historySQL')
self._datachanges_period = {}
self._events = {}
self._db_file = "history.db"
self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
def new_historized_node(self, node_id, period, count=0):
_c_new = self._conn.cursor()
table = self._get_table_name(node_id)
self._datachanges_period[node_id] = period
# create a table for the node which will store attributes of the DataValue object
# note: Value and VariantType TEXT is only for human reading, the actual data is stored in VariantBinary column
try:
_c_new.execute('CREATE TABLE "{tn}" (ServerTimestamp TIMESTAMP,'
' SourceTimestamp TIMESTAMP,'
' StatusCode INTEGER,'
' Value TEXT,'
' VariantType TEXT,'
' VariantBinary BLOB)'.format(tn=table))
except sqlite3.Error as e:
self.logger.info('Historizing SQL Table Creation Error for %s: %s', node_id, e)
self._conn.commit()
def save_node_value(self, node_id, datavalue):
_c_sub = self._conn.cursor()
table = self._get_table_name(node_id)
# insert the data change into the database
try:
_c_sub.execute('INSERT INTO "{tn}" VALUES (?, ?, ?, ?, ?, ?)'.format(tn=table), (datavalue.ServerTimestamp,
datavalue.SourceTimestamp,
datavalue.StatusCode.value,
str(datavalue.Value.Value),
datavalue.Value.VariantType.name,
datavalue.Value.to_binary()))
except sqlite3.Error as e:
self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
self._conn.commit()
# get this node's period from the period dict and calculate the limit
period = self._datachanges_period[node_id]
date_limit = datetime.now() - period
# after the insert, delete all values older than period
try:
_c_sub.execute('DELETE FROM "{tn}" WHERE ServerTimestamp < ?'.format(tn=table),
(date_limit.isoformat(' '),))
except sqlite3.Error as e:
self.logger.error('Historizing SQL Delete Old Data Error for %s: %s', node_id, e)
self._conn.commit()
def read_node_history(self, node_id, start, end, nb_values):
_c_read = self._conn.cursor()
if end is None:
end = datetime.now() + timedelta(days=1)
if start is None:
start = ua.DateTimeMinValue
table = self._get_table_name(node_id)
cont = None
results = []
start_time = start.isoformat(' ')
end_time = end.isoformat(' ')
# select values from the database; recreate UA Variant from binary
try:
for row in _c_read.execute('SELECT * FROM "{tn}" WHERE "ServerTimestamp" BETWEEN ? AND ? '
'LIMIT ?'.format(tn=table), (start_time, end_time, nb_values,)):
dv = ua.DataValue(ua.Variant.from_binary(Buffer(row[5])))
dv.ServerTimestamp = row[0]
dv.SourceTimestamp = row[1]
dv.StatusCode = ua.StatusCode(row[2])
results.append(dv)
except sqlite3.Error as e:
self.logger.error('Historizing SQL Read Error for %s: %s', node_id, e)
return results, cont
def new_historized_event(self, event, period):
raise NotImplementedError
def save_event(self, event):
raise NotImplementedError
def read_event_history(self, start, end, evfilter):
raise NotImplementedError
def _get_table_name(self, node_id):
return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
# close connections to the history database when the server stops
def stop(self):
self._conn.close()
...@@ -97,6 +97,7 @@ class InternalServer(object): ...@@ -97,6 +97,7 @@ class InternalServer(object):
def stop(self): def stop(self):
self.logger.info("stopping internal server") self.logger.info("stopping internal server")
self.loop.stop() self.loop.stop()
self.history_manager.stop()
def _set_current_time(self): def _set_current_time(self):
self.current_time_node.set_value(datetime.utcnow()) self.current_time_node.set_value(datetime.utcnow())
......
...@@ -338,3 +338,9 @@ class Server(object): ...@@ -338,3 +338,9 @@ class Server(object):
def delete_nodes(self, nodes, recursive=False): def delete_nodes(self, nodes, recursive=False):
return delete_nodes(self.iserver.isession, nodes, recursive) return delete_nodes(self.iserver.isession, nodes, recursive)
def historize_node(self, node):
self.iserver.enable_history(node)
def dehistorize_node(self, node):
self.iserver.disable_history(node)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment