Commit 4f8f92dc authored by olivier R-D's avatar olivier R-D

event history api tweak

take event as arg not evfilter
return EventResult obj and not fields
update command line to read events
parent db7296de
......@@ -47,7 +47,7 @@ if __name__ == "__main__":
serverevgen.event.Severity = 111
# Configure server to use sqlite as history database (default is a simple in memory dict)
server.iserver.history_manager.set_storage(HistorySQLite(":memory:"))
server.iserver.history_manager.set_storage(HistorySQLite("my_event_history.sql"))
# starting!
server.start()
......
from opcua import ua
class EventResult(object):
"""
To be sent to clients for every events from server
"""
def __init__(self):
self.server_handle = None
def __str__(self):
return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
__repr__ = __str__
def get_event_props_as_fields_dict(self):
"""
convert all properties of the EventResult class to a dict of variants
"""
field_vars = {}
for key, value in vars(self).items():
if not key.startswith("__") and key is not "server_handle":
field_vars[key] = ua.Variant(value)
return field_vars
def event_obj_from_event_fields(select_clauses, fields):
result = EventResult()
for idx, sattr in enumerate(select_clauses):
if len(sattr.BrowsePath) == 0:
setattr(result, sattr.AttributeId.name, fields[idx].Value)
else:
setattr(result, sattr.BrowsePath[0].Name, fields[idx].Value)
return result
def get_filter_from_event_type(eventtype):
evfilter = ua.EventFilter()
evfilter.SelectClauses = select_clauses_from_evtype(eventtype)
evfilter.WhereClause = where_clause_from_evtype(eventtype)
return evfilter
def select_clauses_from_evtype(evtype):
clauses = []
for prop in get_event_properties_from_type_node(evtype):
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = evtype.nodeid
op.AttributeId = ua.AttributeIds.Value
op.BrowsePath = [prop.get_browse_name()]
clauses.append(op)
return clauses
def where_clause_from_evtype(evtype):
cf = ua.ContentFilter()
el = ua.ContentFilterElement()
# operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = evtype.nodeid
op.BrowsePath.append(ua.QualifiedName("EventType", 0))
op.AttributeId = ua.AttributeIds.Value
el.FilterOperands.append(op)
for subtypeid in [st.nodeid for st in get_node_subtypes(evtype)]:
op = ua.LiteralOperand()
op.Value = ua.Variant(subtypeid)
el.FilterOperands.append(op)
el.FilterOperator = ua.FilterOperator.InList
cf.Elements.append(el)
return cf
def get_node_subtypes(node, nodes=None):
if nodes is None:
nodes = [node]
for child in node.get_children(refs=ua.ObjectIds.HasSubtype):
nodes.append(child)
get_node_subtypes(child, nodes)
return nodes
def get_event_properties_from_type_node(node):
properties = []
curr_node = node
while True:
properties.extend(curr_node.get_properties())
if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
break
parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
if len(parents) != 1: # Something went wrong
return None
curr_node = parents[0]
return properties
......@@ -3,9 +3,8 @@ High level node object, to access node attribute
and browse address space
"""
from datetime import datetime
from opcua import ua
from opcua.common import events
class Node(object):
......@@ -21,7 +20,9 @@ class Node(object):
def __init__(self, server, nodeid):
self.server = server
self.nodeid = None
if isinstance(nodeid, ua.NodeId):
if isinstance(nodeid, Node):
self.nodeid = nodeid.nodeid
elif isinstance(nodeid, ua.NodeId):
self.nodeid = nodeid
elif type(nodeid) in (str, bytes):
self.nodeid = ua.NodeId.from_string(nodeid)
......@@ -381,9 +382,9 @@ class Node(object):
result = self.server.history_read(params)[0]
return result
def read_event_history(self, evfilter, starttime=None, endtime=None, numvalues=0):
def read_event_history(self, starttime=None, endtime=None, numvalues=0, evtype=ua.ObjectIds.BaseEventType):
"""
Read event history of a source node based on supplied UA EventFilter
Read event history of a source node
result code from server is checked and an exception is raised in case of error
If numvalues is > 0 and number of events in period is > numvalues
then result will be truncated
......@@ -404,10 +405,14 @@ class Node(object):
details.EndTime = ua.DateTimeMinValue
details.NumValuesPerNode = numvalues
evfilter = events.get_filter_from_event_type(Node(self.server, evtype))
details.Filter = evfilter
result = self.history_read_events(details)
return result.HistoryData.Events
event_res = []
for res in result.HistoryData.Events:
event_res.append(events.event_obj_from_event_fields(evfilter.SelectClauses, res.EventFields))
return event_res
def history_read_events(self, details):
"""
......
......@@ -6,6 +6,7 @@ import logging
from threading import Lock
from opcua import ua
from opcua.common import events
from opcua import Node
......@@ -40,29 +41,6 @@ class SubHandler(object):
pass
class EventResult(object):
"""
To be sent to clients for every events from server
"""
def __init__(self):
self.server_handle = None
def __str__(self):
return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
__repr__ = __str__
def get_event_props_as_fields_dict(self):
"""
convert all properties of the EventResult class to a dict of variants
"""
field_vars = {}
for key, value in vars(self).items():
if not key.startswith("__") and key is not "server_handle":
field_vars[key] = ua.Variant(value)
return field_vars
class SubscriptionItemData(object):
"""
To store useful data from a monitored item
......@@ -164,13 +142,8 @@ class Subscription(object):
for event in eventlist.Events:
with self._lock:
data = self._monitoreditems_map[event.ClientHandle]
result = EventResult()
result = events.event_obj_from_event_fields(data.mfilter.SelectClauses, event.EventFields)
result.server_handle = data.server_handle
for idx, sattr in enumerate(data.mfilter.SelectClauses):
if len(sattr.BrowsePath) == 0:
setattr(result, sattr.AttributeId.name, event.EventFields[idx].Value)
else:
setattr(result, sattr.BrowsePath[0].Name, event.EventFields[idx].Value)
if hasattr(self._handler, "event_notification"):
try:
self._handler.event_notification(result)
......@@ -199,58 +172,6 @@ class Subscription(object):
"""
return self._subscribe(nodes, attr, queuesize=0)
def _get_node(self, nodeid):
if isinstance(nodeid, ua.NodeId):
node = Node(self.server, nodeid)
elif isinstance(nodeid, Node):
node = nodeid
else:
node = Node(self.server, ua.NodeId(nodeid))
return node
def _get_filter_from_event_type(self, eventtype):
eventtype = self._get_node(eventtype)
evfilter = ua.EventFilter()
evfilter.SelectClauses = self._select_clauses_from_evtype(eventtype)
evfilter.WhereClause = self._where_clause_from_evtype(eventtype)
return evfilter
def _select_clauses_from_evtype(self, evtype):
clauses = []
for prop in get_event_properties_from_type_node(evtype):
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = evtype.nodeid
op.AttributeId = ua.AttributeIds.Value
op.BrowsePath = [prop.get_browse_name()]
clauses.append(op)
return clauses
def _where_clause_from_evtype(self, evtype):
cf = ua.ContentFilter()
el = ua.ContentFilterElement()
# operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = evtype.nodeid
op.BrowsePath.append(ua.QualifiedName("EventType", 0))
op.AttributeId = ua.AttributeIds.Value
el.FilterOperands.append(op)
for subtypeid in [st.nodeid for st in self._get_subtypes(evtype)]:
op = ua.LiteralOperand()
op.Value = ua.Variant(subtypeid)
el.FilterOperands.append(op)
el.FilterOperator = ua.FilterOperator.InList
cf.Elements.append(el)
return cf
def _get_subtypes(self, parent, nodes=None):
if nodes is None:
nodes = [parent]
for child in parent.get_children(refs=ua.ObjectIds.HasSubtype):
nodes.append(child)
self._get_subtypes(child, nodes)
return nodes
def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType, evfilter=None):
"""
Subscribe to events from a node. Default node is Server node.
......@@ -258,9 +179,9 @@ class Subscription(object):
if evfilter is provided, evtype is ignored
Return a handle which can be used to unsubscribe
"""
sourcenode = self._get_node(sourcenode)
sourcenode = Node(self.server, sourcenode)
if evfilter is None:
evfilter = self._get_filter_from_event_type(evtype)
evfilter = events.get_filter_from_event_type(Node(self.server, evtype))
return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
......@@ -352,19 +273,3 @@ class Subscription(object):
return
def get_event_properties_from_type_node(node):
properties = []
curr_node = node
while True:
properties.extend(curr_node.get_properties())
if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
break
parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
if len(parents) != 1: # Something went wrong
return None
curr_node = parents[0]
return properties
......@@ -199,6 +199,3 @@ class ThreadLoop(threading.Thread):
p = functools.partial(self._run_until_complete, future, coro)
self.loop.call_soon_threadsafe(p)
return future.result()
......@@ -5,7 +5,7 @@ from datetime import datetime
from opcua import Subscription
from opcua import ua
from opcua.common import utils
from opcua.common import subscription
from opcua.common import events
class HistoryStorageInterface(object):
......@@ -295,7 +295,7 @@ class HistoryManager(object):
ev_aggregate_fields = []
for event_type in event_types:
ev_aggregate_fields.extend((subscription.get_event_properties_from_type_node(event_type)))
ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))
ev_fields = []
for field in set(ev_aggregate_fields):
......
import logging
import sys
import argparse
from datetime import datetime
from datetime import datetime, timedelta
import math
import time
......@@ -600,8 +600,10 @@ def print_history(o):
print("{:30} {:10} {}".format(str(d.SourceTimestamp), d.StatusCode.name, d.Value))
def str_to_datetime(s):
def str_to_datetime(s, default=None):
if not s:
if default is not None:
return default
return datetime.utcnow()
# FIXME: try different datetime formats
for fmt in ["%Y-%m-%d", "%Y-%m-%d %H:%M", "%Y-%m-%d %H:%M:%S"]:
......@@ -615,11 +617,20 @@ def uahistoryread():
parser = argparse.ArgumentParser(description="Read history of a node")
add_common_args(parser)
parser.add_argument("--starttime",
default="",
help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
default=None,
help="Start time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time - one day")
parser.add_argument("--endtime",
default="",
default=None,
help="End time, formatted as YYYY-MM-DD [HH:MM[:SS]]. Default: current time")
parser.add_argument("-e",
"--events",
action="store_true",
help="Read event history instead of data change history")
parser.add_argument("-l",
"--limit",
type=int,
default=10,
help="Maximum number of notfication to return")
args = parse_args(parser, requirenodeid=True)
......@@ -628,10 +639,15 @@ def uahistoryread():
client.connect()
try:
node = get_node(client, args)
starttime = str_to_datetime(args.starttime)
endtime = str_to_datetime(args.endtime)
starttime = str_to_datetime(args.starttime, datetime.utcnow() - timedelta(days=1))
endtime = str_to_datetime(args.endtime, datetime.utcnow())
print("Reading raw history of node {} at {}; start at {}, end at {}\n".format(node, args.url, starttime, endtime))
print_history(node.read_raw_history(starttime, endtime))
if args.events:
evs = node.read_event_history(starttime, endtime, numvalues=args.limit)
for ev in evs:
print(ev)
else:
print_history(node.read_raw_history(starttime, endtime, numvalues=args.limit))
finally:
client.disconnect()
sys.exit(0)
......@@ -11,7 +11,7 @@ from opcua.server.history import HistoryDict
from tests_common import CommonTests, add_server_methods
from opcua.common.subscription import get_event_properties_from_type_node as get_props
from opcua.common.events import get_event_properties_from_type_node as get_props
port_num1 = 48530
port_num2 = 48530
......@@ -51,24 +51,6 @@ class HistoryCommon(object):
cls.ev_values = [i for i in range(20)]
cls.srvevgen = cls.srv.get_event_generator()
# for now we need to build our own filter explicitly because node class can't do it
cls.evfilter = ua.EventFilter()
ev_type_node = cls.srv.get_node(cls.srvevgen.event.EventType)
for ev_property in get_props(ev_type_node):
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = ev_type_node.nodeid
op.AttributeId = ua.AttributeIds.Value
op.BrowsePath = [ev_property.get_browse_name()]
cls.evfilter.SelectClauses.append(op)
cls.revevfilter = ua.EventFilter()
for ev_property in reversed(get_props(ev_type_node)):
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = ev_type_node.nodeid
op.AttributeId = ua.AttributeIds.Value
op.BrowsePath = [ev_property.get_browse_name()]
cls.revevfilter.SelectClauses.append(op)
cls.srv_node = cls.srv.get_node(ua.ObjectIds.Server)
cls.srv.iserver.enable_history_event(cls.srv_node, period=None)
......@@ -207,81 +189,81 @@ class TestHistorySQL(unittest.TestCase, HistoryCommon):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.srv_node.read_event_history(self.evfilter, None, now, 2)
res = self.srv_node.read_event_history(None, now, 2)
self.assertEqual(len(res), 2)
self.assertEqual(res[-1].EventFields[8].Value, self.ev_values[-2])
self.assertEqual(res[-1].Severity, self.ev_values[-2])
# both start and end time, return from start to end
def test_history_ev_read_all(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.srv_node.read_event_history(self.evfilter, old, now, 0)
res = self.srv_node.read_event_history(old, now, 0)
self.assertEqual(len(res), 20)
self.assertEqual(res[-1].EventFields[8].Value, self.ev_values[-1])
self.assertEqual(res[0].EventFields[8].Value, self.ev_values[0])
self.assertEqual(res[-1].Severity, self.ev_values[-1])
self.assertEqual(res[0].Severity, self.ev_values[0])
def test_history_ev_read_5_in_timeframe(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.srv_node.read_event_history(self.evfilter, old, now, 5)
res = self.srv_node.read_event_history(old, now, 5)
self.assertEqual(len(res), 5)
self.assertEqual(res[-1].EventFields[8].Value, self.ev_values[4])
self.assertEqual(res[0].EventFields[8].Value, self.ev_values[0])
self.assertEqual(res[-1].Severity, self.ev_values[4])
self.assertEqual(res[0].Severity, self.ev_values[0])
# start time greater than end time, should return reverse order
def test_history_ev_read_5_in_timeframe_start_greater_than_end(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.srv_node.read_event_history(self.evfilter, now, old, 5)
res = self.srv_node.read_event_history(now, old, 5)
self.assertEqual(len(res), 5)
self.assertEqual(res[-1].EventFields[8].Value, self.ev_values[-5])
self.assertEqual(res[0].EventFields[8].Value, self.ev_values[-1])
self.assertEqual(res[-1].Severity, self.ev_values[-5])
self.assertEqual(res[0].Severity, self.ev_values[-1])
# only start return original order
def test_history_ev_read_6_with_start(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.srv_node.read_event_history(self.evfilter, old, None, 6)
res = self.srv_node.read_event_history(old, None, 6)
self.assertEqual(len(res), 6)
self.assertEqual(res[-1].EventFields[8].Value, self.ev_values[5])
self.assertEqual(res[0].EventFields[8].Value, self.ev_values[0])
self.assertEqual(res[-1].Severity, self.ev_values[5])
self.assertEqual(res[0].Severity, self.ev_values[0])
# only start return original order
def test_history_ev_read_all_with_start(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.srv_node.read_event_history(self.evfilter, old, None, 0)
res = self.srv_node.read_event_history(old, None, 0)
self.assertEqual(len(res), 20)
self.assertEqual(res[-1].EventFields[8].Value, self.ev_values[-1])
self.assertEqual(res[0].EventFields[8].Value, self.ev_values[0])
self.assertEqual(res[-1].Severity, self.ev_values[-1])
self.assertEqual(res[0].Severity, self.ev_values[0])
# only end return reversed order
def test_history_ev_read_all_with_end(self):
end = datetime.utcnow() + timedelta(days=6)
res = self.srv_node.read_event_history(self.evfilter, None, end, 0)
res = self.srv_node.read_event_history(None, end, 0)
self.assertEqual(len(res), 20)
self.assertEqual(res[-1].EventFields[8].Value, self.ev_values[0])
self.assertEqual(res[0].EventFields[8].Value, self.ev_values[-1])
self.assertEqual(res[-1].Severity, self.ev_values[0])
self.assertEqual(res[0].Severity, self.ev_values[-1])
# only end return reversed order
def test_history_ev_read_3_with_end(self):
end = datetime.utcnow() + timedelta(days=6)
res = self.srv_node.read_event_history(self.evfilter, None, end, 3)
res = self.srv_node.read_event_history(None, end, 3)
self.assertEqual(len(res), 3)
self.assertEqual(res[2].EventFields[8].Value, self.ev_values[-3])
self.assertEqual(res[0].EventFields[8].Value, self.ev_values[-1])
self.assertEqual(res[2].Severity, self.ev_values[-3])
self.assertEqual(res[0].Severity, self.ev_values[-1])
# reverse event filter select clauses and test that results match the filter order
def test_history_ev_read_all_filter_order_reversed(self):
now = datetime.utcnow()
old = now - timedelta(days=6)
res = self.srv_node.read_event_history(self.revevfilter, old, None, 0)
res = self.srv_node.read_event_history(old, None, 0)
self.assertEqual(len(res), 20)
self.assertEqual(res[-1].EventFields[0].Value, self.ev_values[-1])
self.assertEqual(res[0].EventFields[0].Value, self.ev_values[0])
self.assertEqual(res[-1].Severity, self.ev_values[-1])
self.assertEqual(res[0].Severity, self.ev_values[0])
@classmethod
def tearDownClass(cls):
......
......@@ -325,14 +325,14 @@ class SubscriptionTests(object):
def test_get_event_from_type_node_BaseEvent(self):
etype = self.opc.get_node(ua.ObjectIds.BaseEventType)
properties = opcua.common.subscription.get_event_properties_from_type_node(etype)
properties = opcua.common.events.get_event_properties_from_type_node(etype)
for child in etype.get_properties():
self.assertTrue(child in properties)
def test_get_event_from_type_node_CustomEvent(self):
etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.AuditEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
properties = opcua.common.subscription.get_event_properties_from_type_node(etype)
properties = opcua.common.events.get_event_properties_from_type_node(etype)
for child in self.opc.get_node(ua.ObjectIds.BaseEventType).get_properties():
self.assertTrue(child in properties)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment