Commit e35fc08f authored by olivier R-D's avatar olivier R-D

first draft of history_read service, broken

parent 2370b4d6
from datetime import timedelta
from datetime import datetime
import time
import struct
from opcua import Subscription
from opcua import ua
def datetime_to_bytes(dt):
time_float = time.mktime(dt.timetuple()) + dt.microsecond / 1E6
return struct.pack("!L", time_float)
def bytes_to_datetime(data):
time_float = struct.unpack('!L', data)[0]
return datetime.fromtimestamp(time_float)
class HistoryStorageInterface(object):
"""
Interface of a history backend
"""
def save_node_value(self, node, timestamp, datavalue):
def new_node(self, node, period, count=0):
raise NotImplementedError
def read_node_value(self, node, start, end):
def save_datavalue(self, node, datavalue):
raise NotImplementedError
def save_event(self, timestamp, event):
def read_datavalues(self, node, start, end, nb_values):
raise NotImplementedError
def read_event(self, event, start, end):
def new_event(self, event, period):
raise NotImplementedError
def save_event(self, event):
raise NotImplementedError
def read_events(self, start, end, evfilter):
raise NotImplementedError
......@@ -31,22 +49,26 @@ class HistoryDict(HistoryStorageInterface):
self._datachanges_period = {}
self._events = {}
def new_node(self, node, period):
def new_node(self, node, period, count=0):
self._datachanges[node] = []
self._datachanges_period[node] = period
self._datachanges_period[node] = period, count
def new_event(self, period):
def new_event(self, event, period):
self._events = []
def save_node_value(self, node, datavalue):
def save_datavalue(self, node, datavalue):
print("saving", node, datavalue)
data = self._datachanges[node]
period = self._datachanges_period[node]
period, count = self._datachanges_period[node]
data.append(datavalue)
now = datetime.now()
while now - data[0].ServerTimestamp > period:
data.pop(0)
if period:
while now - data[0].ServerTimestamp > period:
data.pop(0)
if count and len(data) > count:
data = data[-count:]
def read_node_value(self, node, start, end):
def read_datavalues(self, node, start, end, nb_values):
if node not in self._datachanges:
return []
else:
......@@ -56,7 +78,7 @@ class HistoryDict(HistoryStorageInterface):
def save_event(self, timestamp, event):
raise NotImplementedError
def read_event(self, event, start, end):
def read_events(self, start, end, evfilter):
raise NotImplementedError
......@@ -65,11 +87,9 @@ class SubHandler(object):
self.storage = storage
def datachange_notification(self, node, val, data):
print("Python: New data change event", node, val, data)
self.storage.save_node_value(node, data.monitored_item.Value)
self.storage.save_datavalue(node, data.monitored_item.Value)
def event_notification(self, event):
print("Python: New event", event)
self.storage.save_event(event)
......@@ -93,12 +113,12 @@ class HistoryManager(object):
params.Priority = 0
return Subscription(self.iserver.isession, params, handler)
def historize(self, node, period=timedelta(days=7)):
def historize(self, node, period=timedelta(days=7), count=0):
if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage))
if node in self._handlers:
raise ua.UaError("Node {} is allready historized".format(node))
self.storage.new_node(node, period)
self.storage.new_node(node, period, count)
handler = self._sub.subscribe_data_change(node)
self._handlers[node] = handler
......@@ -120,10 +140,53 @@ class HistoryManager(object):
return results
def _read_history(self, details, rv):
""" read history for a node
"""
result = ua.HistoryReadResult()
if type(details) is ua.ReadRawModifiedDetails:
pass
self.storage.read_data()
if details.IsReadModified:
result.HistoryData = ua.HistoryModifiedData()
# we do not support modified history by design so we return what we have
dv, cont = self._read_datavalue_history(rv, details)
result.HistoryData.DataValues = dv
result.ContinuationPoint = cont
else:
result.HistoryData = ua.HistoryData()
dv, cont = self._read_datavalue_history(rv, details)
result.HistoryData.DataValues = dv
result.ContinuationPoint = cont
elif type(details) is ua.ReadEventDetails:
result.HistoryData = ua.HistoryEvent()
# FIXME: filter is a cumbersome type, maybe transform it something easier
# to handle for storage
result.HistoryData.Events = self.storage.read_events(details.StartTime,
details.EndTime,
details.Filter)
else:
# we do not currently support the other types, clients can process data themselves
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
return result
def read_datavalue_history(self, rv, details):
starttime = details.StartTime
if rv.ContinuationPoint:
# Spec says we should ignore details if cont point is present
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
starttime = bytes_to_datetime(rv.ContinuationPoint)
dv, cont = self.storage.read_datavalues(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode)
if cont:
cont = datetime_to_bytes(dv[-1].SourceTimeStamp)
# FIXME, parse index range and filter out if necesary
# rv.IndexRange
# rv.DataEncoding # xml or binary, seems spec say we can ignore that one
return dv, cont
def update_history(self, params):
"""
......@@ -131,7 +194,13 @@ class HistoryManager(object):
This is the part AttributeService, but implemented as its own service
since it requires more logic than other attribute service methods
"""
pass
results = []
for details in params.HistoryUpdateDetails:
result = ua.HistoryUpdateResult()
# we do not accept to rewrite history
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
results.append(results)
return results
......
......@@ -152,12 +152,12 @@ class InternalServer(object):
def create_session(self, name, user=User.Anonymous, external=False):
return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
def enable_history(self, node, period=timedelta(days=7)):
def enable_history(self, node, period=timedelta(days=7), count=0):
"""
Set attribute Historizing of node to True and start storing data for history
"""
node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
self.history_manager.historize(node, period)
self.history_manager.historize(node, period, count)
def disable_history(self, node):
"""
......@@ -238,6 +238,9 @@ class InternalSession(object):
return results
return [deepcopy(dv) for dv in results]
def history_read(self, params):
return self.iserver.history_manager.read_history(params)
def write(self, params):
if not self.external:
# If session is internal we need to store a copy og object, not a reference,
......
......@@ -333,6 +333,18 @@ class UaProcessor(object):
self.logger.info("sending delete monitored items response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
self.logger.info("history read request")
params = ua.HistoryReadRequest.from_binary(body)
results = self.session.history_read(params)
response = ua.HistoryReadResponse()
response.Results = results
self.logger.info("sending history read response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
self.logger.info("publish request")
......
......@@ -867,4 +867,14 @@ class CommonTests(object):
self.assertTrue(len(endpoints) > 0)
self.assertTrue(endpoints[0].EndpointUrl.startswith("opc.tcp://"))
def test_history_read(self):
o = self.srv.get_objects_node()
start_val = 0.1
var = o.add_variable(3, "history_var", start_val)
self.srv.iserver.enable_history(var, period=None, count=10)
for _ in range(20):
start_val += 0.1
var.set_value(start_val)
time.sleep(1)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment