Commit 2370b4d6 authored by olivier R-D's avatar olivier R-D

start work on server history support

parent 15ed61bb
......@@ -3,6 +3,8 @@ High level node object, to access node attribute
and browse address space
"""
from datetime import timedelta
from opcua import ua
......@@ -40,6 +42,9 @@ class Node(object):
return "Node({})".format(self.nodeid)
__repr__ = __str__
def __hash__(self):
return self.nodeid.__hash__()
def get_browse_name(self):
"""
Get browse name of a node. A browse name is a QualifiedName object
......
from datetime import timedelta
from datetime import datetime
from opcua import Subscription
from opcua import ua
class HistoryStorageInterface(object):
"""
Interface of a history backend
"""
def save_node_value(self, node, timestamp, datavalue):
raise NotImplementedError
def read_node_value(self, node, start, end):
raise NotImplementedError
def save_event(self, timestamp, event):
raise NotImplementedError
def read_event(self, event, start, end):
raise NotImplementedError
class HistoryDict(HistoryStorageInterface):
"""
very minimal history backend storing data in memory using a Python dictionnary
"""
def __init__(self):
self._datachanges = {}
self._datachanges_period = {}
self._events = {}
def new_node(self, node, period):
self._datachanges[node] = []
self._datachanges_period[node] = period
def new_event(self, period):
self._events = []
def save_node_value(self, node, datavalue):
data = self._datachanges[node]
period = self._datachanges_period[node]
data.append(datavalue)
now = datetime.now()
while now - data[0].ServerTimestamp > period:
data.pop(0)
def read_node_value(self, node, start, end):
if node not in self._datachanges:
return []
else:
# FIME: improve algo
return [dv for dv in self._datachanges[node] if start <= dv.ServerTimestamp <= end]
def save_event(self, timestamp, event):
raise NotImplementedError
def read_event(self, event, start, end):
raise NotImplementedError
class SubHandler(object):
def __init__(self, storage):
self.storage = storage
def datachange_notification(self, node, val, data):
print("Python: New data change event", node, val, data)
self.storage.save_node_value(node, data.monitored_item.Value)
def event_notification(self, event):
print("Python: New event", event)
self.storage.save_event(event)
class HistoryManager(object):
def __init__(self, iserver):
self.iserver = iserver
self.storage = HistoryDict()
self._sub = None
self._handlers = {}
def set_storage(self, storage):
self.storage = storage
def _create_subscription(self, handler):
params = ua.CreateSubscriptionParameters()
params.RequestedPublishingInterval = 10
params.RequestedLifetimeCount = 3000
params.RequestedMaxKeepAliveCount = 10000
params.MaxNotificationsPerPublish = 0
params.PublishingEnabled = True
params.Priority = 0
return Subscription(self.iserver.isession, params, handler)
def historize(self, node, period=timedelta(days=7)):
if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage))
if node in self._handlers:
raise ua.UaError("Node {} is allready historized".format(node))
self.storage.new_node(node, period)
handler = self._sub.subscribe_data_change(node)
self._handlers[node] = handler
def dehistorize(self, node):
self._sub.unsubscribe(self._handlers[node])
del(self._handlers[node])
def read_history(self, params):
"""
Read history for a node
This is the part AttributeService, but implemented as its own service
since it requires more logic than other attribute service methods
"""
results = []
for rv in params.NodesToRead:
res = self._read_history(params.HistoryReadDetails, rv)
results.append(res)
return results
def _read_history(self, details, rv):
if type(details) is ua.ReadRawModifiedDetails:
pass
self.storage.read_data()
def update_history(self, params):
"""
Update history for a node
This is the part AttributeService, but implemented as its own service
since it requires more logic than other attribute service methods
"""
pass
......@@ -4,6 +4,7 @@ Internal server implementing opcu-ua interface. can be used on server side or to
from datetime import datetime
from copy import copy, deepcopy
from datetime import timedelta
import logging
from threading import Lock
from enum import Enum
......@@ -16,6 +17,7 @@ except ImportError:
from opcua import ua
from opcua.common import utils
from opcua.common.node import Node
from opcua.server.history import HistoryManager
from opcua.server.address_space import AddressSpace
from opcua.server.address_space import AttributeService
from opcua.server.address_space import ViewService
......@@ -67,6 +69,8 @@ class InternalServer(object):
self.asyncio_transports = []
self.subscription_service = SubscriptionService(self.loop, self.aspace)
self.history_manager = HistoryManager(self)
# create a session to use on server side
self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
......@@ -148,6 +152,21 @@ class InternalServer(object):
def create_session(self, name, user=User.Anonymous, external=False):
return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
def enable_history(self, node, period=timedelta(days=7)):
"""
Set attribute Historizing of node to True and start storing data for history
"""
node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
self.history_manager.historize(node, period)
def disable_history(self, node):
"""
Set attribute Historizing of node to False and stop storing data for history
"""
node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
self.history_manager.dehistorize(node)
class InternalSession(object):
_counter = 10
......
import unittest
from tests_common import CommonTests, add_server_methods
import time
from datetime import timedelta
from opcua import Server
from opcua import Client
......@@ -126,5 +127,14 @@ class TestServer(unittest.TestCase, CommonTests):
val = v.get_value()
self.assertEqual(val, "StringValue")
def test_historize(self):
o = self.opc.get_objects_node()
var = o.add_variable(3, "test_hist", 1.0)
self.srv.iserver.enable_history(var, timedelta(days=1))
time.sleep(1)
var.set_value(2.0)
var.set_value(3.0)
self.srv.iserver.disable_history(var)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment