Commit 3a3edd05 authored by zerox1212's avatar zerox1212

Events History with Tests

This commit is very functional, but WhereClause is not supported yet.
Default history will fail tests because it raises NotImplemented on
event calls.
parent d538b57a
......@@ -24,7 +24,27 @@ if __name__ == "__main__":
# populating our address space
myobj = objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", ua.Variant(0, ua.VariantType.Double))
myvar.set_writable() # Set MyVariable to be writable by clients
myvar.set_writable() # Set MyVariable to be writable by clients
# Creating a custom event: Approach 1
# The custom event object automatically will have members from its parent (BaseEventType)
etype = server.create_custom_event_type(2, 'MyFirstEvent', ua.ObjectIds.BaseEventType,
[('MyNumericProperty', ua.VariantType.Float),
('MyStringProperty', ua.VariantType.String)])
# create second event
etype2 = server.create_custom_event_type(2, 'MySecondEvent', ua.ObjectIds.BaseEventType,
[('MyOtherProperty', ua.VariantType.Float)])
myevgen = server.get_event_generator(etype, myobj)
myevgen.event.Severity = 500
myevgen.event.MyStringProperty = ua.Variant("hello world")
myevgen2 = server.get_event_generator(etype2, myobj)
myevgen2.event.Severity = 123
myevgen2.event.MyOtherProperty = ua.Variant(1.337)
serverevgen = server.get_event_generator()
serverevgen.event.Severity = 111
# Configure server to use sqlite as history database (default is a simple in memory dict)
server.iserver.history_manager.set_storage(HistorySQLite(":memory:"))
......@@ -33,13 +53,25 @@ if __name__ == "__main__":
server.start()
# enable history for this particular node, must be called after start since it uses subscription
server.iserver.enable_history(myvar, period=None, count=100)
server.iserver.enable_history_var(myvar, period=None, count=100)
# enable history for myobj events
server.iserver.enable_history_event(myobj, period=None)
# enable history for server events
server_node = server.get_node(ua.ObjectIds.Server)
server.iserver.enable_history_event(server_node, period=None)
try:
count = 0
while True:
time.sleep(1)
count += 0.1
myvar.set_value(math.sin(count))
myevgen.trigger(message="This is MyFirstEvent with MyNumericProperty and MyStringProperty.")
myevgen2.trigger(message="This is MySecondEvent with MyOtherProperty.")
serverevgen.trigger(message="Server Event Message")
finally:
#close connection, remove subcsriptions, etc
# close connection, remove subscriptions, etc
server.stop()
......@@ -377,6 +377,51 @@ class Node(object):
result = self.server.history_read(params)[0]
return result
def read_event_history(self, evfilter, starttime=None, endtime=None, numvalues=0):
"""
Read raw history of a node
result code from server is checked and an exception is raised in case of error
If numvalues is > 0 and number of events in period is > numvalues
then result will be truncated
"""
# FIXME event filter must be supplied externally, the problem is the node class doesn't have a way to get
# FIXME another node from the address space as these methods are at the server level, there fore so there is
# FIXME no way to build an event filter here (although it would be nicer for the user)
details = ua.ReadEventDetails()
if starttime:
details.StartTime = starttime
else:
details.StartTime = ua.DateTimeMinValue
if endtime:
details.EndTime = endtime
else:
details.EndTime = ua.DateTimeMinValue
details.NumValuesPerNode = numvalues
details.Filter = evfilter
result = self.history_read_events(details)
return result.HistoryData.Events
def history_read_events(self, details):
"""
Read raw event history of a node, low-level function
result code from server is checked and an exception is raised in case of error
"""
valueid = ua.HistoryReadValueId()
valueid.NodeId = self.nodeid
valueid.IndexRange = ''
params = ua.HistoryReadParameters()
params.HistoryReadDetails = details
params.TimestampsToReturn = ua.TimestampsToReturn.Both
params.ReleaseContinuationPoints = False
params.NodesToRead.append(valueid)
result = self.server.history_read(params)[0]
return result
# Hack for convenience methods
# local import is ugly but necessary for python2 support
# feel fri to propose something better but I want to split all those
......
......@@ -23,19 +23,19 @@ class SubHandler(object):
def datachange_notification(self, node, val, data):
"""
called for every datachange notfication from server
called for every datachange notification from server
"""
pass
def event_notification(self, event):
"""
called for every event notfication from server
called for every event notification from server
"""
pass
def status_change_notification(self, status):
"""
called for every status change notfication from server
called for every status change notification from server
"""
pass
......@@ -52,10 +52,21 @@ class EventResult(object):
return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
__repr__ = __str__
def get_field_variants(self):
field_vars = {}
for key, value in vars(self).items():
if not key.startswith("__") and key is not "server_handle":
if key is "SourceName":
# hack because client is expecting string not byte string?
field_vars[key] = ua.Variant(value.decode(encoding='utf-8'))
else:
field_vars[key] = ua.Variant(value)
return field_vars
class SubscriptionItemData(object):
"""
To store usefull data from a monitored item
To store useful data from a monitored item
"""
def __init__(self):
self.node = None
......@@ -266,7 +277,7 @@ class Subscription(object):
params.ItemsToCreate = monitored_items
params.TimestampsToReturn = ua.TimestampsToReturn.Neither
# insert monitoried item into map to avoid notification arrive before result return
# insert monitored item into map to avoid notification arrive before result return
# server_handle is left as None in purpose as we don't get it yet.
with self._lock:
for mi in monitored_items:
......@@ -319,7 +330,7 @@ def get_event_properties_from_type_node(node):
break
parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
if len(parents) != 1: # Something went wrong
if len(parents) != 1: # Something went wrong
return None
curr_node = parents[0]
......
......@@ -71,7 +71,10 @@ class EventGenerator(object):
#result.StatusCode.check()
def __str__(self):
return "EventGenerator(Type:{}, Source:{}, Time:{}, Message: {})".format(self.EventType, self.SourceNode, self.Time, self.Message)
return "EventGenerator(Type:{}, Source:{}, Time:{}, Message: {})".format(self.event.EventType,
self.event.SourceNode,
self.event.Time,
self.event.Message)
__repr__ = __str__
def trigger(self, time=None, message=None):
......@@ -83,7 +86,7 @@ class EventGenerator(object):
self.event.Time = time
else:
self.event.Time = datetime.utcnow()
self.event.RecieveTime = datetime.utcnow()
self.event.ReceiveTime = datetime.utcnow()
#FIXME: LocalTime is wrong but currently know better. For description s. Part 5 page 18
self.event.LocalTime = datetime.utcnow()
if message:
......
......@@ -4,6 +4,7 @@ from datetime import datetime
from opcua import Subscription
from opcua import ua
from opcua.common import utils
from opcua.common import subscription
class HistoryStorageInterface(object):
......@@ -38,7 +39,7 @@ class HistoryStorageInterface(object):
"""
raise NotImplementedError
def new_historized_event(self, event, period):
def new_historized_event(self, source_id, etype, period):
"""
Called when historization of events is enabled on server side
FIXME: we may need to store events per nodes in future...
......@@ -53,7 +54,7 @@ class HistoryStorageInterface(object):
"""
raise NotImplementedError
def read_event_history(self, start, end, evfilter):
def read_event_history(self, source_id, start, end, nb_values, evfilter):
"""
Called when a client make a history read request for events
Start time and end time are inclusive
......@@ -118,13 +119,13 @@ class HistoryDict(HistoryStorageInterface):
results = results[:nb_values]
return results, cont
def new_historized_event(self, event, period):
def new_historized_event(self, source_id, etype, period):
self._events = []
def save_event(self, event):
raise NotImplementedError
def read_event_history(self, start, end, evfilter):
def read_event_history(self, source_id, start, end, nb_values, evfilter):
raise NotImplementedError
def stop(self):
......@@ -162,7 +163,7 @@ class HistoryManager(object):
params.Priority = 0
return Subscription(self.iserver.isession, params, handler)
def historize(self, node, period=timedelta(days=7), count=0):
def historize_var(self, node, period=timedelta(days=7), count=0):
if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage))
if node in self._handlers:
......@@ -171,6 +172,20 @@ class HistoryManager(object):
handler = self._sub.subscribe_data_change(node)
self._handlers[node] = handler
def historize_event(self, source, period=timedelta(days=7)):
if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage))
if source in self._handlers:
raise ua.UaError("Events from {} are already historized".format(source))
# get the event types the source node generates and a list of all possible event fields
event_types, ev_fields = self._get_source_event_data(source)
self.storage.new_historized_event(source.nodeid, ev_fields, period)
handler = self._sub.subscribe_events(source) # FIXME supply list of event types when master is fixed
self._handlers[source] = handler
def dehistorize(self, node):
self._sub.unsubscribe(self._handlers[node])
del(self._handlers[node])
......@@ -207,9 +222,10 @@ class HistoryManager(object):
result.HistoryData = ua.HistoryEvent()
# FIXME: filter is a cumbersome type, maybe transform it something easier
# to handle for storage
result.HistoryData.Events = self.storage.read_event_history(details.StartTime,
details.EndTime,
details.Filter)
ev, cont = self._read_event_history(rv, details)
result.HistoryData.Events = ev
result.ContinuationPoint = cont
else:
# we do not currently support the other types, clients can process data themselves
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
......@@ -222,7 +238,7 @@ class HistoryManager(object):
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
#starttime = bytes_to_datetime(rv.ContinuationPoint)
# starttime = bytes_to_datetime(rv.ContinuationPoint)
starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
dv, cont = self.storage.read_node_history(rv.NodeId,
......@@ -231,12 +247,44 @@ class HistoryManager(object):
details.NumValuesPerNode)
if cont:
# cont = datetime_to_bytes(dv[-1].ServerTimestamp)
cont = ua.pack_datetime(dv[-1].ServerTimestamp)
# FIXME, parse index range and filter out if necessary
cont = ua.pack_datetime(cont)
# rv.IndexRange
# rv.DataEncoding # xml or binary, seems spec say we can ignore that one
return dv, cont
def _read_event_history(self, rv, details):
starttime = details.StartTime
if rv.ContinuationPoint:
# Spec says we should ignore details if cont point is present
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
# starttime = bytes_to_datetime(rv.ContinuationPoint)
starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
ev, cont = self.storage.read_event_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode,
details.Filter)
if cont:
# cont = datetime_to_bytes(dv[-1].ServerTimestamp)
cont = ua.pack_datetime(cont)
return ev, cont
def _get_source_event_data(self, source):
# get all event types which the source node can generate; get the fields of those event types
event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)
ev_aggregate_fields = []
for event_type in event_types:
ev_aggregate_fields.extend((subscription.get_event_properties_from_type_node(event_type)))
ev_fields = []
for field in set(ev_aggregate_fields):
ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
return event_types, ev_fields
def update_history(self, params):
"""
Update history for a node
......
This diff is collapsed.
......@@ -153,16 +153,16 @@ class InternalServer(object):
def create_session(self, name, user=User.Anonymous, external=False):
return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
def enable_history(self, node, period=timedelta(days=7), count=0):
def enable_history_var(self, node, period=timedelta(days=7), count=0):
"""
Set attribute Historizing of node to True and start storing data for history
"""
node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
self.history_manager.historize(node, period, count)
self.history_manager.historize_var(node, period, count)
def disable_history(self, node):
def disable_history_var(self, node):
"""
Set attribute Historizing of node to False and stop storing data for history
"""
......@@ -171,6 +171,24 @@ class InternalServer(object):
node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
self.history_manager.dehistorize(node)
def enable_history_event(self, source, period=timedelta(days=7)):
"""
Set attribute History Read of object events to True and start storing data for history
"""
# to historize events of an object, first check if object supports events
source_event_notifier = source.get_attribute(ua.AttributeIds.EventNotifier)
if source_event_notifier.Value.Value & 1 == 1: # check bit 0
# if it supports events, turn on bit 2 (enables history read)
source.set_attr_bit(ua.AttributeIds.EventNotifier, 2)
# send the object to history manager
self.history_manager.historize_event(source, period)
def disable_history_event(self, source):
"""
Set attribute History Read of node to False and stop storing data for history
"""
source.unset_attr_bit(ua.AttributeIds.EventNotifier, 2)
self.history_manager.dehistorize(source)
class InternalSession(object):
......
......@@ -91,7 +91,11 @@ class MonitoredItemService(object):
if params.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
if self.aspace.get_attribute_value(params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value != 1:
ev_notify_byte = self.aspace.get_attribute_value(params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
if ev_notify_byte is not None:
if ev_notify_byte & 1 == 0:
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
else:
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
result.FilterResult = ua.EventFilterResult()
for _ in params.RequestedParameters.Filter.SelectClauses:
......
'''
"""
Example auto_generated file with UA Types
For now only events!
'''
"""
from opcua.ua import *
# TODO: This should be autogeneratd form XML description of EventTypes
# TODO: This should be auto generated form XML description of EventTypes
class BaseEvent(FrozenClass):
'''
"""
BaseEvent implements BaseEventType from which inherit all other events and it is used per default.
'''
"""
def __init__(self, sourcenode=None, message=None, severity=1, extended=False):
self.EventId = bytes()
self.EventType = NodeId(ObjectIds.BaseEventType)
self.SourceNode = sourcenode
self.SourceName = None
self.Time = None
self.RecieveTime = None
self.ReceiveTime = None
self.LocalTime = None
self.Message = LocalizedText(message)
self.Severity = Variant(severity, VariantType.UInt16)
......@@ -31,9 +31,9 @@ class BaseEvent(FrozenClass):
class AuditEvent(BaseEvent):
'''
"""
Audit implements AuditEventType from which inherit all other Audit events.
'''
"""
def __init__(self, sourcenode=None, message=None, severity=1, extended=False):
super(AuditEvent, self).__init__(sourcenode, message, severity, True)
self.EventType = NodeId(ObjectIds.AuditEventType)
......
This diff is collapsed.
......@@ -129,14 +129,21 @@ class TestServer(unittest.TestCase, CommonTests):
val = v.get_value()
self.assertEqual(val, "StringValue")
def test_historize(self):
def test_historize_variable(self):
o = self.opc.get_objects_node()
var = o.add_variable(3, "test_hist", 1.0)
self.srv.iserver.enable_history(var, timedelta(days=1))
self.srv.iserver.enable_history_var(var, timedelta(days=1))
time.sleep(1)
var.set_value(2.0)
var.set_value(3.0)
self.srv.iserver.disable_history(var)
self.srv.iserver.disable_history_var(var)
def test_historize_events(self):
srv_node = self.srv.get_node(ua.ObjectIds.Server)
srvevgen = self.srv.get_event_generator()
self.srv.iserver.enable_history_event(srv_node, period=None)
srvevgen.trigger(message="Message")
self.srv.iserver.disable_history_event(srv_node)
def test_references_for_added_nodes_method(self):
objects = self.opc.get_objects_node()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment