Commit 18100b2e authored by ORD's avatar ORD

Merge pull request #140 from FreeOpcUa/history

Server history support for DataValues (Not events)
parents 15ed61bb 6069acb1
......@@ -361,7 +361,8 @@ class Client(object):
Activate session using either username and password or private_key
"""
params = ua.ActivateSessionParameters()
challenge = self.security_policy.server_certificate + self._server_nonce
cert = self.security_policy.server_certificate if self.security_policy.server_certificate is not None else b""
challenge = cert + self._server_nonce
params.ClientSignature.Algorithm = b"http://www.w3.org/2000/09/xmldsig#rsa-sha1"
params.ClientSignature.Signature = self.security_policy.asymmetric_cryptography.signature(challenge)
params.LocaleIds.append("en")
......
......@@ -105,8 +105,8 @@ def _create_folder(server, parentnodeid, nodeid, qname):
attrs = ua.ObjectAttributes()
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.WriteMask = ua.OpenFileMode.Read
attrs.UserWriteMask = ua.OpenFileMode.Read
attrs.WriteMask = 0
attrs.UserWriteMask = 0
attrs.EventNotifier = 0
addnode.NodeAttributes = attrs
results = server.add_nodes([addnode])
......@@ -126,8 +126,8 @@ def _create_object(server, parentnodeid, nodeid, qname):
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.EventNotifier = 0
attrs.WriteMask = ua.OpenFileMode.Read
attrs.UserWriteMask = ua.OpenFileMode.Read
attrs.WriteMask = 0
attrs.UserWriteMask = 0
addnode.NodeAttributes = attrs
results = server.add_nodes([addnode])
results[0].StatusCode.check()
......@@ -163,9 +163,11 @@ def _create_variable(server, parentnodeid, nodeid, qname, val, isproperty=False)
else:
attrs.ValueRank = ua.ValueRank.Scalar
#attrs.ArrayDimensions = None
attrs.WriteMask = ua.OpenFileMode.Read
attrs.UserWriteMask = ua.OpenFileMode.Read
attrs.WriteMask = 0
attrs.UserWriteMask = 0
attrs.Historizing = 0
attrs.AccessLevel = ua.AccessLevelMask.CurrentRead
attrs.UserAccessLevel = ua.AccessLevelMask.CurrentRead
addnode.NodeAttributes = attrs
results = server.add_nodes([addnode])
results[0].StatusCode.check()
......@@ -183,8 +185,8 @@ def _create_method(parent, nodeid, qname, callback, inputs, outputs):
attrs = ua.MethodAttributes()
attrs.Description = ua.LocalizedText(qname.Name)
attrs.DisplayName = ua.LocalizedText(qname.Name)
attrs.WriteMask = ua.OpenFileMode.Read
attrs.UserWriteMask = ua.OpenFileMode.Read
attrs.WriteMask = 0
attrs.UserWriteMask = 0
attrs.Executable = True
attrs.UserExecutable = True
addnode.NodeAttributes = attrs
......
......@@ -3,6 +3,8 @@ High level node object, to access node attribute
and browse address space
"""
from datetime import datetime
from opcua import ua
......@@ -12,7 +14,7 @@ class Node(object):
High level node object, to access node attribute,
browse and populate address space.
Node objects are usefull as-is but they do not expose the entire
OPC-UA protocol. Feel free to look at Node code and call
OPC-UA protocol. Feel free to look at the code of this class and call
directly UA services methods to optimize your code
"""
......@@ -40,6 +42,9 @@ class Node(object):
return "Node({})".format(self.nodeid)
__repr__ = __str__
def __hash__(self):
return self.nodeid.__hash__()
def get_browse_name(self):
"""
Get browse name of a node. A browse name is a QualifiedName object
......@@ -148,11 +153,21 @@ class Node(object):
A node is always writable on server side.
"""
if writable:
self.set_attribute(ua.AttributeIds.AccessLevel, ua.DataValue(ua.Variant(ua.AccessLevelMask.CurrentWrite, ua.VariantType.Byte)))
self.set_attribute(ua.AttributeIds.UserAccessLevel, ua.DataValue(ua.Variant(ua.AccessLevelMask.CurrentWrite, ua.VariantType.Byte)))
self.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
self.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)
else:
self.set_attribute(ua.AttributeIds.AccessLevel, ua.DataValue(ua.Variant(ua.AccessLevelMask.CurrentRead, ua.VariantType.Byte)))
self.set_attribute(ua.AttributeIds.AccessLevel, ua.DataValue(ua.Variant(ua.AccessLevelMask.CurrentRead, ua.VariantType.Byte)))
self.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
self.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)
def set_attr_bit(self, attr, bit):
val = self.get_attribute(attr)
val.Value.Value = ua.set_bit(val.Value.Value, bit)
self.set_attribute(attr, val)
def unset_attr_bit(self, attr, bit):
val = self.get_attribute(attr)
val.Value.Value = ua.unset_bit(val.Value.Value, bit)
self.set_attribute(attr, val)
def set_read_only(self):
"""
......@@ -290,20 +305,27 @@ class Node(object):
# FIXME: seems this method may return several nodes
return Node(self.server, result.Targets[0].TargetId)
def read_raw_history(self, starttime=None, endtime=None, numvalues=0, returnbounds=True):
def read_raw_history(self, starttime=None, endtime=None, numvalues=0):
"""
Read raw history of a node
result code from server is checked and an exception is raised in case of error
If numvalues is > 0 and number of events in period is > numvalues
then result will be truncated
"""
details = ua.ReadRawModifiedDetails()
details.IsReadModified = False
if starttime:
details.StartTime = starttime
else:
details.StartTime = ua.DateTimeMinValue
if endtime:
details.EndTime = endtime
else:
details.EndTime = ua.DateTimeMinValue
details.NumValuesPerNode = numvalues
details.ReturnBounds = returnbounds
return self.history_read(details)
details.ReturnBounds = True
result = self.history_read(details)
return result.HistoryData.DataValues
def history_read(self, details):
"""
......@@ -320,12 +342,12 @@ class Node(object):
params.ReleaseContinuationPoints = False
params.NodesToRead.append(valueid)
result = self.server.history_read(params)[0]
return result.HistoryData
return result
# Hack for convenience methods
# local import is ugly but necessary for python2 support
# feel fri to propose something better but I want to split all those
# create methods fro Node
# create methods from Node
def add_folder(*args, **kwargs):
from opcua.common import manage_nodes
......
......@@ -58,7 +58,7 @@ class AttributeService(object):
continue
al = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.AccessLevel)
ual = self._aspace.get_attribute_value(writevalue.NodeId, ua.AttributeIds.UserAccessLevel)
if not al.Value.Value & ua.AccessLevelMask.CurrentWrite or not ual.Value.Value & ua.AccessLevelMask.CurrentWrite:
if not ua.test_bit(al.Value.Value, ua.AccessLevel.CurrentWrite) or not ua.test_bit(ual.Value.Value, ua.AccessLevel.CurrentWrite):
res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
continue
res.append(self._aspace.set_attribute_value(writevalue.NodeId, writevalue.AttributeId, writevalue.Value))
......
from datetime import timedelta
from datetime import datetime
from opcua import Subscription
from opcua import ua
class HistoryStorageInterface(object):
"""
Interface of a history backend.
Must be implemented by backends
"""
def new_historized_node(self, node, period, count=0):
"""
Called when a new node is to be historized
Returns None
"""
raise NotImplementedError
def save_node_value(self, node, datavalue):
"""
Called when the value of a historized node has changed and should be saved in history
Returns None
"""
raise NotImplementedError
def read_node_history(self, node, start, end, nb_values):
"""
Called when a client make a history read request for a node
if start or end is missing then nb_values is used to limit query
nb_values is the max number of values to read. Ignored if 0
Start time and end time are inclusive
Returns a list of DataValues and a continuation point which
is None if all nodes are read or the ServerTimeStamp of the last rejected DataValue
"""
raise NotImplementedError
def new_historized_event(self, event, period):
"""
Called when historization of events is enabled on server side
FIXME: we may need to store events per nodes in future...
Returns None
"""
raise NotImplementedError
def save_event(self, event):
"""
Called when a new event has been generated ans should be saved in history
Returns None
"""
raise NotImplementedError
def read_event_history(self, start, end, evfilter):
"""
Called when a client make a history read request for events
Start time and end time are inclusive
Returns a list of Events and a continuation point which
is None if all events are read or the ServerTimeStamp of the last rejected event
"""
raise NotImplementedError
class HistoryDict(HistoryStorageInterface):
"""
very minimal history backend storing data in memory using a Python dictionnary
"""
def __init__(self):
self._datachanges = {}
self._datachanges_period = {}
self._events = {}
def new_historized_node(self, node, period, count=0):
self._datachanges[node] = []
self._datachanges_period[node] = period, count
def new_historized_event(self, event, period):
self._events = []
def save_node_value(self, node, datavalue):
data = self._datachanges[node]
period, count = self._datachanges_period[node]
data.append(datavalue)
now = datetime.now()
if period:
while now - data[0].ServerTimestamp > period:
data.pop(0)
if count and len(data) > count:
data = data[-count:]
def read_node_history(self, node, start, end, nb_values):
cont = None
if node not in self._datachanges:
print("Error attempt to read history for a node which is not historized")
return [], cont
else:
if end is None:
end = datetime.now() + timedelta(days=1)
if start is None:
start = ua.DateTimeMinValue
results = [dv for dv in self._datachanges[node] if start <= dv.ServerTimestamp <= end]
if nb_values:
if start > ua.DateTimeMinValue and len(results) > nb_values:
cont = results[nb_values + 1].ServerTimestamp
results = results[:nb_values]
else:
results = results[-nb_values:]
return results, cont
def save_event(self, event):
raise NotImplementedError
def read_event_history(self, start, end, evfilter):
raise NotImplementedError
class SubHandler(object):
def __init__(self, storage):
self.storage = storage
def datachange_notification(self, node, val, data):
self.storage.save_node_value(node.nodeid, data.monitored_item.Value)
def event_notification(self, event):
self.storage.save_event(event)
class HistoryManager(object):
def __init__(self, iserver):
self.iserver = iserver
self.storage = HistoryDict()
self._sub = None
self._handlers = {}
def set_storage(self, storage):
self.storage = storage
def _create_subscription(self, handler):
params = ua.CreateSubscriptionParameters()
params.RequestedPublishingInterval = 10
params.RequestedLifetimeCount = 3000
params.RequestedMaxKeepAliveCount = 10000
params.MaxNotificationsPerPublish = 0
params.PublishingEnabled = True
params.Priority = 0
return Subscription(self.iserver.isession, params, handler)
def historize(self, node, period=timedelta(days=7), count=0):
if not self._sub:
self._sub = self._create_subscription(SubHandler(self.storage))
if node in self._handlers:
raise ua.UaError("Node {} is allready historized".format(node))
self.storage.new_historized_node(node.nodeid, period, count)
handler = self._sub.subscribe_data_change(node)
self._handlers[node] = handler
def dehistorize(self, node):
self._sub.unsubscribe(self._handlers[node])
del(self._handlers[node])
def read_history(self, params):
"""
Read history for a node
This is the part AttributeService, but implemented as its own service
since it requires more logic than other attribute service methods
"""
results = []
for rv in params.NodesToRead:
res = self._read_history(params.HistoryReadDetails, rv)
results.append(res)
return results
def _read_history(self, details, rv):
""" read history for a node
"""
result = ua.HistoryReadResult()
if isinstance(details, ua.ReadRawModifiedDetails):
if details.IsReadModified:
result.HistoryData = ua.HistoryModifiedData()
# we do not support modified history by design so we return what we have
else:
result.HistoryData = ua.HistoryData()
dv, cont = self._read_datavalue_history(rv, details)
result.HistoryData.DataValues = dv
result.ContinuationPoint = cont
elif isinstance(details, ua.ReadEventDetails):
result.HistoryData = ua.HistoryEvent()
# FIXME: filter is a cumbersome type, maybe transform it something easier
# to handle for storage
result.HistoryData.Events = self.storage.read_event_history(details.StartTime,
details.EndTime,
details.Filter)
else:
# we do not currently support the other types, clients can process data themselves
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotImplemented)
return result
def _read_datavalue_history(self, rv, details):
starttime = details.StartTime
if rv.ContinuationPoint:
# Spec says we should ignore details if cont point is present
# but they also say we can use cont point as timestamp to enable stateless
# implementation. This is contradictory, so we assume details is
# send correctly with continuation point
#starttime = bytes_to_datetime(rv.ContinuationPoint)
starttime = ua.unpack_datetime(rv.ContinuationPoint)
dv, cont = self.storage.read_node_history(rv.NodeId,
starttime,
details.EndTime,
details.NumValuesPerNode)
if cont:
#cont = datetime_to_bytes(dv[-1].ServerTimestamp)
cont = ua.pack_datetime(dv[-1].ServerTimestamp)
# FIXME, parse index range and filter out if necesary
# rv.IndexRange
# rv.DataEncoding # xml or binary, seems spec say we can ignore that one
return dv, cont
def update_history(self, params):
"""
Update history for a node
This is the part AttributeService, but implemented as its own service
since it requires more logic than other attribute service methods
"""
results = []
for _ in params.HistoryUpdateDetails:
result = ua.HistoryUpdateResult()
# we do not accept to rewrite history
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadNotWritable)
results.append(results)
return results
......@@ -4,6 +4,7 @@ Internal server implementing opcu-ua interface. can be used on server side or to
from datetime import datetime
from copy import copy, deepcopy
from datetime import timedelta
import logging
from threading import Lock
from enum import Enum
......@@ -16,6 +17,7 @@ except ImportError:
from opcua import ua
from opcua.common import utils
from opcua.common.node import Node
from opcua.server.history import HistoryManager
from opcua.server.address_space import AddressSpace
from opcua.server.address_space import AttributeService
from opcua.server.address_space import ViewService
......@@ -67,6 +69,8 @@ class InternalServer(object):
self.asyncio_transports = []
self.subscription_service = SubscriptionService(self.loop, self.aspace)
self.history_manager = HistoryManager(self)
# create a session to use on server side
self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
......@@ -148,6 +152,25 @@ class InternalServer(object):
def create_session(self, name, user=User.Anonymous, external=False):
return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
def enable_history(self, node, period=timedelta(days=7), count=0):
"""
Set attribute Historizing of node to True and start storing data for history
"""
node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
self.history_manager.historize(node, period, count)
def disable_history(self, node):
"""
Set attribute Historizing of node to False and stop storing data for history
"""
node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
self.history_manager.dehistorize(node)
class InternalSession(object):
_counter = 10
......@@ -219,6 +242,9 @@ class InternalSession(object):
return results
return [deepcopy(dv) for dv in results]
def history_read(self, params):
return self.iserver.history_manager.read_history(params)
def write(self, params):
if not self.external:
# If session is internal we need to store a copy og object, not a reference,
......
......@@ -333,6 +333,18 @@ class UaProcessor(object):
self.logger.info("sending delete monitored items response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
self.logger.info("history read request")
params = ua.HistoryReadParameters.from_binary(body)
results = self.session.history_read(params)
response = ua.HistoryReadResponse()
response.Results = results
self.logger.info("sending history read response")
self.send_response(requesthdr.RequestHandle, algohdr, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
self.logger.info("publish request")
......
......@@ -14,34 +14,6 @@ logger = logging.getLogger('opcua.uaprotocol')
OPC_TCP_SCHEME = 'opc.tcp'
class ValueRank(IntEnum):
"""
Defines dimensions of a variable.
This enum does not support all cases since ValueRank support any n>0
but since it is an IntEnum it can be replace by a normal int
"""
ScalarOrOneDimension = -3
Any = -2
Scalar = -1
OneOrMoreDimensions = 0
OneDimension = 1
# the next names are not in spec but so common we express them here
TwoDimensions = 2
ThreeDimensions = 3
FourDimensions = 3
class AccessLevelMask(object):
"""
Used by AccessLevel and UserAccessLevel
"""
CurrentRead = 0
CurrentWrite = 1
HistoryRead = 2
HistoryWrite = 3
SemanticChange = 4
class Hello(uatypes.FrozenClass):
def __init__(self):
......
......@@ -2,7 +2,7 @@
implement ua datatypes
"""
import logging
from enum import Enum
from enum import Enum, IntEnum
from datetime import datetime, timedelta, tzinfo, MAXYEAR
from calendar import timegm
import sys
......@@ -62,6 +62,10 @@ def win_epoch_to_datetime(epch):
return datetime(MAXYEAR, 12, 31, 23, 59, 59, 999999)
# minimum datetime as in ua spec, used for history
DateTimeMinValue = datetime(1606, 1, 1, 12, 0, 0)
def build_array_format_py2(prefix, length, fmtchar):
return prefix + str(length) + fmtchar
......@@ -209,11 +213,11 @@ def unpack_datetime(data):
def pack_string(string):
if string is None:
return uatype_Int32.pack(-1)
if isinstance(string, unicode):
string = string.encode('utf-8')
length = len(string)
if length == 0:
return b'\xff\xff\xff\xff'
return uatype_Int32.pack(length) + string
pack_bytes = pack_string
......@@ -222,12 +226,14 @@ pack_bytes = pack_string
def unpack_bytes(data):
length = uatype_Int32.unpack(data.read(4))[0]
if length == -1:
return b''
return None
return data.read(length)
def py3_unpack_string(data):
b = unpack_bytes(data)
if b is None:
return b
return b.decode("utf-8")
......@@ -247,6 +253,11 @@ def set_bit(data, offset):
return data | mask
def unset_bit(data, offset):
mask = 1 << offset
return data & ~mask
class _FrozenClass(object):
"""
......@@ -270,6 +281,44 @@ else:
FrozenClass = _FrozenClass
class ValueRank(IntEnum):
"""
Defines dimensions of a variable.
This enum does not support all cases since ValueRank support any n>0
but since it is an IntEnum it can be replace by a normal int
"""
ScalarOrOneDimension = -3
Any = -2
Scalar = -1
OneOrMoreDimensions = 0
OneDimension = 1
# the next names are not in spec but so common we express them here
TwoDimensions = 2
ThreeDimensions = 3
FourDimensions = 3
class AccessLevel(IntEnum):
"""
"""
CurrentRead = 0
CurrentWrite = 1
HistoryRead = 2
HistoryWrite = 3
SemanticChange = 4
class AccessLevelMask(IntEnum):
"""
Mask for access level
"""
CurrentRead = 1 << AccessLevel.CurrentRead
CurrentWrite = 1 << AccessLevel.CurrentWrite
HistoryRead = 1 << AccessLevel.HistoryRead
HistoryWrite = 1 << AccessLevel.HistoryWrite
SemanticChange = 1 << AccessLevel.SemanticChange
class Guid(FrozenClass):
def __init__(self):
......
......@@ -16,7 +16,7 @@ except ImportError:
from tests_cmd_lines import TestCmdLines
from tests_server import TestServer
from tests_client import TestClient
from tests_unit import Unit
from tests_unit import TestUnit
if CRYPTOGRAPHY_AVAILABLE:
from tests_crypto_connect import TestCryptoConnect
......
......@@ -867,4 +867,32 @@ class CommonTests(object):
self.assertTrue(len(endpoints) > 0)
self.assertTrue(endpoints[0].EndpointUrl.startswith("opc.tcp://"))
def test_history_read(self):
o = self.srv.get_objects_node()
vals = [i for i in range(20)]
var = o.add_variable(3, "history_var", 0)
self.srv.iserver.enable_history(var, period=None, count=10)
for i in vals:
var.set_value(i)
time.sleep(1)
now = datetime.now()
old = now - timedelta(days=6)
res = var.read_raw_history(None, now, 2)
self.assertEqual(len(res), 2)
self.assertEqual(res[-1].Value.Value, vals[-1])
res = var.read_raw_history(old, now, 0)
self.assertEqual(len(res), 20)
self.assertEqual(res[-1].Value.Value, vals[-1])
self.assertEqual(res[0].Value.Value, vals[0])
res = var.read_raw_history(old, now, 5)
self.assertEqual(len(res), 5)
self.assertEqual(res[-1].Value.Value, vals[4])
self.assertEqual(res[0].Value.Value, vals[0])
import unittest
from tests_common import CommonTests, add_server_methods
import time
from datetime import timedelta
from opcua import Server
from opcua import Client
......@@ -126,5 +127,14 @@ class TestServer(unittest.TestCase, CommonTests):
val = v.get_value()
self.assertEqual(val, "StringValue")
def test_historize(self):
o = self.opc.get_objects_node()
var = o.add_variable(3, "test_hist", 1.0)
self.srv.iserver.enable_history(var, timedelta(days=1))
time.sleep(1)
var.set_value(2.0)
var.set_value(3.0)
self.srv.iserver.disable_history(var)
......@@ -12,7 +12,7 @@ from opcua.ua.uatypes import flatten, get_shape, reshape
class Unit(unittest.TestCase):
class TestUnit(unittest.TestCase):
'''
Simple unit test that do not need to setup a server or a client
......@@ -140,13 +140,22 @@ class Unit(unittest.TestCase):
with self.assertRaises(ua.UaError):
ua.QualifiedName.from_string("i:::yu")
def test_expandednodeid(self):
nid = ua.ExpandedNodeId()
self.assertEqual(nid.NodeIdType, ua.NodeIdType.TwoByte)
nid2 = ua.ExpandedNodeId.from_binary(ua.utils.Buffer(nid.to_binary()))
self.assertEqual(nid, nid2)
def test_null_string(self):
v = ua.Variant(None, ua.VariantType.String)
b = v.to_binary()
v2 = ua.Variant.from_binary(ua.utils.Buffer(b))
self.assertEqual(v.Value, v2.Value)
v = ua.Variant("", ua.VariantType.String)
b = v.to_binary()
v2 = ua.Variant.from_binary(ua.utils.Buffer(b))
self.assertEqual(v.Value, v2.Value)
def test_extension_object(self):
obj = ua.UserNameIdentityToken()
obj.UserName = "admin"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment