Commit 98e4179e authored by ORD's avatar ORD

Merge pull request #180 from zerox1212/event_history

Events History with Tests
parents d538b57a e961b9a1
......@@ -24,7 +24,27 @@ if __name__ == "__main__":
# populating our address space
myobj = objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", ua.Variant(0, ua.VariantType.Double))
myvar.set_writable() # Set MyVariable to be writable by clients
myvar.set_writable() # Set MyVariable to be writable by clients
# Creating a custom event: Approach 1
# The custom event object automatically will have members from its parent (BaseEventType)
etype = server.create_custom_event_type(2, 'MyFirstEvent', ua.ObjectIds.BaseEventType,
[('MyNumericProperty', ua.VariantType.Float),
('MyStringProperty', ua.VariantType.String)])
# create second event
etype2 = server.create_custom_event_type(2, 'MySecondEvent', ua.ObjectIds.BaseEventType,
[('MyOtherProperty', ua.VariantType.Float)])
myevgen = server.get_event_generator(etype, myobj)
myevgen.event.Severity = 500
myevgen.event.MyStringProperty = ua.Variant("hello world")
myevgen2 = server.get_event_generator(etype2, myobj)
myevgen2.event.Severity = 123
myevgen2.event.MyOtherProperty = ua.Variant(1.337)
serverevgen = server.get_event_generator()
serverevgen.event.Severity = 111
# Configure server to use sqlite as history database (default is a simple in memory dict)
server.iserver.history_manager.set_storage(HistorySQLite(":memory:"))
......@@ -33,13 +53,25 @@ if __name__ == "__main__":
server.start()
# enable history for this particular node, must be called after start since it uses subscription
server.iserver.enable_history(myvar, period=None, count=100)
server.iserver.enable_history_data_change(myvar, period=None, count=100)
# enable history for myobj events
server.iserver.enable_history_event(myobj, period=None)
# enable history for server events
server_node = server.get_node(ua.ObjectIds.Server)
server.iserver.enable_history_event(server_node, period=None)
try:
count = 0
while True:
time.sleep(1)
count += 0.1
myvar.set_value(math.sin(count))
myevgen.trigger(message="This is MyFirstEvent with MyNumericProperty and MyStringProperty.")
myevgen2.trigger(message="This is MySecondEvent with MyOtherProperty.")
serverevgen.trigger(message="Server Event Message")
finally:
#close connection, remove subcsriptions, etc
# close connection, remove subscriptions, etc
server.stop()
......@@ -377,6 +377,51 @@ class Node(object):
result = self.server.history_read(params)[0]
return result
def read_event_history(self, evfilter, starttime=None, endtime=None, numvalues=0):
"""
Read event history of a source node based on supplied UA EventFilter
result code from server is checked and an exception is raised in case of error
If numvalues is > 0 and number of events in period is > numvalues
then result will be truncated
"""
# FIXME event filter must be supplied externally, the problem is the node class doesn't have a way to get
# FIXME another node from the address space as these methods are at the server level, therefore there is
# FIXME no way to build an event filter here (although it could be nicer for a user who doesn't want a filter)
details = ua.ReadEventDetails()
if starttime:
details.StartTime = starttime
else:
details.StartTime = ua.DateTimeMinValue
if endtime:
details.EndTime = endtime
else:
details.EndTime = ua.DateTimeMinValue
details.NumValuesPerNode = numvalues
details.Filter = evfilter
result = self.history_read_events(details)
return result.HistoryData.Events
def history_read_events(self, details):
"""
Read event history of a node, low-level function
result code from server is checked and an exception is raised in case of error
"""
valueid = ua.HistoryReadValueId()
valueid.NodeId = self.nodeid
valueid.IndexRange = ''
params = ua.HistoryReadParameters()
params.HistoryReadDetails = details
params.TimestampsToReturn = ua.TimestampsToReturn.Both
params.ReleaseContinuationPoints = False
params.NodesToRead.append(valueid)
result = self.server.history_read(params)[0]
return result
# Hack for convenience methods
# local import is ugly but necessary for python2 support
# feel fri to propose something better but I want to split all those
......
......@@ -23,19 +23,19 @@ class SubHandler(object):
def datachange_notification(self, node, val, data):
"""
called for every datachange notfication from server
called for every datachange notification from server
"""
pass
def event_notification(self, event):
"""
called for every event notfication from server
called for every event notification from server
"""
pass
def status_change_notification(self, status):
"""
called for every status change notfication from server
called for every status change notification from server
"""
pass
......@@ -52,10 +52,20 @@ class EventResult(object):
return "EventResult({})".format([str(k) + ":" + str(v) for k, v in self.__dict__.items()])
__repr__ = __str__
def get_event_props_as_fields_dict(self):
"""
convert all properties of the EventResult class to a dict of variants
"""
field_vars = {}
for key, value in vars(self).items():
if not key.startswith("__") and key is not "server_handle":
field_vars[key] = ua.Variant(value)
return field_vars
class SubscriptionItemData(object):
"""
To store usefull data from a monitored item
To store useful data from a monitored item
"""
def __init__(self):
self.node = None
......@@ -74,7 +84,7 @@ class DataChangeNotif(object):
self.subscription_data = subscription_data
def __str__(self):
return "DataChangeNotfication({}, {})".format(self.subscription_data, self.monitored_item)
return "DataChangeNotification({}, {})".format(self.subscription_data, self.monitored_item)
__repr__ = __str__
......@@ -266,7 +276,7 @@ class Subscription(object):
params.ItemsToCreate = monitored_items
params.TimestampsToReturn = ua.TimestampsToReturn.Neither
# insert monitoried item into map to avoid notification arrive before result return
# insert monitored item into map to avoid notification arrive before result return
# server_handle is left as None in purpose as we don't get it yet.
with self._lock:
for mi in monitored_items:
......@@ -319,7 +329,7 @@ def get_event_properties_from_type_node(node):
break
parents = curr_node.get_referenced_nodes(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse, includesubtypes=False)
if len(parents) != 1: # Something went wrong
if len(parents) != 1: # Something went wrong
return None
curr_node = parents[0]
......
......@@ -71,7 +71,10 @@ class EventGenerator(object):
#result.StatusCode.check()
def __str__(self):
return "EventGenerator(Type:{}, Source:{}, Time:{}, Message: {})".format(self.EventType, self.SourceNode, self.Time, self.Message)
return "EventGenerator(Type:{}, Source:{}, Time:{}, Message: {})".format(self.event.EventType,
self.event.SourceNode,
self.event.Time,
self.event.Message)
__repr__ = __str__
def trigger(self, time=None, message=None):
......@@ -83,7 +86,7 @@ class EventGenerator(object):
self.event.Time = time
else:
self.event.Time = datetime.utcnow()
self.event.RecieveTime = datetime.utcnow()
self.event.ReceiveTime = datetime.utcnow()
#FIXME: LocalTime is wrong but currently know better. For description s. Part 5 page 18
self.event.LocalTime = datetime.utcnow()
if message:
......
import logging
from datetime import timedelta
from datetime import datetime
from opcua import Subscription
from opcua import ua
from opcua.common import utils
from opcua.common import subscription
class HistoryStorageInterface(object):
......@@ -38,7 +40,7 @@ class HistoryStorageInterface(object):
"""
raise NotImplementedError
def new_historized_event(self, event, period):
def new_historized_event(self, source_id, etype, period):
"""
Called when historization of events is enabled on server side
FIXME: we may need to store events per nodes in future...
......@@ -53,7 +55,7 @@ class HistoryStorageInterface(object):
"""
raise NotImplementedError
def read_event_history(self, start, end, evfilter):
def read_event_history(self, source_id, start, end, nb_values, evfilter):
"""
Called when a client make a history read request for events
Start time and end time are inclusive
......@@ -118,13 +120,13 @@ class HistoryDict(HistoryStorageInterface):
results = results[:nb_values]
return results, cont
def new_historized_event(self, event, period):
def new_historized_event(self, source_id, etype, period):
self._events = []
def save_event(self, event):
raise NotImplementedError
def read_event_history(self, start, end, evfilter):
def read_event_history(self, source_id, start, end, nb_values, evfilter):
raise NotImplementedError
def stop(self):
......@@ -144,12 +146,16 @@ class SubHandler(object):
class HistoryManager(object):
def __init__(self, iserver):
self.logger = logging.getLogger(__name__)
self.iserver = iserver
self.storage = HistoryDict()
self._sub = None
self._handlers = {}
def set_storage(self, storage):
"""
set the desired HistoryStorageInterface which History Manager will use for historizing
"""
self.storage = storage
def _create_subscription(self, handler):
......@@ -162,7 +168,10 @@ class HistoryManager(object):
params.Priority = 0
return Subscription(self.iserver.isession, params, handler)
def historize(self, node, period=timedelta(days=7), count=0):
def historize_data_change(self, node, period=timedelta(days=7), count=0):
"""
subscribe to the nodes' data changes and store the data in the active storage
"""
if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage))
if node in self._handlers:
......@@ -171,9 +180,32 @@ class HistoryManager(object):
handler = self._sub.subscribe_data_change(node)
self._handlers[node] = handler
def historize_event(self, source, period=timedelta(days=7)):
"""
subscribe to the source nodes' events and store the data in the active storage; custom event properties included
"""
if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage))
if source in self._handlers:
raise ua.UaError("Events from {} are already historized".format(source))
# get the event types the source node generates and a list of all possible event fields
event_types, ev_fields = self._get_source_event_data(source)
self.storage.new_historized_event(source.nodeid, ev_fields, period)
handler = self._sub.subscribe_events(source) # FIXME supply list of event types when master is fixed
self._handlers[source] = handler
def dehistorize(self, node):
self._sub.unsubscribe(self._handlers[node])
del(self._handlers[node])
"""
remove subscription to the node/source which is being historized
"""
if node in self._handlers:
self._sub.unsubscribe(self._handlers[node])
del(self._handlers[node])
else:
self.logger.error("History Manager isn't subscribed to %s", node)
def read_history(self, params):
"""
......@@ -190,7 +222,7 @@ class HistoryManager(object):
def _read_history(self, details, rv):
"""
read history for a node
determine if the history read is for a data changes or events; then read the history for that node
"""
result = ua.HistoryReadResult()
if isinstance(details, ua.ReadRawModifiedDetails):
......@@ -207,9 +239,10 @@ class HistoryManager(object):
result.HistoryData = ua.HistoryEvent()
# FIXME: filter is a cumbersome type, maybe transform it something easier
# to handle for storage
result.HistoryData.Events = self.storage.read_event_history(details.StartTime,
details.EndTime,
details.Filter)
ev, cont = self._read_event_history(rv, details)
result.HistoryData.Events = ev
result.ContinuationPoint = cont
else:
# we do not currently support the other types, clients can process data themselves
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
......@@ -222,7 +255,7 @@ class HistoryManager(object):
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
#starttime = bytes_to_datetime(rv.ContinuationPoint)
# starttime = bytes_to_datetime(rv.ContinuationPoint)
starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
dv, cont = self.storage.read_node_history(rv.NodeId,
......@@ -231,12 +264,44 @@ class HistoryManager(object):
details.NumValuesPerNode)
if cont:
# cont = datetime_to_bytes(dv[-1].ServerTimestamp)
cont = ua.pack_datetime(dv[-1].ServerTimestamp)
# FIXME, parse index range and filter out if necessary
cont = ua.pack_datetime(cont)
# rv.IndexRange
# rv.DataEncoding # xml or binary, seems spec say we can ignore that one
return dv, cont
def _read_event_history(self, rv, details):
starttime = details.StartTime
if rv.ContinuationPoint:
# Spec says we should ignore details if cont point is present
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
# starttime = bytes_to_datetime(rv.ContinuationPoint)
starttime = ua.unpack_datetime(utils.Buffer(rv.ContinuationPoint))
ev, cont = self.storage.read_event_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode,
details.Filter)
if cont:
# cont = datetime_to_bytes(dv[-1].ServerTimestamp)
cont = ua.pack_datetime(cont)
return ev, cont
def _get_source_event_data(self, source):
# get all event types which the source node can generate; get the fields of those event types
event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)
ev_aggregate_fields = []
for event_type in event_types:
ev_aggregate_fields.extend((subscription.get_event_properties_from_type_node(event_type)))
ev_fields = []
for field in set(ev_aggregate_fields):
ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
return event_types, ev_fields
def update_history(self, params):
"""
Update history for a node
......@@ -252,4 +317,7 @@ class HistoryManager(object):
return results
def stop(self):
"""
call stop methods of active storage interface whenever the server is stopped
"""
self.storage.stop()
This diff is collapsed.
......@@ -153,16 +153,16 @@ class InternalServer(object):
def create_session(self, name, user=User.Anonymous, external=False):
return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
def enable_history(self, node, period=timedelta(days=7), count=0):
def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
"""
Set attribute Historizing of node to True and start storing data for history
"""
node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
self.history_manager.historize(node, period, count)
self.history_manager.historize_data_change(node, period, count)
def disable_history(self, node):
def disable_history_data_change(self, node):
"""
Set attribute Historizing of node to False and stop storing data for history
"""
......@@ -171,6 +171,24 @@ class InternalServer(object):
node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
self.history_manager.dehistorize(node)
def enable_history_event(self, source, period=timedelta(days=7)):
"""
Set attribute History Read of object events to True and start storing data for history
"""
# to historize events of an object, first check if object supports events
source_event_notifier = source.get_attribute(ua.AttributeIds.EventNotifier)
if source_event_notifier.Value.Value & 1 == 1: # check bit 0
# if it supports events, turn on bit 2 (enables history read)
source.set_attr_bit(ua.AttributeIds.EventNotifier, 2)
# send the object to history manager
self.history_manager.historize_event(source, period)
def disable_history_event(self, source):
"""
Set attribute History Read of node to False and stop storing data for history
"""
source.unset_attr_bit(ua.AttributeIds.EventNotifier, 2)
self.history_manager.dehistorize(source)
class InternalSession(object):
......
......@@ -91,7 +91,11 @@ class MonitoredItemService(object):
if params.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId)
if self.aspace.get_attribute_value(params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value != 1:
ev_notify_byte = self.aspace.get_attribute_value(params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
if ev_notify_byte is not None:
if ev_notify_byte & 1 == 0:
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
else:
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadServiceUnsupported)
result.FilterResult = ua.EventFilterResult()
for _ in params.RequestedParameters.Filter.SelectClauses:
......
'''
"""
Example auto_generated file with UA Types
For now only events!
'''
"""
from opcua.ua import *
# TODO: This should be autogeneratd form XML description of EventTypes
# TODO: This should be auto generated form XML description of EventTypes
class BaseEvent(FrozenClass):
'''
"""
BaseEvent implements BaseEventType from which inherit all other events and it is used per default.
'''
"""
def __init__(self, sourcenode=None, message=None, severity=1, extended=False):
self.EventId = bytes()
self.EventType = NodeId(ObjectIds.BaseEventType)
self.SourceNode = sourcenode
self.SourceName = None
self.Time = None
self.RecieveTime = None
self.ReceiveTime = None
self.LocalTime = None
self.Message = LocalizedText(message)
self.Severity = Variant(severity, VariantType.UInt16)
......@@ -31,9 +31,9 @@ class BaseEvent(FrozenClass):
class AuditEvent(BaseEvent):
'''
"""
Audit implements AuditEventType from which inherit all other Audit events.
'''
"""
def __init__(self, sourcenode=None, message=None, severity=1, extended=False):
super(AuditEvent, self).__init__(sourcenode, message, severity, True)
self.EventType = NodeId(ObjectIds.AuditEventType)
......
This diff is collapsed.
......@@ -129,14 +129,21 @@ class TestServer(unittest.TestCase, CommonTests):
val = v.get_value()
self.assertEqual(val, "StringValue")
def test_historize(self):
def test_historize_variable(self):
o = self.opc.get_objects_node()
var = o.add_variable(3, "test_hist", 1.0)
self.srv.iserver.enable_history(var, timedelta(days=1))
self.srv.iserver.enable_history_data_change(var, timedelta(days=1))
time.sleep(1)
var.set_value(2.0)
var.set_value(3.0)
self.srv.iserver.disable_history(var)
self.srv.iserver.disable_history_data_change(var)
def test_historize_events(self):
srv_node = self.srv.get_node(ua.ObjectIds.Server)
srvevgen = self.srv.get_event_generator()
self.srv.iserver.enable_history_event(srv_node, period=None)
srvevgen.trigger(message="Message")
self.srv.iserver.disable_history_event(srv_node)
def test_references_for_added_nodes_method(self):
objects = self.opc.get_objects_node()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment