Commit b4543b0b authored by olivier R-D's avatar olivier R-D

more server tests

parent 7e42b3bf
......@@ -4,12 +4,35 @@ Test an OPC-UA server with freeopcua python client
import logging
import sys
import unittest
from concurrent.futures import Future
from datetime import datetime
from datetime import timedelta
import time
from opcua import ua
from opcua import Client
class MySubHandler(object):
class MySubHandler():
'''
More advanced subscription client using Future, so we can wait for events in tests
'''
def __init__(self):
self.future = Future()
def reset(self):
self.future = Future()
def datachange_notification(self, node, val, data):
self.future.set_result((node, val, data))
def event_notification(self, event):
self.future.set_result(event)
class MySubHandler2(object):
def __init__(self):
self.results = []
......@@ -38,6 +61,16 @@ class Tests(unittest.TestCase):
c.connect()
c.disconnect()
def test_find_servers(self):
c = Client(URL)
res = c.connect_and_find_servers()
self.assertTrue(len(res) > 0)
def test_find_endpoints(self):
c = Client(URL)
res = c.connect_and_get_server_endpoints()
self.assertTrue(len(res) > 0)
@connect
def test_get_root(self, client):
root = client.get_root_node()
......@@ -67,11 +100,34 @@ class Tests(unittest.TestCase):
node = root.get_child(["0:Objects", "0:Server" , "0:ServerArray"])
self.assertEqual(node.get_browse_name(), ua.QualifiedName("ServerArray", 0))
@connect
def test_subscribe_server_time(self, client):
time.sleep(2)
msclt = MySubHandler()
server_time_node = client.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
sub = client.create_subscription(200, msclt)
time.sleep(2)
handle = sub.subscribe_data_change(server_time_node)
node, val, data = msclt.future.result()
self.assertEqual(node, server_time_node)
delta = datetime.utcnow() - val
self.assertTrue(delta < timedelta(seconds=2))
sub.unsubscribe(handle)
sub.delete()
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
# FIXME add better arguments parsing with possibility to specify username and password
if len(sys.argv) < 2:
print("This script is meant to test compatibilty to a server with freeopcua python client library")
print("Usage: test_server.py url")
sys.exit(1)
else:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment