Commit 92999182 authored by ORD's avatar ORD

Merge pull request #179 from FreeOpcUa/evfilter

WhereClause support
parents 94ecade8 8b3323a7
...@@ -23,9 +23,9 @@ class SubHandler(object): ...@@ -23,9 +23,9 @@ class SubHandler(object):
if __name__ == "__main__": if __name__ == "__main__":
#from IPython import embed #from IPython import embed
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
#client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/") client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")
client = Client("opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/") #client = Client("opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/")
client.set_security_string("Basic256,SignAndEncrypt,certificate-example.der,private-key-example.pem") #client.set_security_string("Basic256,SignAndEncrypt,certificate-example.der,private-key-example.pem")
try: try:
client.connect() client.connect()
root = client.get_root_node() root = client.get_root_node()
......
import sys
sys.path.insert(0, "..")
import logging
from opcua import Client
from opcua import ua
from IPython import embed
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
print("Python: New data change event", node, val)
def event_notification(self, event):
print("Python: New event", event.EventType)
if __name__ == "__main__":
#from IPython import embed
logging.basicConfig(level=logging.WARN)
client = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")
#client = Client("opc.tcp://olivier:olivierpass@localhost:53530/OPCUA/SimulationServer/")
try:
client.connect()
root = client.get_root_node()
print("Root is", root)
handler = SubHandler()
sub = client.create_subscription(500, handler)
handle = sub.subscribe_events(evtype=2788)
# refresh server condition to force generation of events
cond = root.get_child(["0:Types", "0:EventTypes", "0:BaseEventType", "0:ConditionType"])
cond.call_method("0:ConditionRefresh", ua.Variant(sub.subscription_id, ua.VariantType.UInt32))
embed()
finally:
client.disconnect()
...@@ -53,10 +53,12 @@ if __name__ == "__main__": ...@@ -53,10 +53,12 @@ if __name__ == "__main__":
try: try:
# time.sleep is here just because we want to see events in UaExpert # time.sleep is here just because we want to see events in UaExpert
import time import time
for i in range(1, 10): count = 0
time.sleep(10) while True:
myevgen.trigger(message="This is MyFirstEvent with MyNumericProperty and MyStringProperty.") time.sleep(2)
mysecondevgen.trigger(message="This is MySecondEvent with MyIntProperty and MyBoolProperty.") myevgen.trigger(message="MyFirstEvent " + str(count))
mysecondevgen.trigger(message="MySecondEvent " + str(count))
count += 1
embed() embed()
finally: finally:
......
...@@ -211,22 +211,56 @@ class Subscription(object): ...@@ -211,22 +211,56 @@ class Subscription(object):
def _get_filter_from_event_type(self, eventtype): def _get_filter_from_event_type(self, eventtype):
eventtype = self._get_node(eventtype) eventtype = self._get_node(eventtype)
evfilter = ua.EventFilter() evfilter = ua.EventFilter()
for property in get_event_properties_from_type_node(eventtype): evfilter.SelectClauses = self._select_clauses_from_evtype(eventtype)
op = ua.SimpleAttributeOperand() evfilter.WhereClause = self._where_clause_from_evtype(eventtype)
op.TypeDefinitionId = eventtype.nodeid
op.AttributeId = ua.AttributeIds.Value
op.BrowsePath = [property.get_browse_name()]
evfilter.SelectClauses.append(op)
return evfilter return evfilter
def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType): def _select_clauses_from_evtype(self, evtype):
clauses = []
for prop in get_event_properties_from_type_node(evtype):
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = evtype.nodeid
op.AttributeId = ua.AttributeIds.Value
op.BrowsePath = [prop.get_browse_name()]
clauses.append(op)
return clauses
def _where_clause_from_evtype(self, evtype):
cf = ua.ContentFilter()
el = ua.ContentFilterElement()
# operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
op = ua.SimpleAttributeOperand()
op.TypeDefinitionId = evtype.nodeid
op.BrowsePath.append(ua.QualifiedName("EventType", 0))
op.AttributeId = ua.AttributeIds.Value
el.FilterOperands.append(op)
for subtypeid in [st.nodeid for st in self._get_subtypes(evtype)]:
op = ua.LiteralOperand()
op.Value = ua.Variant(subtypeid)
el.FilterOperands.append(op)
el.FilterOperator = ua.FilterOperator.InList
cf.Elements.append(el)
return cf
def _get_subtypes(self, parent, nodes=None):
if nodes is None:
nodes = [parent]
for child in parent.get_children(refs=ua.ObjectIds.HasSubtype):
nodes.append(child)
self._get_subtypes(child, nodes)
return nodes
def subscribe_events(self, sourcenode=ua.ObjectIds.Server, evtype=ua.ObjectIds.BaseEventType, evfilter=None):
""" """
Subscribe to events from a node. Default node is Server node. Subscribe to events from a node. Default node is Server node.
In most servers the server node is the only one you can subscribe to. In most servers the server node is the only one you can subscribe to.
if evfilter is provided, evtype is ignored
Return a handle which can be used to unsubscribe Return a handle which can be used to unsubscribe
""" """
sourcenode = self._get_node(sourcenode) sourcenode = self._get_node(sourcenode)
evfilter = self._get_filter_from_event_type(evtype) if evfilter is None:
evfilter = self._get_filter_from_event_type(evtype)
return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter) return self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter)
def _subscribe(self, nodes, attr, mfilter=None, queuesize=0): def _subscribe(self, nodes, attr, mfilter=None, queuesize=0):
......
...@@ -56,7 +56,7 @@ class EventGenerator(object): ...@@ -56,7 +56,7 @@ class EventGenerator(object):
source = Node(self.isession, self.event.SourceNode) source = Node(self.isession, self.event.SourceNode)
self.event.SourceNode = source.nodeid self.event.SourceNode = source.nodeid
self.event.SourceName = source.get_display_name().Text self.event.SourceName = source.get_browse_name().Name
source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte))) source.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(1, ua.VariantType.Byte)))
refs = [] refs = []
......
...@@ -18,6 +18,7 @@ class MonitoredItemData(object): ...@@ -18,6 +18,7 @@ class MonitoredItemData(object):
self.parameters = None self.parameters = None
self.mode = None self.mode = None
self.mfilter = None self.mfilter = None
self.where_clause_evaluator = None
class MonitoredItemService(object): class MonitoredItemService(object):
...@@ -99,7 +100,9 @@ class MonitoredItemService(object): ...@@ -99,7 +100,9 @@ class MonitoredItemService(object):
return result, mdata return result, mdata
def _create_events_monitored_item(self, params): def _create_events_monitored_item(self, params):
self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId) self.logger.info("request to subscribe to events for node %s and attribute %s",
params.ItemToMonitor.NodeId,
params.ItemToMonitor.AttributeId)
result, mdata = self._make_monitored_item_common(params) result, mdata = self._make_monitored_item_common(params)
ev_notify_byte = self.aspace.get_attribute_value(params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value ev_notify_byte = self.aspace.get_attribute_value(params.ItemToMonitor.NodeId, ua.AttributeIds.EventNotifier).Value.Value
...@@ -109,13 +112,18 @@ class MonitoredItemService(object): ...@@ -109,13 +112,18 @@ class MonitoredItemService(object):
result.FilterResult = ua.EventFilterResult() result.FilterResult = ua.EventFilterResult()
for _ in params.RequestedParameters.Filter.SelectClauses: for _ in params.RequestedParameters.Filter.SelectClauses:
result.FilterResult.SelectClauseResults.append(ua.StatusCode()) result.FilterResult.SelectClauseResults.append(ua.StatusCode())
# FIXME: where clause result # TODO: spec says we should check WhereClause here
mdata.where_clause_evaluator = WhereClauseEvaluator(self.logger, self.aspace, mdata.mfilter.WhereClause)
self._commit_monitored_item(result, mdata) self._commit_monitored_item(result, mdata)
self._monitored_events[params.ItemToMonitor.NodeId] = result.MonitoredItemId if params.ItemToMonitor.NodeId not in self._monitored_events:
self._monitored_events[params.ItemToMonitor.NodeId] = []
self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
return result return result
def _create_data_change_monitored_item(self, params): def _create_data_change_monitored_item(self, params):
self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId, params.ItemToMonitor.AttributeId) self.logger.info("request to subscribe to datachange for node %s and attribute %s",
params.ItemToMonitor.NodeId,
params.ItemToMonitor.AttributeId)
result, mdata = self._make_monitored_item_common(params) result, mdata = self._make_monitored_item_common(params)
result.FilterResult = params.RequestedParameters.Filter result.FilterResult = params.RequestedParameters.Filter
...@@ -141,8 +149,10 @@ class MonitoredItemService(object): ...@@ -141,8 +149,10 @@ class MonitoredItemService(object):
if mid not in self._monitored_items: if mid not in self._monitored_items:
return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid) return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
for k, v in self._monitored_events.items(): for k, v in self._monitored_events.items():
if v == mid: if mid in v:
self._monitored_events.pop(k) v.remove(mid)
if not v:
self._monitored_events.pop(k)
break break
for k, v in self._monitored_datachange.items(): for k, v in self._monitored_datachange.items():
if v == mid: if v == mid:
...@@ -154,10 +164,12 @@ class MonitoredItemService(object): ...@@ -154,10 +164,12 @@ class MonitoredItemService(object):
def datachange_callback(self, handle, value, error=None): def datachange_callback(self, handle, value, error=None):
if error: if error:
self.logger.info("subscription %s: datachange callback called with handle '%s' and erorr '%s'", self, handle, error) self.logger.info("subscription %s: datachange callback called with handle '%s' and erorr '%s'",
self, handle, error)
self.trigger_statuschange(error) self.trigger_statuschange(error)
else: else:
self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'", self, handle, value.Value) self.logger.info("subscription %s: datachange callback called with handle '%s' and value '%s'",
self, handle, value.Value)
event = ua.MonitoredItemNotification() event = ua.MonitoredItemNotification()
with self._lock: with self._lock:
mid = self._monitored_datachange[handle] mid = self._monitored_datachange[handle]
...@@ -169,26 +181,33 @@ class MonitoredItemService(object): ...@@ -169,26 +181,33 @@ class MonitoredItemService(object):
def trigger_event(self, event): def trigger_event(self, event):
with self._lock: with self._lock:
if event.SourceNode not in self._monitored_events: if event.SourceNode not in self._monitored_events:
self.logger.debug("%s has no subscription for events %s from node: %s", self, event, event.SourceNode) self.logger.debug("%s has no subscription for events %s from node: %s",
return False self, event, event.SourceNode)
self.logger.debug("%s has subscription for events %s from node: %s", self, event, event.SourceNode)
mid = self._monitored_events[event.SourceNode]
if mid not in self._monitored_items:
self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event, self)
return False return False
mdata = self._monitored_items[mid] self.logger.debug("%s has subscription for events %s from node: %s",
fieldlist = ua.EventFieldList() self, event, event.SourceNode)
fieldlist.ClientHandle = mdata.client_handle mids = self._monitored_events[event.SourceNode]
fieldlist.EventFields = self._get_event_fields(mdata.mfilter, event) for mid in mids:
self.isub.enqueue_event(mid, fieldlist, mdata.parameters.RevisedQueueSize) self._trigger_event(event, mid)
return True
def _trigger_event(self, event, mid):
if mid not in self._monitored_items:
self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event, self)
return
mdata = self._monitored_items[mid]
if not mdata.where_clause_evaluator.eval(event):
self.logger.debug("Event does not fit WhereClause, not generating event", mid, event, self)
return
fieldlist = ua.EventFieldList()
fieldlist.ClientHandle = mdata.client_handle
fieldlist.EventFields = self._get_event_fields(mdata.mfilter, event)
self.isub.enqueue_event(mid, fieldlist, mdata.parameters.RevisedQueueSize)
def _get_event_fields(self, evfilter, event): def _get_event_fields(self, evfilter, event):
fields = [] fields = []
for sattr in evfilter.SelectClauses: for sattr in evfilter.SelectClauses:
try: try:
if not sattr.BrowsePath: if not sattr.BrowsePath:
#val = getattr(event, ua.AttributeIdsInv[sattr.Attribute])
val = getattr(event, sattr.Attribute.name) val = getattr(event, sattr.Attribute.name)
val = copy.deepcopy(val) val = copy.deepcopy(val)
fields.append(ua.Variant(val)) fields.append(ua.Variant(val))
...@@ -347,3 +366,88 @@ class InternalSubscription(object): ...@@ -347,3 +366,88 @@ class InternalSubscription(object):
if len(queue[mid]) >= size: if len(queue[mid]) >= size:
queue[mid].pop(0) queue[mid].pop(0)
queue[mid].append(eventdata) queue[mid].append(eventdata)
class WhereClauseEvaluator(object):
def __init__(self, logger, aspace, whereclause):
self.logger = logger
self.elements = whereclause.Elements
self._aspace = aspace
def eval(self, event):
if not self.elements:
return True
# spec says we should only evaluate first element, which may use other elements
try:
res = self._eval_el(0, event)
except Exception as ex:
self.logger.warning("Exception while evaluating WhereClause %s for event %s: %s", self.elements, event, ex)
return False
return res
def _eval_el(self, index, event):
el = self.elements[index]
#ops = [self._eval_op(op, event) for op in el.FilterOperands]
ops = el.FilterOperands # just to make code more readable
if el.FilterOperator == ua.FilterOperator.Equals:
return self._eval_op(ops[0], event) == self._eval_el(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.IsNull:
return self._eval_op(ops[0], event) is None # FIXME: might be too strict
elif el.FilterOperator == ua.FilterOperator.GreaterThan:
return self._eval_op(ops[0], event) > self._eval_el(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.LessThan:
return self._eval_op(ops[0], event) < self._eval_el(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.GreaterThanOrEqual:
return self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.LessThanOrEqual:
return self._eval_op(ops[0], event) <= self._eval_el(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.Like:
return self._likeoperator(self._eval_op(ops[0], event), self._eval_el(ops[1], event))
elif el.FilterOperator == ua.FilterOperator.Not:
return not self._eval_op(ops[0], event)
elif el.FilterOperator == ua.FilterOperator.Between:
return self._eval_el(ops[2], event) >= self._eval_op(ops[0], event) >= self._eval_el(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.InList:
return self._eval_op(ops[0], event) in [self._eval_op(op, event) for op in ops[1:]]
elif el.FilterOperator == ua.FilterOperator.And:
self.elements(ops[0].Index)
return self._eval_op(ops[0], event) and self._eval_op(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.Or:
return self._eval_op(ops[0], event) or self._eval_el(ops[1], event)
elif el.FilterOperator == ua.FilterOperator.Cast:
self.logger("Cast operand not implemented")
raise NotImplementError
else:
# TODO: implement missing operators
print("WhereClause not implemented for element: %s", el)
raise NotImplementError
def _like_operator(self, string, pattern):
raise NotImplementError
def _eval_op(self, op, event):
# seems spec says we should return Null if issues
if type(op) is ua.ElementOperand:
el = self.elements[op.FilterOperands[0].Index]
return self._eval_el(el)
elif type(op) is ua.AttributeOperand:
if op.BrowsePath:
return getattr(event, op.BrowsePath.Elements[0].TargetName.Name)
else:
return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
# FIXME: check, this is probably broken
elif type(op) is ua.SimpleAttributeOperand:
if op.BrowsePath:
# we only support depth of 1
return getattr(event, op.BrowsePath[0].Name)
else:
# TODO: write code for index range.... but doe it make any sense
return self._aspace.get_attribute_value(event.EventType, op.AttributeId).Value.Value
elif type(op) is ua.LiteralOperand:
return op.Value.Value
else:
self.logger.warning("Where clause element % is not of a known type", el)
raise NotImplementError
...@@ -4,12 +4,13 @@ from opcua import Client ...@@ -4,12 +4,13 @@ from opcua import Client
from opcua import Server from opcua import Server
from opcua import ua from opcua import ua
from tests_subscriptions import SubscriptionTests
from tests_common import CommonTests, add_server_methods from tests_common import CommonTests, add_server_methods
port_num1 = 48510 port_num1 = 48510
class TestClient(unittest.TestCase, CommonTests): class TestClient(unittest.TestCase, CommonTests, SubscriptionTests):
''' '''
Run common tests on client side Run common tests on client side
......
...@@ -33,79 +33,6 @@ def add_server_methods(srv): ...@@ -33,79 +33,6 @@ def add_server_methods(srv):
v = o.add_method(ua.NodeId("ServerMethodArray2", 2), ua.QualifiedName('ServerMethodArray2', 2), func3, [ua.VariantType.Int64], [ua.VariantType.Int64]) v = o.add_method(ua.NodeId("ServerMethodArray2", 2), ua.QualifiedName('ServerMethodArray2', 2), func3, [ua.VariantType.Int64], [ua.VariantType.Int64])
class SubHandler():
'''
Dummy subscription client
'''
def datachange_notification(self, node, val, data):
pass
def event_notification(self, event):
pass
class MySubHandlerDeprecated():
'''
More advanced subscription client using Future, so we can wait for events in tests
'''
def __init__(self):
self.future = Future()
def reset(self):
self.future = Future()
def data_change(self, handle, node, val, attr):
self.future.set_result((handle, node, val, attr))
def event(self, handle, event):
self.future.set_result((handle, event))
class MySubHandler():
'''
More advanced subscription client using Future, so we can wait for events in tests
'''
def __init__(self):
self.future = Future()
def reset(self):
self.future = Future()
def datachange_notification(self, node, val, data):
self.future.set_result((node, val, data))
def event_notification(self, event):
self.future.set_result(event)
class MySubHandler2():
def __init__(self):
self.results = []
def datachange_notification(self, node, val, data):
self.results.append((node, val))
def event_notification(self, event):
self.results.append(event)
class MySubHandlerCounter():
def __init__(self):
self.datachange_count = 0
self.event_count = 0
def datachange_notification(self, node, val, data):
self.datachange_count += 1
def event_notification(self, event):
self.event_count += 1
class CommonTests(object): class CommonTests(object):
''' '''
...@@ -262,189 +189,6 @@ class CommonTests(object): ...@@ -262,189 +189,6 @@ class CommonTests(object):
v2 = o.get_child("3:BNVariable with spaces and %&+?/") v2 = o.get_child("3:BNVariable with spaces and %&+?/")
self.assertEqual(v, v2) self.assertEqual(v, v2)
def test_create_delete_subscription(self):
o = self.opc.get_objects_node()
v = o.add_variable(3, 'SubscriptionVariable', [1, 2, 3])
sub = self.opc.create_subscription(100, MySubHandler())
handle = sub.subscribe_data_change(v)
time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_subscribe_events(self):
sub = self.opc.create_subscription(100, MySubHandler())
handle = sub.subscribe_events()
time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_subscribe_events_to_wrong_node(self):
sub = self.opc.create_subscription(100, MySubHandler())
with self.assertRaises(ua.UaStatusCodeError):
handle = sub.subscribe_events(self.opc.get_node("i=85"))
o = self.opc.get_objects_node()
v = o.add_variable(3, 'VariableNoEventNofierAttribute', 4)
with self.assertRaises(ua.UaStatusCodeError):
handle = sub.subscribe_events(v)
sub.delete()
def test_get_event_from_type_node_BaseEvent(self):
etype = self.opc.get_node(ua.ObjectIds.BaseEventType)
properties = opcua.common.subscription.get_event_properties_from_type_node(etype)
for child in etype.get_properties():
self.assertTrue(child in properties)
def test_get_event_from_type_node_CustomEvent(self):
etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.AuditEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
properties = opcua.common.subscription.get_event_properties_from_type_node(etype)
for child in self.opc.get_node(ua.ObjectIds.BaseEventType).get_properties():
self.assertTrue(child in properties)
for child in self.opc.get_node(ua.ObjectIds.AuditEventType).get_properties():
self.assertTrue(child in properties)
for child in self.opc.get_node(etype.nodeid).get_properties():
self.assertTrue(child in properties)
self.assertTrue(etype.get_child("2:PropertyNum") in properties)
self.assertTrue(etype.get_child("2:PropertyString") in properties)
def test_events_default(self):
evgen = self.srv.get_event_generator()
msclt = MySubHandler()
sub = self.opc.create_subscription(100, msclt)
handle = sub.subscribe_events()
tid = datetime.utcnow()
msg = b"this is my msg "
evgen.trigger(tid, msg)
ev = msclt.future.result()
self.assertIsNot(ev, None) # we did not receive event
self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
self.assertEqual(ev.Severity, 1)
self.assertEqual(ev.SourceName, self.opc.get_server_node().get_display_name().Text)
self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
self.assertEqual(ev.Message.Text, msg)
self.assertEqual(ev.Time, tid)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_events_MyObject(self):
objects = self.srv.get_objects_node()
o = objects.add_object(3, 'MyObject')
evgen = self.srv.get_event_generator(source=o)
msclt = MySubHandler()
sub = self.opc.create_subscription(100, msclt)
handle = sub.subscribe_events(o)
tid = datetime.utcnow()
msg = b"this is my msg "
evgen.trigger(tid, msg)
ev = msclt.future.result(10)
self.assertIsNot(ev, None) # we did not receive event
self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
self.assertEqual(ev.Severity, 1)
self.assertEqual(ev.SourceName, b'MyObject')
self.assertEqual(ev.SourceNode, o.nodeid)
self.assertEqual(ev.Message.Text, msg)
self.assertEqual(ev.Time, tid)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_events_wrong_source(self):
objects = self.srv.get_objects_node()
o = objects.add_object(3, 'MyObject')
evgen = self.srv.get_event_generator(source=o)
msclt = MySubHandler()
sub = self.opc.create_subscription(100, msclt)
handle = sub.subscribe_events()
tid = datetime.utcnow()
msg = b"this is my msg "
evgen.trigger(tid, msg)
with self.assertRaises(TimeoutError): # we should not receive event
ev = msclt.future.result(10)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_events_CustomEvent(self):
etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
evgen = self.srv.get_event_generator(etype)
msclt = MySubHandler()
sub = self.opc.create_subscription(100, msclt)
handle = sub.subscribe_events(evtype=etype)
propertynum = 2
propertystring = "This is my test"
evgen.event.PropertyNum = propertynum
evgen.event.PropertyString = propertystring
serverity = 500
evgen.event.Severity = serverity
tid = datetime.utcnow()
msg = b"this is my msg "
evgen.trigger(tid, msg)
ev = msclt.future.result(10)
self.assertIsNot(ev, None) # we did not receive event
self.assertEqual(ev.EventType, etype.nodeid)
self.assertEqual(ev.Severity, serverity)
self.assertEqual(ev.SourceName, self.opc.get_server_node().get_display_name().Text)
self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
self.assertEqual(ev.Message.Text, msg)
self.assertEqual(ev.Time, tid)
self.assertEqual(ev.PropertyNum, propertynum)
self.assertEqual(ev.PropertyString, propertystring)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_events_CustomEvent_MyObject(self):
objects = self.srv.get_objects_node()
o = objects.add_object(3, 'MyObject')
etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
evgen = self.srv.get_event_generator(etype, o)
msclt = MySubHandler()
sub = self.opc.create_subscription(100, msclt)
handle = sub.subscribe_events(o, etype)
propertynum = 2
propertystring = "This is my test"
evgen.event.PropertyNum = propertynum
evgen.event.PropertyString = propertystring
tid = datetime.utcnow()
msg = b"this is my msg "
evgen.trigger(tid, msg)
ev = msclt.future.result(10)
self.assertIsNot(ev, None) # we did not receive event
self.assertEqual(ev.EventType, etype.nodeid)
self.assertEqual(ev.Severity, 1)
self.assertEqual(ev.SourceName, b'MyObject')
self.assertEqual(ev.SourceNode, o.nodeid)
self.assertEqual(ev.Message.Text, msg)
self.assertEqual(ev.Time, tid)
self.assertEqual(ev.PropertyNum, propertynum)
self.assertEqual(ev.PropertyString, propertystring)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_non_existing_path(self): def test_non_existing_path(self):
root = self.opc.get_root_node() root = self.opc.get_root_node()
with self.assertRaises(ua.UaStatusCodeError): with self.assertRaises(ua.UaStatusCodeError):
...@@ -685,277 +429,6 @@ class CommonTests(object): ...@@ -685,277 +429,6 @@ class CommonTests(object):
val = v.get_value() val = v.get_value()
self.assertEqual([1], val) self.assertEqual([1], val)
def test_subscription_failure(self):
msclt = MySubHandler()
o = self.opc.get_objects_node()
sub = self.opc.create_subscription(100, msclt)
with self.assertRaises(ua.UaStatusCodeError):
handle1 = sub.subscribe_data_change(o) # we can only subscribe to variables so this should fail
sub.delete()
def test_subscription_overload(self):
nb = 10
msclt = MySubHandler()
o = self.opc.get_objects_node()
sub = self.opc.create_subscription(1, msclt)
variables = []
subs = []
for i in range(nb):
v = o.add_variable(3, 'SubscriptionVariableOverload' + str(i), 99)
variables.append(v)
for i in range(nb):
sub.subscribe_data_change(variables)
for i in range(nb):
for j in range(nb):
variables[i].set_value(j)
s = self.opc.create_subscription(1, msclt)
s.subscribe_data_change(variables)
subs.append(s)
sub.subscribe_data_change(variables[i])
for i in range(nb):
for j in range(nb):
variables[i].set_value(j)
sub.delete()
for s in subs:
s.delete()
def test_subscription_count(self):
msclt = MySubHandlerCounter()
sub = self.opc.create_subscription(1, msclt)
o = self.opc.get_objects_node()
var = o.add_variable(3, 'SubVarCounter', 0.1)
sub.subscribe_data_change(var)
nb = 12
for i in range(nb):
val = var.get_value()
var.set_value(val +1)
time.sleep(0.2) # let last event arrive
self.assertEqual(msclt.datachange_count, nb + 1)
sub.delete()
def test_subscription_count_list(self):
msclt = MySubHandlerCounter()
sub = self.opc.create_subscription(1, msclt)
o = self.opc.get_objects_node()
var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
sub.subscribe_data_change(var)
nb = 12
for i in range(nb):
val = var.get_value()
val.append(i)
var.set_value(val)
time.sleep(0.2) # let last event arrive
self.assertEqual(msclt.datachange_count, nb + 1)
sub.delete()
def test_subscription_count_no_change(self):
msclt = MySubHandlerCounter()
sub = self.opc.create_subscription(1, msclt)
o = self.opc.get_objects_node()
var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
sub.subscribe_data_change(var)
nb = 12
for i in range(nb):
val = var.get_value()
var.set_value(val)
time.sleep(0.2) # let last event arrive
self.assertEqual(msclt.datachange_count, 1)
sub.delete()
def test_subscription_count_empty(self):
msclt = MySubHandlerCounter()
sub = self.opc.create_subscription(1, msclt)
o = self.opc.get_objects_node()
var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3])
sub.subscribe_data_change(var)
while True:
val = var.get_value()
val.pop()
var.set_value(val, ua.VariantType.Double)
if not val:
break
time.sleep(0.2) # let last event arrive
self.assertEqual(msclt.datachange_count, 4)
sub.delete()
def test_subscription_overload_simple(self):
nb = 10
msclt = MySubHandler()
o = self.opc.get_objects_node()
sub = self.opc.create_subscription(1, msclt)
variables = [o.add_variable(3, 'SubVarOverload' + str(i), i) for i in range(nb)]
for i in range(nb):
sub.subscribe_data_change(variables)
sub.delete()
def test_subscription_data_change_depcrecated(self):
'''
test subscriptions. This is far too complicated for
a unittest but, setting up subscriptions requires a lot
of code, so when we first set it up, it is best
to test as many things as possible
'''
msclt = MySubHandlerDeprecated()
o = self.opc.get_objects_node()
# subscribe to a variable
startv1 = [1, 2, 3]
v1 = o.add_variable(3, 'SubscriptionVariableDeprecatedV1', startv1)
sub = self.opc.create_subscription(100, msclt)
handle1 = sub.subscribe_data_change(v1)
# Now check we get the start value
clthandle, node, val, attr = msclt.future.result()
self.assertEqual(val, startv1)
self.assertEqual(node, v1)
msclt.reset() # reset future object
# modify v1 and check we get value
v1.set_value([5])
clthandle, node, val, attr = msclt.future.result()
self.assertEqual(node, v1)
self.assertEqual(val, [5])
with self.assertRaises(ua.UaStatusCodeError):
sub.unsubscribe(999) # non existing handle
sub.unsubscribe(handle1)
with self.assertRaises(ua.UaStatusCodeError):
sub.unsubscribe(handle1) # second try should fail
sub.delete()
with self.assertRaises(ua.UaStatusCodeError):
sub.unsubscribe(handle1) # sub does not exist anymore
def test_subscription_data_change(self):
'''
test subscriptions. This is far too complicated for
a unittest but, setting up subscriptions requires a lot
of code, so when we first set it up, it is best
to test as many things as possible
'''
msclt = MySubHandler()
o = self.opc.get_objects_node()
# subscribe to a variable
startv1 = [1, 2, 3]
v1 = o.add_variable(3, 'SubscriptionVariableV1', startv1)
sub = self.opc.create_subscription(100, msclt)
handle1 = sub.subscribe_data_change(v1)
# Now check we get the start value
node, val, data = msclt.future.result()
self.assertEqual(val, startv1)
self.assertEqual(node, v1)
msclt.reset() # reset future object
# modify v1 and check we get value
v1.set_value([5])
node, val, data = msclt.future.result()
self.assertEqual(node, v1)
self.assertEqual(val, [5])
with self.assertRaises(ua.UaStatusCodeError):
sub.unsubscribe(999) # non existing handle
sub.unsubscribe(handle1)
with self.assertRaises(ua.UaStatusCodeError):
sub.unsubscribe(handle1) # second try should fail
sub.delete()
with self.assertRaises(ua.UaStatusCodeError):
sub.unsubscribe(handle1) # sub does not exist anymore
def test_subscription_data_change_bool(self):
'''
test subscriptions. This is far too complicated for
a unittest but, setting up subscriptions requires a lot
of code, so when we first set it up, it is best
to test as many things as possible
'''
msclt = MySubHandler()
o = self.opc.get_objects_node()
# subscribe to a variable
startv1 = True
v1 = o.add_variable(3, 'SubscriptionVariableBool', startv1)
sub = self.opc.create_subscription(100, msclt)
handle1 = sub.subscribe_data_change(v1)
# Now check we get the start value
node, val, data = msclt.future.result()
self.assertEqual(val, startv1)
self.assertEqual(node, v1)
msclt.reset() # reset future object
# modify v1 and check we get value
v1.set_value(False)
node, val, data = msclt.future.result()
self.assertEqual(node, v1)
self.assertEqual(val, False)
sub.delete() # should delete our monitoreditem too
def test_subscription_data_change_many(self):
'''
test subscriptions. This is far too complicated for
a unittest but, setting up subscriptions requires a lot
of code, so when we first set it up, it is best
to test as many things as possible
'''
msclt = MySubHandler2()
o = self.opc.get_objects_node()
startv1 = True
v1 = o.add_variable(3, 'SubscriptionVariableMany1', startv1)
startv2 = [1.22, 1.65]
v2 = o.add_variable(3, 'SubscriptionVariableMany2', startv2)
sub = self.opc.create_subscription(100, msclt)
handle1, handle2 = sub.subscribe_data_change([v1, v2])
# Now check we get the start values
nodes = [v1, v2]
count = 0
while not len(msclt.results) > 1:
count += 1
time.sleep(0.1)
if count > 100:
self.fail("Did not get result from subscription")
for node, val in msclt.results:
self.assertIn(node, nodes)
nodes.remove(node)
if node == v1:
self.assertEqual(startv1, val)
elif node == v2:
self.assertEqual(startv2, val)
else:
self.fail("Error node {} is neither {} nor {}".format(node, v1, v2))
sub.delete()
def test_subscribe_server_time(self):
msclt = MySubHandler()
server_time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
sub = self.opc.create_subscription(200, msclt)
handle = sub.subscribe_data_change(server_time_node)
node, val, data = msclt.future.result()
self.assertEqual(node, server_time_node)
delta = datetime.utcnow() - val
self.assertTrue(delta < timedelta(seconds=2))
sub.unsubscribe(handle)
sub.delete()
def test_use_namespace(self): def test_use_namespace(self):
idx = self.opc.get_namespace_index("urn:freeopcua:python:server") idx = self.opc.get_namespace_index("urn:freeopcua:python:server")
self.assertEqual(idx, 1) self.assertEqual(idx, 1)
......
import unittest import unittest
from tests_common import CommonTests, add_server_methods, MySubHandler
import os import os
import shelve import shelve
import time import time
from tests_common import CommonTests, add_server_methods
from tests_subscriptions import SubscriptionTests
from datetime import timedelta, datetime from datetime import timedelta, datetime
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
...@@ -17,7 +19,7 @@ port_num = 485140 ...@@ -17,7 +19,7 @@ port_num = 485140
port_discovery = 48550 port_discovery = 48550
class TestServer(unittest.TestCase, CommonTests): class TestServer(unittest.TestCase, CommonTests, SubscriptionTests):
''' '''
Run common tests on server side Run common tests on server side
...@@ -327,7 +329,7 @@ class TestServer(unittest.TestCase, CommonTests): ...@@ -327,7 +329,7 @@ class TestServer(unittest.TestCase, CommonTests):
def check_eventgenerator_SourceServer(test, evgen): def check_eventgenerator_SourceServer(test, evgen):
server = test.opc.get_server_node() server = test.opc.get_server_node()
test.assertEqual(evgen.event.SourceName, server.get_display_name().Text) test.assertEqual(evgen.event.SourceName, server.get_browse_name().Name)
test.assertEqual(evgen.event.SourceNode, ua.NodeId(ua.ObjectIds.Server)) test.assertEqual(evgen.event.SourceNode, ua.NodeId(ua.ObjectIds.Server))
test.assertEqual(server.get_attribute(ua.AttributeIds.EventNotifier).Value, ua.Variant(1, ua.VariantType.Byte)) test.assertEqual(server.get_attribute(ua.AttributeIds.EventNotifier).Value, ua.Variant(1, ua.VariantType.Byte))
refs = server.get_referenced_nodes(ua.ObjectIds.GeneratesEvent, ua.BrowseDirection.Forward, ua.NodeClass.ObjectType, False) refs = server.get_referenced_nodes(ua.ObjectIds.GeneratesEvent, ua.BrowseDirection.Forward, ua.NodeClass.ObjectType, False)
...@@ -335,7 +337,7 @@ def check_eventgenerator_SourceServer(test, evgen): ...@@ -335,7 +337,7 @@ def check_eventgenerator_SourceServer(test, evgen):
def check_event_generator_object(test, evgen, obj): def check_event_generator_object(test, evgen, obj):
test.assertEqual(evgen.event.SourceName, obj.get_display_name().Text) test.assertEqual(evgen.event.SourceName, obj.get_browse_name().Name)
test.assertEqual(evgen.event.SourceNode, obj.nodeid) test.assertEqual(evgen.event.SourceNode, obj.nodeid)
test.assertEqual(obj.get_attribute(ua.AttributeIds.EventNotifier).Value, ua.Variant(1, ua.VariantType.Byte)) test.assertEqual(obj.get_attribute(ua.AttributeIds.EventNotifier).Value, ua.Variant(1, ua.VariantType.Byte))
refs = obj.get_referenced_nodes(ua.ObjectIds.GeneratesEvent, ua.BrowseDirection.Forward, ua.NodeClass.ObjectType, False) refs = obj.get_referenced_nodes(ua.ObjectIds.GeneratesEvent, ua.BrowseDirection.Forward, ua.NodeClass.ObjectType, False)
......
from concurrent.futures import Future, TimeoutError
import time
from datetime import datetime, timedelta
import opcua
from opcua import ua
class SubHandler():
'''
Dummy subscription client
'''
def datachange_notification(self, node, val, data):
pass
def event_notification(self, event):
pass
class MySubHandler():
'''
More advanced subscription client using Future, so we can wait for events in tests
'''
def __init__(self):
self.future = Future()
def reset(self):
self.future = Future()
def datachange_notification(self, node, val, data):
self.future.set_result((node, val, data))
def event_notification(self, event):
self.future.set_result(event)
class MySubHandler2():
def __init__(self):
self.results = []
def datachange_notification(self, node, val, data):
self.results.append((node, val))
def event_notification(self, event):
self.results.append(event)
class MySubHandlerCounter():
def __init__(self):
self.datachange_count = 0
self.event_count = 0
def datachange_notification(self, node, val, data):
self.datachange_count += 1
def event_notification(self, event):
self.event_count += 1
class SubscriptionTests(object):
def test_subscription_failure(self):
myhandler = MySubHandler()
o = self.opc.get_objects_node()
sub = self.opc.create_subscription(100, myhandler)
with self.assertRaises(ua.UaStatusCodeError):
handle1 = sub.subscribe_data_change(o) # we can only subscribe to variables so this should fail
sub.delete()
def test_subscription_overload(self):
nb = 10
myhandler = MySubHandler()
o = self.opc.get_objects_node()
sub = self.opc.create_subscription(1, myhandler)
variables = []
subs = []
for i in range(nb):
v = o.add_variable(3, 'SubscriptionVariableOverload' + str(i), 99)
variables.append(v)
for i in range(nb):
sub.subscribe_data_change(variables)
for i in range(nb):
for j in range(nb):
variables[i].set_value(j)
s = self.opc.create_subscription(1, myhandler)
s.subscribe_data_change(variables)
subs.append(s)
sub.subscribe_data_change(variables[i])
for i in range(nb):
for j in range(nb):
variables[i].set_value(j)
sub.delete()
for s in subs:
s.delete()
def test_subscription_count(self):
myhandler = MySubHandlerCounter()
sub = self.opc.create_subscription(1, myhandler)
o = self.opc.get_objects_node()
var = o.add_variable(3, 'SubVarCounter', 0.1)
sub.subscribe_data_change(var)
nb = 12
for i in range(nb):
val = var.get_value()
var.set_value(val +1)
time.sleep(0.2) # let last event arrive
self.assertEqual(myhandler.datachange_count, nb + 1)
sub.delete()
def test_subscription_count_list(self):
myhandler = MySubHandlerCounter()
sub = self.opc.create_subscription(1, myhandler)
o = self.opc.get_objects_node()
var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
sub.subscribe_data_change(var)
nb = 12
for i in range(nb):
val = var.get_value()
val.append(i)
var.set_value(val)
time.sleep(0.2) # let last event arrive
self.assertEqual(myhandler.datachange_count, nb + 1)
sub.delete()
def test_subscription_count_no_change(self):
myhandler = MySubHandlerCounter()
sub = self.opc.create_subscription(1, myhandler)
o = self.opc.get_objects_node()
var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2])
sub.subscribe_data_change(var)
nb = 12
for i in range(nb):
val = var.get_value()
var.set_value(val)
time.sleep(0.2) # let last event arrive
self.assertEqual(myhandler.datachange_count, 1)
sub.delete()
def test_subscription_count_empty(self):
myhandler = MySubHandlerCounter()
sub = self.opc.create_subscription(1, myhandler)
o = self.opc.get_objects_node()
var = o.add_variable(3, 'SubVarCounter', [0.1, 0.2, 0.3])
sub.subscribe_data_change(var)
while True:
val = var.get_value()
val.pop()
var.set_value(val, ua.VariantType.Double)
if not val:
break
time.sleep(0.2) # let last event arrive
self.assertEqual(myhandler.datachange_count, 4)
sub.delete()
def test_subscription_overload_simple(self):
nb = 10
myhandler = MySubHandler()
o = self.opc.get_objects_node()
sub = self.opc.create_subscription(1, myhandler)
variables = [o.add_variable(3, 'SubVarOverload' + str(i), i) for i in range(nb)]
for i in range(nb):
sub.subscribe_data_change(variables)
sub.delete()
def test_subscription_data_change(self):
'''
test subscriptions. This is far too complicated for
a unittest but, setting up subscriptions requires a lot
of code, so when we first set it up, it is best
to test as many things as possible
'''
myhandler = MySubHandler()
o = self.opc.get_objects_node()
# subscribe to a variable
startv1 = [1, 2, 3]
v1 = o.add_variable(3, 'SubscriptionVariableV1', startv1)
sub = self.opc.create_subscription(100, myhandler)
handle1 = sub.subscribe_data_change(v1)
# Now check we get the start value
node, val, data = myhandler.future.result()
self.assertEqual(val, startv1)
self.assertEqual(node, v1)
myhandler.reset() # reset future object
# modify v1 and check we get value
v1.set_value([5])
node, val, data = myhandler.future.result()
self.assertEqual(node, v1)
self.assertEqual(val, [5])
with self.assertRaises(ua.UaStatusCodeError):
sub.unsubscribe(999) # non existing handle
sub.unsubscribe(handle1)
with self.assertRaises(ua.UaStatusCodeError):
sub.unsubscribe(handle1) # second try should fail
sub.delete()
with self.assertRaises(ua.UaStatusCodeError):
sub.unsubscribe(handle1) # sub does not exist anymore
def test_subscription_data_change_bool(self):
'''
test subscriptions. This is far too complicated for
a unittest but, setting up subscriptions requires a lot
of code, so when we first set it up, it is best
to test as many things as possible
'''
myhandler = MySubHandler()
o = self.opc.get_objects_node()
# subscribe to a variable
startv1 = True
v1 = o.add_variable(3, 'SubscriptionVariableBool', startv1)
sub = self.opc.create_subscription(100, myhandler)
handle1 = sub.subscribe_data_change(v1)
# Now check we get the start value
node, val, data = myhandler.future.result()
self.assertEqual(val, startv1)
self.assertEqual(node, v1)
myhandler.reset() # reset future object
# modify v1 and check we get value
v1.set_value(False)
node, val, data = myhandler.future.result()
self.assertEqual(node, v1)
self.assertEqual(val, False)
sub.delete() # should delete our monitoreditem too
def test_subscription_data_change_many(self):
'''
test subscriptions. This is far too complicated for
a unittest but, setting up subscriptions requires a lot
of code, so when we first set it up, it is best
to test as many things as possible
'''
myhandler = MySubHandler2()
o = self.opc.get_objects_node()
startv1 = True
v1 = o.add_variable(3, 'SubscriptionVariableMany1', startv1)
startv2 = [1.22, 1.65]
v2 = o.add_variable(3, 'SubscriptionVariableMany2', startv2)
sub = self.opc.create_subscription(100, myhandler)
handle1, handle2 = sub.subscribe_data_change([v1, v2])
# Now check we get the start values
nodes = [v1, v2]
count = 0
while not len(myhandler.results) > 1:
count += 1
time.sleep(0.1)
if count > 100:
self.fail("Did not get result from subscription")
for node, val in myhandler.results:
self.assertIn(node, nodes)
nodes.remove(node)
if node == v1:
self.assertEqual(startv1, val)
elif node == v2:
self.assertEqual(startv2, val)
else:
self.fail("Error node {} is neither {} nor {}".format(node, v1, v2))
sub.delete()
def test_subscribe_server_time(self):
myhandler = MySubHandler()
server_time_node = self.opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
sub = self.opc.create_subscription(200, myhandler)
handle = sub.subscribe_data_change(server_time_node)
node, val, data = myhandler.future.result()
self.assertEqual(node, server_time_node)
delta = datetime.utcnow() - val
self.assertTrue(delta < timedelta(seconds=2))
sub.unsubscribe(handle)
sub.delete()
def test_create_delete_subscription(self):
o = self.opc.get_objects_node()
v = o.add_variable(3, 'SubscriptionVariable', [1, 2, 3])
sub = self.opc.create_subscription(100, MySubHandler())
handle = sub.subscribe_data_change(v)
time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_subscribe_events(self):
sub = self.opc.create_subscription(100, MySubHandler())
handle = sub.subscribe_events()
time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_subscribe_events_to_wrong_node(self):
sub = self.opc.create_subscription(100, MySubHandler())
with self.assertRaises(ua.UaStatusCodeError):
handle = sub.subscribe_events(self.opc.get_node("i=85"))
o = self.opc.get_objects_node()
v = o.add_variable(3, 'VariableNoEventNofierAttribute', 4)
with self.assertRaises(ua.UaStatusCodeError):
handle = sub.subscribe_events(v)
sub.delete()
def test_get_event_from_type_node_BaseEvent(self):
etype = self.opc.get_node(ua.ObjectIds.BaseEventType)
properties = opcua.common.subscription.get_event_properties_from_type_node(etype)
for child in etype.get_properties():
self.assertTrue(child in properties)
def test_get_event_from_type_node_CustomEvent(self):
etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.AuditEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
properties = opcua.common.subscription.get_event_properties_from_type_node(etype)
for child in self.opc.get_node(ua.ObjectIds.BaseEventType).get_properties():
self.assertTrue(child in properties)
for child in self.opc.get_node(ua.ObjectIds.AuditEventType).get_properties():
self.assertTrue(child in properties)
for child in self.opc.get_node(etype.nodeid).get_properties():
self.assertTrue(child in properties)
self.assertTrue(etype.get_child("2:PropertyNum") in properties)
self.assertTrue(etype.get_child("2:PropertyString") in properties)
def test_events_default(self):
evgen = self.srv.get_event_generator()
myhandler = MySubHandler()
sub = self.opc.create_subscription(100, myhandler)
handle = sub.subscribe_events()
tid = datetime.utcnow()
msg = b"this is my msg "
evgen.trigger(tid, msg)
ev = myhandler.future.result()
self.assertIsNot(ev, None) # we did not receive event
self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
self.assertEqual(ev.Severity, 1)
self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
self.assertEqual(ev.Message.Text, msg)
self.assertEqual(ev.Time, tid)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_events_MyObject(self):
objects = self.srv.get_objects_node()
o = objects.add_object(3, 'MyObject')
evgen = self.srv.get_event_generator(source=o)
myhandler = MySubHandler()
sub = self.opc.create_subscription(100, myhandler)
handle = sub.subscribe_events(o)
tid = datetime.utcnow()
msg = b"this is my msg "
evgen.trigger(tid, msg)
ev = myhandler.future.result(10)
self.assertIsNot(ev, None) # we did not receive event
self.assertEqual(ev.EventType, ua.NodeId(ua.ObjectIds.BaseEventType))
self.assertEqual(ev.Severity, 1)
self.assertEqual(ev.SourceName, 'MyObject')
self.assertEqual(ev.SourceNode, o.nodeid)
self.assertEqual(ev.Message.Text, msg)
self.assertEqual(ev.Time, tid)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_events_wrong_source(self):
objects = self.srv.get_objects_node()
o = objects.add_object(3, 'MyObject')
evgen = self.srv.get_event_generator(source=o)
myhandler = MySubHandler()
sub = self.opc.create_subscription(100, myhandler)
handle = sub.subscribe_events()
tid = datetime.utcnow()
msg = b"this is my msg "
evgen.trigger(tid, msg)
with self.assertRaises(TimeoutError): # we should not receive event
ev = myhandler.future.result(2)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_events_CustomEvent(self):
etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
evgen = self.srv.get_event_generator(etype)
myhandler = MySubHandler()
sub = self.opc.create_subscription(100, myhandler)
handle = sub.subscribe_events(evtype=etype)
propertynum = 2
propertystring = "This is my test"
evgen.event.PropertyNum = propertynum
evgen.event.PropertyString = propertystring
serverity = 500
evgen.event.Severity = serverity
tid = datetime.utcnow()
msg = b"this is my msg "
evgen.trigger(tid, msg)
ev = myhandler.future.result(10)
self.assertIsNot(ev, None) # we did not receive event
self.assertEqual(ev.EventType, etype.nodeid)
self.assertEqual(ev.Severity, serverity)
self.assertEqual(ev.SourceName, self.opc.get_server_node().get_browse_name().Name)
self.assertEqual(ev.SourceNode, self.opc.get_server_node().nodeid)
self.assertEqual(ev.Message.Text, msg)
self.assertEqual(ev.Time, tid)
self.assertEqual(ev.PropertyNum, propertynum)
self.assertEqual(ev.PropertyString, propertystring)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_events_CustomEvent_MyObject(self):
objects = self.srv.get_objects_node()
o = objects.add_object(3, 'MyObject')
etype = self.srv.create_custom_event_type(2, 'MyEvent', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
evgen = self.srv.get_event_generator(etype, o)
myhandler = MySubHandler()
sub = self.opc.create_subscription(100, myhandler)
handle = sub.subscribe_events(o, etype)
propertynum = 2
propertystring = "This is my test"
evgen.event.PropertyNum = propertynum
evgen.event.PropertyString = propertystring
tid = datetime.utcnow()
msg = b"this is my msg "
evgen.trigger(tid, msg)
ev = myhandler.future.result(10)
self.assertIsNot(ev, None) # we did not receive event
self.assertEqual(ev.EventType, etype.nodeid)
self.assertEqual(ev.Severity, 1)
self.assertEqual(ev.SourceName, 'MyObject')
self.assertEqual(ev.SourceNode, o.nodeid)
self.assertEqual(ev.Message.Text, msg)
self.assertEqual(ev.Time, tid)
self.assertEqual(ev.PropertyNum, propertynum)
self.assertEqual(ev.PropertyString, propertystring)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_several_different_events(self):
objects = self.srv.get_objects_node()
o = objects.add_object(3, 'MyObject')
etype1 = self.srv.create_custom_event_type(2, 'MyEvent1', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
evgen1 = self.srv.get_event_generator(etype1, o)
etype2 = self.srv.create_custom_event_type(2, 'MyEvent2', ua.ObjectIds.BaseEventType, [('PropertyNum', ua.VariantType.Float), ('PropertyString', ua.VariantType.String)])
evgen2 = self.srv.get_event_generator(etype2, o)
myhandler = MySubHandler2()
sub = self.opc.create_subscription(100, myhandler)
handle = sub.subscribe_events(o, etype1)
propertynum1 = 1
propertystring1 = "This is my test 1"
evgen1.event.PropertyNum = propertynum1
evgen1.event.PropertyString = propertystring1
propertynum2 = 2
propertystring2 = "This is my test 2"
evgen2.event.PropertyNum = propertynum2
evgen2.event.PropertyString = propertystring2
for i in range(3):
evgen1.trigger()
evgen2.trigger()
time.sleep(1)
self.assertEqual(len(myhandler.results), 3)
ev = myhandler.results[-1]
self.assertEqual(ev.EventType, etype1.nodeid)
handle = sub.subscribe_events(o, etype2)
for i in range(4):
evgen1.trigger()
evgen2.trigger()
time.sleep(1)
ev1s = [ev for ev in myhandler.results if ev.EventType == etype1.nodeid]
ev2s = [ev for ev in myhandler.results if ev.EventType == etype2.nodeid]
self.assertEqual(len(myhandler.results), 11)
self.assertEqual(len(ev2s), 4)
self.assertEqual(len(ev1s), 7)
#self.assertEqual(ev.SourceName, 'MyObject')
#self.assertEqual(ev.SourceNode, o.nodeid)
#self.assertEqual(ev.Message.Text, msg)
#self.assertEqual(ev.Time, tid)
#self.assertEqual(ev.PropertyNum, propertynum)
#self.assertEqual(ev.PropertyString, propertystring)
# time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
...@@ -9,6 +9,7 @@ from opcua import ua ...@@ -9,6 +9,7 @@ from opcua import ua
from opcua.ua import extensionobject_from_binary from opcua.ua import extensionobject_from_binary
from opcua.ua import extensionobject_to_binary from opcua.ua import extensionobject_to_binary
from opcua.ua.uatypes import flatten, get_shape, reshape from opcua.ua.uatypes import flatten, get_shape, reshape
from opcua.server.internal_subscription import WhereClauseEvaluator
...@@ -358,7 +359,31 @@ class TestUnit(unittest.TestCase): ...@@ -358,7 +359,31 @@ class TestUnit(unittest.TestCase):
n = ua.NodeId(0, 3) n = ua.NodeId(0, 3)
self.assertFalse(n.is_null()) self.assertFalse(n.is_null())
self.assertTrue(n.has_null_identifier()) self.assertTrue(n.has_null_identifier())
def test_where_clause(self):
cf = ua.ContentFilter()
el = ua.ContentFilterElement()
op = ua.SimpleAttributeOperand()
op.BrowsePath.append(ua.QualifiedName("property", 2))
el.FilterOperands.append(op)
for i in range(10):
op = ua.LiteralOperand()
op.Value = ua.Variant(i)
el.FilterOperands.append(op)
el.FilterOperator = ua.FilterOperator.InList
cf.Elements.append(el)
wce = WhereClauseEvaluator(logging.getLogger(__name__), None, cf)
ev = ua.BaseEvent()
ev._freeze = False
ev.property = 3
self.assertTrue(wce.eval(ev))
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.WARN) logging.basicConfig(level=logging.WARN)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment