Commit eeb0edd3 authored by ORD's avatar ORD Committed by GitHub

Merge pull request #207 from zerox1212/master

Make History Consistent with Events
parents 4652a3a9 83d14db5
import sys
sys.path.insert(0, "..")
import time
import math
from opcua import ua, Server
from opcua.server.history_sql import HistorySQLite
if __name__ == "__main__":
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
# setup our own namespace, not really necessary but should as spec
uri = "http://examples.freeopcua.github.io"
idx = server.register_namespace(uri)
# get Objects node, this is where we should put our custom stuff
objects = server.get_objects_node()
# populating our address space
myobj = objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", ua.Variant(0, ua.VariantType.Double))
myvar.set_writable() # Set MyVariable to be writable by clients
# Configure server to use sqlite as history database (default is a simple in memory dict)
server.iserver.history_manager.set_storage(HistorySQLite("my_datavalue_history.sql"))
# starting!
server.start()
# enable data change history for this particular node, must be called after start since it uses subscription
server.iserver.enable_history_data_change(myvar, period=None, count=100)
try:
count = 0
while True:
time.sleep(1)
count += 0.1
myvar.set_value(math.sin(count))
finally:
# close connection, remove subscriptions, etc
server.stop()
import sys
sys.path.insert(0, "..")
import time
import logging
from datetime import datetime
from opcua import ua, Server
from opcua.server.history_sql import HistorySQLite
......@@ -23,8 +22,6 @@ if __name__ == "__main__":
# populating our address space
myobj = objects.add_object(idx, "MyObject")
myvar = myobj.add_variable(idx, "MyVariable", ua.Variant(0, ua.VariantType.Double))
myvar.set_writable() # Set MyVariable to be writable by clients
# Creating a custom event: Approach 1
# The custom event object automatically will have members from its parent (BaseEventType)
......@@ -35,15 +32,18 @@ if __name__ == "__main__":
etype2 = server.create_custom_event_type(2, 'MySecondEvent', ua.ObjectIds.BaseEventType,
[('MyOtherProperty', ua.VariantType.Float)])
# get an event generator for the myobj node which generates custom events
myevgen = server.get_event_generator(etype, myobj)
myevgen.event.Severity = 500
myevgen.event.MyStringProperty = ua.Variant("hello world")
myevgen.event.MyNumericProperty = ua.Variant(-456)
# get another event generator for the myobj node which generates different custom events
myevgen2 = server.get_event_generator(etype2, myobj)
myevgen2.event.Severity = 123
myevgen2.event.MyOtherProperty = ua.Variant(1.337)
# get an event generator for the server node which generates BaseEventType
serverevgen = server.get_event_generator()
serverevgen.event.Severity = 111
......@@ -53,10 +53,10 @@ if __name__ == "__main__":
# starting!
server.start()
# enable history for myobj events
# enable history for myobj events; must be called after start since it uses subscription
server.iserver.enable_history_event(myobj, period=None)
# enable history for server events
# enable history for server events; must be called after start since it uses subscription
server_node = server.get_node(ua.ObjectIds.Server)
server.iserver.enable_history_event(server_node, period=None)
......@@ -65,11 +65,15 @@ if __name__ == "__main__":
while True:
time.sleep(1)
count += 0.1
# generate events for subscribed clients and history
myevgen.trigger(message="This is MyFirstEvent " + str(count))
myevgen2.trigger(message="This is MySecondEvent " + str(count))
serverevgen.trigger(message="Server Event Message")
res = server_node.read_event_history(None, None, 0)
# read event history from sql
end_time = datetime.utcnow()
server_event_history = server_node.read_event_history(None, end_time, 0)
finally:
# close connection, remove subscriptions, etc
......
......@@ -183,24 +183,12 @@ class Subscription(object):
sourcenode = Node(self.server, sourcenode)
if evfilter is None:
# FIXME Review this, the commented out way doesn't support evtypes being passed a Node object
# if not type(evtypes) in (list, tuple):
# evtypes = [evtypes]
#
# evtypes = [Node(self.server, i) for i in evtypes] # make sure we have a list of Node objects
if not type(evtypes) in (list, tuple):
evtypes = [evtypes]
# FIXME not a very nice way to make sure events.get_filter gets a list of nodes...
evtype_nodes = []
for evtype in evtypes:
if not isinstance(evtype, Node):
evtype_nodes.append(Node(self.server, ua.NodeId(evtype))) # make sure we have a list of Node objects
else:
evtype_nodes.append(evtype)
evtypes = [Node(self.server, evtype) for evtype in evtypes]
evfilter = events.get_filter_from_event_type(evtype_nodes)
evfilter = events.get_filter_from_event_type(evtypes)
return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
......
......@@ -44,7 +44,7 @@ class HistoryStorageInterface(object):
"""
raise NotImplementedError
def new_historized_event(self, source_id, etype, period, count=0):
def new_historized_event(self, source_id, evtypes, period, count=0):
"""
Called when historization of events is enabled on server side
Returns None
......@@ -126,7 +126,7 @@ class HistoryDict(HistoryStorageInterface):
results = results[:nb_values]
return results, cont
def new_historized_event(self, source_id, etype, period, count=0):
def new_historized_event(self, source_id, evtypes, period, count=0):
if source_id in self._events:
raise UaNodeAlreadyHistorizedError(source_id)
self._events[source_id] = []
......@@ -236,12 +236,10 @@ class HistoryManager(object):
if source in self._handlers:
raise ua.UaError("Events from {} are already historized".format(source))
# get the event types the source node generates and a list of all possible event fields
event_types, ev_fields = self._get_source_event_data(source)
# get list of all event types that the source node generates; change this to only historize specific events
event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)
# FIXME passing ev_fields instead of event type only works because HistoryDict doesn't use this parameter,
# FIXME SQL needs to be fixed to get the fields in the SQL module, not here; only event types should be here
self.storage.new_historized_event(source.nodeid, ev_fields, period, count)
self.storage.new_historized_event(source.nodeid, event_types, period, count)
handler = self._sub.subscribe_events(source, event_types)
self._handlers[source] = handler
......@@ -342,19 +340,6 @@ class HistoryManager(object):
cont = ua.pack_datetime(cont)
return results, cont
def _get_source_event_data(self, source):
# get all event types which the source node can generate; get the fields of those event types
event_types = source.get_referenced_nodes(ua.ObjectIds.GeneratesEvent)
ev_aggregate_fields = []
for event_type in event_types:
ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))
ev_fields = []
for field in set(ev_aggregate_fields):
ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
return event_types, ev_fields
def update_history(self, params):
"""
Update history for a node
......
......@@ -132,10 +132,13 @@ class HistorySQLite(HistoryStorageInterface):
return results, cont
def new_historized_event(self, source_id, ev_fields, period, count=0):
def new_historized_event(self, source_id, evtypes, period, count=0):
with self._lock:
_c_new = self._conn.cursor()
# get all fields for the event type nodes
ev_fields = self._get_event_fields(evtypes)
self._datachanges_period[source_id] = period
self._event_fields[source_id] = ev_fields
......@@ -231,6 +234,25 @@ class HistorySQLite(HistoryStorageInterface):
def _get_table_name(self, node_id):
return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
def _get_event_fields(self, evtypes):
"""
Get all fields from the event types that are to be historized
Args:
evtypes: List of event type nodes
Returns: List of fields for all event types
"""
# get all fields from the event types that are to be historized
ev_aggregate_fields = []
for event_type in evtypes:
ev_aggregate_fields.extend((events.get_event_properties_from_type_node(event_type)))
ev_fields = []
for field in set(ev_aggregate_fields):
ev_fields.append(field.get_display_name().Text.decode(encoding='utf-8'))
return ev_fields
@staticmethod
def _get_bounds(start, end, nb_values):
order = "ASC"
......@@ -257,11 +279,20 @@ class HistorySQLite(HistoryStorageInterface):
return start_time, end_time, order, limit
def _format_event(self, event_result):
def _format_event(self, event):
"""
Convert an event object triggered by the subscription into ordered lists for the SQL insert string
Args:
event: The event returned by the subscription
Returns: List of event fields (SQL column names), List of '?' placeholders, Tuple of variant binaries
"""
placeholders = []
ev_variant_binaries = []
ev_variant_dict = event_result.get_event_props_as_fields_dict()
ev_variant_dict = event.get_event_props_as_fields_dict()
names = list(ev_variant_dict.keys())
names.sort() # sort alphabetically since dict is not sorted
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment