Commit 3b0f90ad authored by zerox1212's avatar zerox1212

Subscribe to Multiple Event Types, History Updates

Implement Subscription Supports List of Event Types, History Update
again after all the changes to events.
parent f3adbc97
......@@ -17,3 +17,4 @@ dist
*.swp
newdocs
examples/history.db
*.sql
......@@ -10,12 +10,6 @@ from opcua.server.history_sql import HistorySQLite
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
logger = logging.getLogger("opcua.server.internal_subscription")
logger.setLevel(logging.DEBUG)
# setup our server
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
......@@ -44,6 +38,7 @@ if __name__ == "__main__":
myevgen = server.get_event_generator(etype, myobj)
myevgen.event.Severity = 500
myevgen.event.MyStringProperty = ua.Variant("hello world")
myevgen.event.MyNumericProperty = ua.Variant(-456)
myevgen2 = server.get_event_generator(etype2, myobj)
myevgen2.event.Severity = 123
......
......@@ -94,7 +94,7 @@ class Event(object):
@staticmethod
def from_event_fields(select_clauses, fields):
"""
Instanciate an Event object from a select_clauses and fields
Instantiate an Event object from a select_clauses and fields
"""
ev = Event()
ev.select_clauses = select_clauses
......@@ -108,40 +108,48 @@ class Event(object):
return ev
def get_filter_from_event_type(eventtype):
def get_filter_from_event_type(eventtypes):
evfilter = ua.EventFilter()
evfilter.SelectClauses = select_clauses_from_evtype(eventtype)
evfilter.WhereClause = where_clause_from_evtype(eventtype)
evfilter.SelectClauses = select_clauses_from_evtype(eventtypes)
evfilter.WhereClause = where_clause_from_evtype(eventtypes)
return evfilter
def select_clauses_from_evtype(evtype):
def select_clauses_from_evtype(evtypes):
clauses = []
for prop in get_event_properties_from_type_node(evtype):
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = evtype.nodeid
op.AttributeId = ua.AttributeIds.Value
op.BrowsePath = [prop.get_browse_name()]
clauses.append(op)
selected_paths = []
for evtype in evtypes:
for prop in get_event_properties_from_type_node(evtype):
if prop.get_browse_name() not in selected_paths:
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = evtype.nodeid
op.AttributeId = ua.AttributeIds.Value
op.BrowsePath = [prop.get_browse_name()]
clauses.append(op)
selected_paths.append(prop.get_browse_name())
return clauses
def where_clause_from_evtype(evtype):
def where_clause_from_evtype(evtypes):
cf = ua.ContentFilter()
el = ua.ContentFilterElement()
# operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = evtype.nodeid
op.BrowsePath.append(ua.QualifiedName("EventType", 0))
op.AttributeId = ua.AttributeIds.Value
el.FilterOperands.append(op)
for subtypeid in [st.nodeid for st in get_node_subtypes(evtype)]:
op = ua.LiteralOperand()
op.Value = ua.Variant(subtypeid)
for evtype in evtypes:
# operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = evtype.nodeid
op.BrowsePath.append(ua.QualifiedName("EventType", 0))
op.AttributeId = ua.AttributeIds.Value
el.FilterOperands.append(op)
el.FilterOperator = ua.FilterOperator.InList
for subtypeid in [st.nodeid for st in get_node_subtypes(evtype)]:
op = ua.LiteralOperand()
op.Value = ua.Variant(subtypeid)
el.FilterOperands.append(op)
el.FilterOperator = ua.FilterOperator.InList
cf.Elements.append(el)
return cf
......
......@@ -172,16 +172,23 @@ class Subscription(object):
"""
return self._subscribe(nodes, attr, queuesize=0)
def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType, evfilter=None):
def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtypes=ua.ObjectIds.BaseEventType, evfilter=None):
"""
Subscribe to events from a node. Default node is Server node.
In most servers the server node is the only one you can subscribe to.
if evfilter is provided, evtype is ignored
if evtypes is not provided, evtype defaults to BaseEventType
if evtypes is a list or tuple of custom event types, the events will be filtered to the supplied types
Return a handle which can be used to unsubscribe
"""
sourcenode = Node(self.server, sourcenode)
if evfilter is None:
evfilter = events.get_filter_from_event_type(Node(self.server, evtype))
if not type(evtypes) in (list, tuple):
evtypes = [evtypes]
evtypes = [Node(self.server, i) for i in evtypes] # make sure we have a list of Node objects
evfilter = events.get_filter_from_event_type(evtypes)
return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
......
......@@ -77,7 +77,7 @@ class HistoryStorageInterface(object):
class HistoryDict(HistoryStorageInterface):
"""
very minimal history backend storing data in memory using a Python dictionary
very minimal history back end storing data in memory using a Python dictionary
"""
def __init__(self):
self._datachanges = {}
......@@ -126,7 +126,7 @@ class HistoryDict(HistoryStorageInterface):
results = results[:nb_values]
return results, cont
def new_historized_event(self, source_id, etype, period, count):
def new_historized_event(self, source_id, etype, period, count=0):
if source_id in self._events:
raise UaNodeAlreadyHistorizedError(source_id)
self._events[source_id] = []
......@@ -208,7 +208,7 @@ class HistoryManager(object):
def historize_data_change(self, node, period=timedelta(days=7), count=0):
"""
subscribe to the nodes' data changes and store the data in the active storage
Subscribe to the nodes' data changes and store the data in the active storage.
"""
if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage))
......@@ -220,7 +220,16 @@ class HistoryManager(object):
def historize_event(self, source, period=timedelta(days=7), count=0):
"""
subscribe to the source nodes' events and store the data in the active storage; custom event properties included
Subscribe to the source nodes' events and store the data in the active storage.
SQL Implementation
The default is to historize every event type the source generates, custom event properties are included. At
this time there is no way to historize a specific event type. The user software can filter out events which are
not desired when reading.
Note that adding custom events to a source node AFTER historizing has been activated is not supported at this
time (in SQL history there will be no columns in the SQL table for the new event properties). For SQL The table
must be deleted manually so that a new table with the custom event fields can be created.
"""
if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage))
......@@ -230,14 +239,19 @@ class HistoryManager(object):
# get the event types the source node generates and a list of all possible event fields
event_types, ev_fields = self._get_source_event_data(source)
# FIXME passing ev_fields instead of event type only works because HistoryDict doesn't use this parameter,
# FIXME SQL needs to be fixed to get the fields in the SQL module, not here; only event types should be here
self.storage.new_historized_event(source.nodeid, ev_fields, period, count)
handler = self._sub.subscribe_events(source) # FIXME supply list of event types when master is fixed
handler = self._sub.subscribe_events(source, event_types)
self._handlers[source] = handler
def dehistorize(self, node):
"""
remove subscription to the node/source which is being historized
Remove subscription to the node/source which is being historized
SQL Implementation
Only the subscriptions is removed. The historical data remains.
"""
if node in self._handlers:
self._sub.unsubscribe(self._handlers[node])
......
......@@ -77,7 +77,7 @@ class HistorySQLite(HistoryStorageInterface):
# get this node's period from the period dict and calculate the limit
period, count = self._datachanges_period[node_id]
def executeDeleteStatement(condition, args):
def execute_sql_delete(condition, args):
query = ('DELETE FROM "{tn}" WHERE ' + condition).format(tn=table)
try:
......@@ -90,39 +90,19 @@ class HistorySQLite(HistoryStorageInterface):
if period:
# after the insert, if a period was specified delete all records older than period
date_limit = datetime.utcnow() - period
executeDeleteStatement('ServerTimestamp < ?', (date_limit,))
execute_sql_delete('ServerTimestamp < ?', (date_limit,))
if count:
#ensure that no more than count records are stored for the specified node
executeDeleteStatement('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
# ensure that no more than count records are stored for the specified node
execute_sql_delete('ServerTimestamp = (SELECT CASE WHEN COUNT(*) > ? '
'THEN MIN(ServerTimestamp) ELSE NULL END FROM "{tn}")', (count,))
def read_node_history(self, node_id, start, end, nb_values):
with self._lock:
_c_read = self._conn.cursor()
order = "ASC"
if start is None or start == ua.DateTimeMinValue:
order = "DESC"
start = ua.DateTimeMinValue
if end is None or end == ua.DateTimeMinValue:
end = datetime.utcnow() + timedelta(days=1)
if start < end:
start_time = start.isoformat(' ')
end_time = end.isoformat(' ')
else:
order = "DESC"
start_time = end.isoformat(' ')
end_time = start.isoformat(' ')
if nb_values:
limit = nb_values + 1 # add 1 to the number of values for retrieving a continuation point
else:
limit = -1 # in SQLite a LIMIT of -1 returns all results
table = self._get_table_name(node_id)
start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
cont = None
results = []
......@@ -152,7 +132,7 @@ class HistorySQLite(HistoryStorageInterface):
return results, cont
def new_historized_event(self, source_id, ev_fields, period, count):
def new_historized_event(self, source_id, ev_fields, period, count=0):
with self._lock:
_c_new = self._conn.cursor()
......@@ -211,32 +191,10 @@ class HistorySQLite(HistoryStorageInterface):
def read_event_history(self, source_id, start, end, nb_values, evfilter):
with self._lock:
_c_read = self._conn.cursor()
order = "ASC"
if start is None or start == ua.DateTimeMinValue:
order = "DESC"
start = ua.DateTimeMinValue
if end is None or end == ua.DateTimeMinValue:
end = datetime.utcnow() + timedelta(days=1)
if start < end:
start_time = start.isoformat(' ')
end_time = end.isoformat(' ')
else:
order = "DESC"
start_time = end.isoformat(' ')
end_time = start.isoformat(' ')
if nb_values:
limit = nb_values + 1 # add 1 to the number of values for retrieving a continuation point
else:
limit = -1 # in SQLite a LIMIT of -1 returns all results
table = self._get_table_name(source_id)
start_time, end_time, order, limit = self._get_bounds(start, end, nb_values)
clauses, clauses_str = self._get_select_clauses(source_id, evfilter)
cont = None
......@@ -273,13 +231,39 @@ class HistorySQLite(HistoryStorageInterface):
def _get_table_name(self, node_id):
return str(node_id.NamespaceIndex) + '_' + str(node_id.Identifier)
@staticmethod
def _get_bounds(start, end, nb_values):
order = "ASC"
if start is None or start == ua.DateTimeMinValue:
order = "DESC"
start = ua.DateTimeMinValue
if end is None or end == ua.DateTimeMinValue:
end = datetime.utcnow() + timedelta(days=1)
if start < end:
start_time = start.isoformat(' ')
end_time = end.isoformat(' ')
else:
order = "DESC"
start_time = end.isoformat(' ')
end_time = start.isoformat(' ')
if nb_values:
limit = nb_values + 1 # add 1 to the number of values for retrieving a continuation point
else:
limit = -1 # in SQLite a LIMIT of -1 returns all results
return start_time, end_time, order, limit
def _format_event(self, event_result):
placeholders = []
ev_variant_binaries = []
ev_variant_dict = event_result.get_event_props_as_fields_dict()
names = list(ev_variant_dict.keys())
names.sort() # sort alphabatically since dict is not sorted
names.sort() # sort alphabetically since dict is not sorted
# split dict into two synchronized lists which will be converted to SQL strings
# note that the variants are converted to binary objects for storing in SQL BLOB format
......@@ -313,7 +297,8 @@ class HistorySQLite(HistoryStorageInterface):
clauses = [x for x in s_clauses if x in self._event_fields[source_id]]
return clauses, self._list_to_sql_str(clauses)
def _list_to_sql_str(self, ls, quotes=True):
@staticmethod
def _list_to_sql_str(ls, quotes=True):
sql_str = ''
for item in ls:
if quotes:
......
......@@ -177,7 +177,7 @@ class InternalServer(object):
node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
self.history_manager.dehistorize(node)
def enable_history_event(self, source, period=timedelta(days=7)):
def enable_history_event(self, source, period=timedelta(days=7), count=0):
"""
Set attribute History Read of object events to True and start storing data for history
"""
......@@ -187,7 +187,7 @@ class InternalServer(object):
# if it supports events, turn on bit 2 (enables history read)
source.set_attr_bit(ua.AttributeIds.EventNotifier, 2)
# send the object to history manager
self.history_manager.historize_event(source, period)
self.history_manager.historize_event(source, period, count)
def disable_history_event(self, source):
"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment