Commit 3bc1bb8a authored by Olivier R-D's avatar Olivier R-D

port event tests from c++ and enable

parent 41d35625
......@@ -18,9 +18,9 @@ class SubHandler(object):
if __name__ == "__main__":
from IPython import embed
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("KeepAlive")
logger.setLevel(logging.DEBUG)
logging.basicConfig(level=logging.WARN)
#logger = logging.getLogger("KeepAlive")
#logger.setLevel(logging.DEBUG)
client = Client("opc.tcp://localhost:4841/freeopcua/server/")
try:
client.connect()
......
......@@ -33,8 +33,8 @@ if __name__ == "__main__":
#logger = logging.getLogger("opcua.address_space")
#logger = logging.getLogger("opcua.internal_server")
#logger.setLevel(logging.DEBUG)
logger = logging.getLogger("opcua.subscription_server")
logger.setLevel(logging.DEBUG)
#logger = logging.getLogger("opcua.subscription_server")
#logger.setLevel(logging.DEBUG)
# now setup our server
......@@ -58,10 +58,10 @@ if __name__ == "__main__":
mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])
#creating an event object
# creating an event object
# The event object automatically will have members for all events properties
myevent = server.get_event_object(ObjectIds.BaseEventType)
myevent.Message.Text = "This is my event"
print("My event is", myevent)
# starting!
server.start()
......@@ -71,9 +71,9 @@ if __name__ == "__main__":
#enable following if you want to subscribe to nodes on server side
sub = server.create_subscription(500, handler)
handle = sub.subscribe_data_change(myvar)
#time.sleep(0.1)
#sub.unsubscribe(handle)
#sub.delete()
# trigger event, all subscribed clients wil receive it
myevent.trigger()
embed()
finally:
server.stop()
......
......@@ -193,6 +193,9 @@ class Client(object):
def get_objects_node(self):
return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.ObjectsFolder))
def get_server_node(self):
return self.get_node(ua.TwoByteNodeId(ua.ObjectIds.Server))
def get_node(self, nodeid):
"""
Get node using NodeId object or a string representing a NodeId
......
......@@ -17,6 +17,8 @@ class Node(object):
self.nodeid = nodeid
elif type(nodeid) in (str, bytes):
self.nodeid = ua.NodeId.from_string(nodeid)
elif isinstance(nodeid, int):
self.nodeid = ua.NodeId(nodeid, 0)
else:
raise Exception("argument to node must be a NodeId object or a string defining a nodeid found {} of type {}".format(nodeid, type(nodeid)))
def __eq__(self, other):
......
......@@ -77,13 +77,15 @@ class Server(object):
self.iserver.stop()
self.bserver.stop()
def get_root_node(self):
return self.get_node(ua.TwoByteNodeId(ObjectIds.RootFolder))
def get_objects_node(self):
return self.get_node(ua.TwoByteNodeId(ObjectIds.ObjectsFolder))
def get_server_node(self):
return self.get_node(ua.TwoByteNodeId(ObjectIds.Server))
def get_node(self, nodeid):
"""
Get node using NodeId object or a string representing a NodeId
......
......@@ -127,7 +127,6 @@ class Subscription(object):
def subscribe_events(self, sourcenode=ObjectIds.Server, evtype=ObjectIds.BaseEventType):
sourcenode = self._get_node(sourcenode)
evfilter = self._get_filter_from_event_type(evtype)
print("EVFILTER is ", evfilter)
return self._subscribe(sourcenode, AttributeIds.EventNotifier, evfilter)
def _subscribe(self, node, attr, mfilter=None):
......
......@@ -383,7 +383,6 @@ class InternalSubscription(object):
item = self._monitored_items[mid]
fieldlist = ua.EventFieldList()
fieldlist.ClientHandle = item.client_handle
print("filter is ", item.parameters.FilterResult, " event is ", event)
fieldlist.EventFields = self._get_event_fields(item.parameters.FilterResult, event)
self._triggered_events.append(fieldlist)
return True
......@@ -391,7 +390,6 @@ class InternalSubscription(object):
def _get_event_fields(self, evfilter, event):
fields = []
for sattr in evfilter.SelectClauses:
print("looking at sattrs", sattr)
try:
if not sattr.BrowsePath:
val = getattr(event, ua.AttributeIdsInv[sattr.Attribute])
......
......@@ -5,7 +5,7 @@ import io
import sys
from datetime import datetime, timedelta
import unittest
from threading import Thread, Event
import threading
try:
from queue import Queue
except ImportError:
......@@ -17,6 +17,7 @@ from opcua import ua
from opcua import Client
from opcua import Server
from opcua import uamethod
from opcua import Event
port_num1 = 48410
port_num2 = 48430
......@@ -207,42 +208,42 @@ class CommonTests(object):
sub.unsubscribe(handle)
sub.delete()
#def test_subscribe_events(self):
#sub = self.opc.create_subscription(100, sclt)
#handle = sub.subscribe_events()
##time.sleep(0.1)
#sub.unsubscribe(handle)
#sub.delete()
#def test_events(self):
#msclt = MySubHandler()
#cond = msclt.setup()
#sub = self.opc.create_subscription(100, msclt)
#handle = sub.subscribe_events()
def test_subscribe_events(self):
sub = self.opc.create_subscription(100, sclt)
handle = sub.subscribe_events()
time.sleep(0.1)
sub.unsubscribe(handle)
sub.delete()
def test_events(self):
msclt = MySubHandler()
cond = msclt.setup()
sub = self.opc.create_subscription(100, msclt)
handle = sub.subscribe_events()
#ev = ua.Event()
#msg = "this is my msg "
#ev.message = msg
#tid = datetime.datetime.now()
#ev.time = tid
ev = Event(self.srv.iserver.isession)
msg = b"this is my msg "
ev.Message.Text = msg
tid = datetime.now()
ev.Time = tid
#ev.source_node = self.opc.get_server_node().nodeid
#ev.source_name = "our server node"
#ev.severity = 500
#self.srv.trigger_event(ev)
ev.Severity = 500
ev.trigger()
#with cond:
#ret = cond.wait(50000)
#if sys.version_info.major>2: self.assertEqual(ret, True) # we went into timeout waiting for subcsription callback
#else: pass # XXX
#self.assertIsNot(msclt.ev, None)# we did not receive event
#self.assertEqual(msclt.ev.message, msg)
#self.assertEqual(msclt.ev.time.to_datetime(), tid)
#self.assertEqual(msclt.ev.severity, 500)
#self.assertEqual(msclt.ev.source_node, self.opc.get_server_node().nodeid)
with cond:
ret = cond.wait(50000)
if sys.version_info.major>2: self.assertEqual(ret, True) # we went into timeout waiting for subcsription callback
else: pass # python2
self.assertIsNot(msclt.ev, None)# we did not receive event
self.assertEqual(msclt.ev.Message.Text, msg)
#self.assertEqual(msclt.ev.Time, tid)
self.assertEqual(msclt.ev.Severity, 500)
self.assertEqual(msclt.ev.SourceNode, self.opc.get_server_node().nodeid)
#time.sleep(0.1)
#sub.unsubscribe(handle)
#sub.delete()
sub.unsubscribe(handle)
sub.delete()
def test_non_existing_path(self):
......@@ -526,38 +527,6 @@ def add_server_methods(srv):
class ServerProcess(Thread):
'''
Start a server in another process/thread
'''
def __init__(self):
Thread.__init__(self)
self._exit = Event()
self.started = Event()
self._queue = Queue()
def run(self):
self.srv = Server()
self.srv.set_endpoint('opc.tcp://localhost:%d' % port_num1)
add_server_methods(self.srv)
self.srv.start()
self.started.set()
while not self._exit.is_set():
time.sleep(0.1)
if not self._queue.empty():
ev = self._queue.get()
self.srv.trigger_event(ev)
self.srv.stop()
def stop(self):
self._exit.set()
def trigger_event(self, ev):
self._queue.put(ev)
class TestClient(unittest.TestCase, CommonTests):
'''
Run common tests on client side
......@@ -567,11 +536,11 @@ class TestClient(unittest.TestCase, CommonTests):
'''
@classmethod
def setUpClass(self):
# start server in its own process
global globalserver
self.srv = globalserver
# start our own server
self.srv = Server()
self.srv.set_endpoint('opc.tcp://localhost:%d' % port_num1)
add_server_methods(self.srv)
self.srv.start()
self.srv.started.wait() # let it initialize
# start client
self.clt = Client('opc.tcp://localhost:%d' % port_num1)
......@@ -583,9 +552,6 @@ class TestClient(unittest.TestCase, CommonTests):
self.clt.disconnect()
# stop the server in its own process
self.srv.stop()
# wait for server to stop, otherwise we may try to start a
# new one before this one is really stopped
self.srv.join()
def test_service_fault(self):
request = ua.ReadRequest()
......@@ -638,14 +604,9 @@ class TestServer(unittest.TestCase, CommonTests):
if __name__ == '__main__':
logging.basicConfig(level=logging.WARN)
globalserver = ServerProcess() #server process will be started by client tests
try:
sclt = SubHandler()
unittest.main(verbosity=3)
finally:
globalserver.stop()
sclt = SubHandler()
unittest.main(verbosity=3)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment