Commit e19d7713 authored by olivier R-D's avatar olivier R-D

implement dict based event history, cleanup event stuff

parent 175840d0
......@@ -8,9 +8,14 @@ class EventResult(object):
def __init__(self):
self.server_handle = None
self.select_clauses = None
self.event_fields = None
self.data_types = {}
# save current attributes
self.internal_properties = list(self.__dict__.keys())[:] + ["internal_properties"]
def __str__(self):
return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items() if k not in self.internal_properties])
__repr__ = __str__
def get_event_props_as_fields_dict(self):
......@@ -19,19 +24,64 @@ class EventResult(object):
"""
field_vars = {}
for key, value in vars(self).items():
if not key.startswith("__") and key is not "server_handle":
field_vars[key] = ua.Variant(value)
if not key.startswith("__") and key not in self.internal_properties:
field_vars[key] = ua.Variant(value, self.data_types[key])
return field_vars
@staticmethod
def from_field_dict(fields):
"""
Create an Event object from a dict of name and variants
"""
result = EventResult()
for k, v in fields.items():
setattr(result, k, v.Value)
result.data_types[k] = v.VariantType
return result
def event_obj_from_event_fields(select_clauses, fields):
result = EventResult()
for idx, sattr in enumerate(select_clauses):
if len(sattr.BrowsePath) == 0:
setattr(result, sattr.AttributeId.name, fields[idx].Value)
else:
setattr(result, sattr.BrowsePath[0].Name, fields[idx].Value)
return result
def to_event_fields_using_subscription_fields(self, select_clauses):
"""
Using a new select_clauses and the original select_clauses
used during subscription, return a field list
"""
fields = []
for sattr in select_clauses:
for idx, o_sattr in enumerate(self.select_clauses):
if sattr.BrowsePath == o_sattr.BrowsePath and sattr.AttributeId == o_sattr.AttributeId:
fields.append(self.event_fields[idx])
break
return fields
def to_event_fields(self, select_clauses):
"""
return a field list using a select clause and the object properties
"""
fields = []
for sattr in select_clauses:
if len(sattr.BrowsePath) == 0:
name = sattr.AttributeId.name
else:
name = sattr.BrowsePath[0].Name
field = getattr(self, name)
fields.append(ua.Variant(field, self.data_types[name]))
return fields
@staticmethod
def from_event_fields(select_clauses, fields):
"""
Instanciate an Event object from a select_clauses and fields
"""
result = EventResult()
result.select_clauses = select_clauses
result.event_fields = fields
for idx, sattr in enumerate(select_clauses):
if len(sattr.BrowsePath) == 0:
name = sattr.AttributeId.name
else:
name = sattr.BrowsePath[0].Name
setattr(result, name, fields[idx].Value)
result.data_types[name] = fields[idx].VariantType
return result
def get_filter_from_event_type(eventtype):
......@@ -96,3 +146,5 @@ def get_event_properties_from_type_node(node):
curr_node = parents[0]
return properties
......@@ -411,7 +411,7 @@ class Node(object):
result = self.history_read_events(details)
event_res = []
for res in result.HistoryData.Events:
event_res.append(events.event_obj_from_event_fields(evfilter.SelectClauses, res.EventFields))
event_res.append(events.EventResult.from_event_fields(evfilter.SelectClauses, res.EventFields))
return event_res
def history_read_events(self, details):
......
......@@ -142,7 +142,7 @@ class Subscription(object):
for event in eventlist.Events:
with self._lock:
data = self._monitoreditems_map[event.ClientHandle]
result = events.event_obj_from_event_fields(data.mfilter.SelectClauses, event.EventFields)
result = events.EventResult.from_event_fields(data.mfilter.SelectClauses, event.EventFields)
result.server_handle = data.server_handle
if hasattr(self._handler, "event_notification"):
try:
......
......@@ -8,6 +8,10 @@ from opcua.common import utils
from opcua.common import events
class UaNodeAlreadyHistorizedError(ua.UaError):
pass
class HistoryStorageInterface(object):
"""
......@@ -40,10 +44,9 @@ class HistoryStorageInterface(object):
"""
raise NotImplementedError
def new_historized_event(self, source_id, etype, period):
def new_historized_event(self, source_id, etype, period, count=0):
"""
Called when historization of events is enabled on server side
FIXME: we may need to store events per nodes in future...
Returns None
"""
raise NotImplementedError
......@@ -80,8 +83,11 @@ class HistoryDict(HistoryStorageInterface):
self._datachanges = {}
self._datachanges_period = {}
self._events = {}
self._events_periods = {}
def new_historized_node(self, node_id, period, count=0):
if node_id in self._datachanges:
raise UaNodeAlreadyHistorizedError(node_id)
self._datachanges[node_id] = []
self._datachanges_period[node_id] = period, count
......@@ -120,14 +126,46 @@ class HistoryDict(HistoryStorageInterface):
results = results[:nb_values]
return results, cont
def new_historized_event(self, source_id, etype, period):
self._events = []
def new_historized_event(self, source_id, etype, period, count):
if source_id in self._events:
raise UaNodeAlreadyHistorizedError(source_id)
self._events[source_id] = []
self._events_periods[source_id] = period, count
def save_event(self, event):
raise NotImplementedError
evts = self._events[event.SourceNode]
evts.append(event)
period, count = self._events_periods[event.SourceNode]
now = datetime.utcnow()
if period:
while len(evts) and now - evts[0].ServerTimestamp > period:
evts.pop(0)
if count and len(evts) > count:
evts.pop(0)
def read_event_history(self, source_id, start, end, nb_values, evfilter):
raise NotImplementedError
cont = None
if source_id not in self._events:
print("Error attempt to read event history for a node which does not historize events")
return [], cont
else:
if start is None:
start = ua.DateTimeMinValue
if end is None:
end = ua.DateTimeMinValue
if start == ua.DateTimeMinValue:
results = [ev for ev in reversed(self._events[source_id]) if start <= ev.Time]
elif end == ua.DateTimeMinValue:
results = [ev for ev in self._events[source_id] if start <= ev.Time]
elif start > end:
results = [ev for ev in reversed(self._events[source_id]) if end <= ev.Time <= start]
else:
results = [ev for ev in self._events[source_id] if start <= ev.Time <= end]
if nb_values and len(results) > nb_values:
cont = results[nb_values + 1].Time
results = results[:nb_values]
return results, cont
def stop(self):
pass
......@@ -180,7 +218,7 @@ class HistoryManager(object):
handler = self._sub.subscribe_data_change(node)
self._handlers[node] = handler
def historize_event(self, source, period=timedelta(days=7)):
def historize_event(self, source, period=timedelta(days=7), count=0):
"""
subscribe to the source nodes' events and store the data in the active storage; custom event properties included
"""
......@@ -192,7 +230,7 @@ class HistoryManager(object):
# get the event types the source node generates and a list of all possible event fields
event_types, ev_fields = self._get_source_event_data(source)
self.storage.new_historized_event(source.nodeid, ev_fields, period)
self.storage.new_historized_event(source.nodeid, ev_fields, period, count)
handler = self._sub.subscribe_events(source) # FIXME supply list of event types when master is fixed
self._handlers[source] = handler
......@@ -255,7 +293,6 @@ class HistoryManager(object):
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
# starttime = bytes_to_datetime(rv.ContinuationPoint)
starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
dv, cont = self.storage.read_node_history(rv.NodeId,
......@@ -263,7 +300,6 @@ class HistoryManager(object):
details.EndTime,
details.NumValuesPerNode)
if cont:
# cont = datetime_to_bytes(dv[-1].ServerTimestamp)
cont = ua.pack_datetime(cont)
# rv.IndexRange
# rv.DataEncoding # xml or binary, seems spec say we can ignore that one
......@@ -276,18 +312,21 @@ class HistoryManager(object):
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
# starttime = bytes_to_datetime(rv.ContinuationPoint)
starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
ev, cont = self.storage.read_event_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode,
details.Filter)
evts, cont = self.storage.read_event_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode,
details.Filter)
results = []
for ev in evts:
field_list = ua.HistoryEventFieldList()
field_list.EventFields = ev.to_event_fields(details.Filter.SelectClauses)
results.append(field_list)
if cont:
# cont = datetime_to_bytes(dv[-1].ServerTimestamp)
cont = ua.pack_datetime(cont)
return ev, cont
return results, cont
def _get_source_event_data(self, source):
# get all event types which the source node can generate; get the fields of those event types
......
......@@ -2,10 +2,12 @@ import logging
from datetime import timedelta
from datetime import datetime
from threading import Lock
import sqlite3
from opcua import ua
from opcua.common.utils import Buffer
from opcua.common import events
from opcua.server.history import HistoryStorageInterface
import sqlite3
class HistorySQLite(HistoryStorageInterface):
......@@ -66,7 +68,7 @@ class HistorySQLite(HistoryStorageInterface):
datavalue.Value.VariantType.name,
sqlite3.Binary(datavalue.Value.to_binary())
)
)
)
except sqlite3.Error as e:
self.logger.error('Historizing SQL Insert Error for %s: %s', node_id, e)
......@@ -76,7 +78,7 @@ class HistorySQLite(HistoryStorageInterface):
period, count = self._datachanges_period[node_id]
def executeDeleteStatement(condition, args):
query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn = table)
query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)
try:
_c_sub.execute(query, args)
......@@ -150,7 +152,7 @@ class HistorySQLite(HistoryStorageInterface):
return results, cont
def new_historized_event(self, source_id, ev_fields, period):
def new_historized_event(self, source_id, ev_fields, period, count):
with self._lock:
_c_new = self._conn.cursor()
......@@ -164,10 +166,9 @@ class HistorySQLite(HistoryStorageInterface):
# note that _Timestamp is for SQL query, _EventTypeName is for debugging, be careful not to create event
# properties with these names
try:
_c_new.execute('CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, '
'_Timestamp TIMESTAMP, '
'_EventTypeName TEXT, '
'{co})'.format(tn=table, co=columns))
_c_new.execute(
'CREATE TABLE "{tn}" (_Id INTEGER PRIMARY KEY NOT NULL, _Timestamp TIMESTAMP, _EventTypeName TEXT, {co})'
.format(tn=table, co=columns))
except sqlite3.Error as e:
self.logger.info('Historizing SQL Table Creation Error for events from %s: %s', source_id, e)
......@@ -184,8 +185,9 @@ class HistorySQLite(HistoryStorageInterface):
# insert the event into the database
try:
_c_sub.execute('INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) '
'VALUES (NULL, "{ts}", "{et}", {pl})'.format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
_c_sub.execute(
'INSERT INTO "{tn}" ("_Id", "_Timestamp", "_EventTypeName", {co}) VALUES (NULL, "{ts}", "{et}", {pl})'
.format(tn=table, co=columns, ts=event.Time, et=event_type, pl=placeholders), evtup)
except sqlite3.Error as e:
self.logger.error('Historizing SQL Insert Error for events from %s: %s', event.SourceNode, e)
......@@ -235,7 +237,7 @@ class HistorySQLite(HistoryStorageInterface):
limit = -1 # in SQLite a LIMIT of -1 returns all results
table = self._get_table_name(source_id)
clauses = self._get_select_clauses(source_id, evfilter)
clauses, clauses_str = self._get_select_clauses(source_id, evfilter)
cont = None
cont_timestamps = []
......@@ -243,25 +245,19 @@ class HistorySQLite(HistoryStorageInterface):
# select events from the database; SQL select clause is built from EventFilter and available fields
try:
for row in _c_read.execute('SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? '
'ORDER BY "_Id" {dir} LIMIT ?'.format(cl=clauses, tn=table, dir=order),
(start_time, end_time, limit,)):
# place all the variants in the event field list object
hist_ev_field_list = ua.HistoryEventFieldList()
i = 0
for field in row:
# if the field is the _Timestamp column store it in a list used for getting the continuation
if i == 0:
cont_timestamps.append(field)
for row in _c_read.execute(
'SELECT "_Timestamp", {cl} FROM "{tn}" WHERE "_Timestamp" BETWEEN ? AND ? ORDER BY "_Id" {dir} LIMIT ?'
.format(cl=clauses_str, tn=table, dir=order), (start_time, end_time, limit)):
fdict = {}
cont_timestamps.append(row[0])
for i, field in enumerate(row[1:]):
if field is not None:
fdict[clauses[i]] = ua.Variant.from_binary(Buffer(field))
else:
if field is not None:
hist_ev_field_list.EventFields.append(ua.Variant.from_binary(Buffer(field)))
else:
hist_ev_field_list.EventFields.append(ua.Variant(None))
i += 1
fdict[clauses[i]] = ua.Variant(None)
results.append(hist_ev_field_list)
results.append(events.EventResult.from_field_dict(fdict))
except sqlite3.Error as e:
self.logger.error('Historizing SQL Read Error events for node %s: %s', source_id, e)
......@@ -279,24 +275,25 @@ class HistorySQLite(HistoryStorageInterface):
def _format_event(self, event_result):
placeholders = []
ev_fields = []
ev_variant_binaries = []
ev_variant_dict = event_result.get_event_props_as_fields_dict()
names = list(ev_variant_dict.keys())
names.sort() # sort alphabatically since dict is not sorted
# split dict into two synchronized lists which will be converted to SQL strings
# note that the variants are converted to binary objects for storing in SQL BLOB format
for field, variant in ev_variant_dict.items():
for name in names:
variant = ev_variant_dict[name]
placeholders.append('?')
ev_fields.append(field)
ev_variant_binaries.append(sqlite3.Binary(variant.to_binary()))
return self._list_to_sql_str(ev_fields), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
return self._list_to_sql_str(names), self._list_to_sql_str(placeholders, False), tuple(ev_variant_binaries)
def _get_event_columns(self, ev_fields):
fields = []
for field in ev_fields:
fields.append(field + ' BLOB')
fields.append(field + ' BLOB')
return self._list_to_sql_str(fields, False)
def _get_select_clauses(self, source_id, evfilter):
......@@ -313,15 +310,8 @@ class HistorySQLite(HistoryStorageInterface):
' Clause: %s:', source_id, select_clause)
# remove select clauses that the event type doesn't have; SQL will error because the column doesn't exist
clauses = [x for x in s_clauses if self._check(source_id, x)]
return self._list_to_sql_str(clauses)
def _check(self, source_id, s_clause):
if s_clause in self._event_fields[source_id]:
return True
else:
return False
clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
return clauses, self._list_to_sql_str(clauses)
def _list_to_sql_str(self, ls, quotes=True):
sql_str = ''
......
......@@ -46,20 +46,6 @@ class HistoryCommon(object):
cls.var.set_value(i)
time.sleep(1)
@classmethod
def create_srv_events(cls):
cls.ev_values = [i for i in range(20)]
cls.srvevgen = cls.srv.get_event_generator()
cls.srv_node = cls.srv.get_node(ua.ObjectIds.Server)
cls.srv.iserver.enable_history_event(cls.srv_node, period=None)
for i in cls.ev_values:
cls.srvevgen.event.Severity = cls.ev_values[i]
cls.srvevgen.trigger(message="test message")
time.sleep(.1)
time.sleep(2)
# no start and no end is not defined by spec, return reverse order
def test_history_var_read_one(self):
# Spec says that at least two parameters should be provided, so
......@@ -162,27 +148,22 @@ class HistoryCommon(object):
self.assertEqual(res[0].Value.Value, self.values[-1])
class TestHistory(unittest.TestCase, HistoryCommon):
@classmethod
def setUpClass(cls):
cls.start_server_and_client()
cls.create_var()
class TestHistoryEvents(object):
@classmethod
def tearDownClass(cls):
cls.stop_server_and_client()
def create_srv_events(cls):
cls.ev_values = [i for i in range(20)]
cls.srvevgen = cls.srv.get_event_generator()
cls.srv_node = cls.srv.get_node(ua.ObjectIds.Server)
cls.srv.iserver.enable_history_event(cls.srv_node, period=None)
class TestHistorySQL(unittest.TestCase, HistoryCommon):
@classmethod
def setUpClass(cls):
cls.start_server_and_client()
cls.srv.iserver.history_manager.set_storage(HistorySQLite(":memory:"))
cls.create_var()
cls.create_srv_events()
for i in cls.ev_values:
cls.srvevgen.event.Severity = cls.ev_values[i]
cls.srvevgen.trigger(message="test message")
time.sleep(.1)
time.sleep(2)
# ~~~~~~~~~~~~~~~~~~~~~~~ events ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# only has end time, should return reverse order
def test_history_ev_read_2_with_end(self):
......@@ -265,9 +246,6 @@ class TestHistorySQL(unittest.TestCase, HistoryCommon):
self.assertEqual(res[-1].Severity, self.ev_values[-1])
self.assertEqual(res[0].Severity, self.ev_values[0])
@classmethod
def tearDownClass(cls):
cls.stop_server_and_client()
class TestHistoryLimitsCommon(unittest.TestCase):
id = ua.NodeId(123)
......@@ -337,3 +315,29 @@ class TestHistoryLimits(TestHistoryLimitsCommon):
class TestHistorySQLLimits(TestHistoryLimitsCommon):
def createHistoryInstance(self):
return HistorySQLite(":memory:")
class TestHistory(unittest.TestCase, HistoryCommon, TestHistoryEvents):
@classmethod
def setUpClass(cls):
cls.start_server_and_client()
cls.create_var()
cls.create_srv_events()
@classmethod
def tearDownClass(cls):
cls.stop_server_and_client()
class TestHistorySQL(unittest.TestCase, HistoryCommon, TestHistoryEvents):
@classmethod
def setUpClass(cls):
cls.start_server_and_client()
cls.srv.iserver.history_manager.set_storage(HistorySQLite(":memory:"))
cls.create_var()
cls.create_srv_events()
@classmethod
def tearDownClass(cls):
cls.stop_server_and_client()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment