Commit 1981da58 authored by Nick James's avatar Nick James Committed by GitHub

add basic test for CLI tools (#212)

* add basic test for CLI tools

* tests pass, client still broken

* add test for uaserver invocation

* make uaserver test pass

* try installing asyncua during travis setup to make CLI tools available
parent e316e7cc
......@@ -13,5 +13,6 @@ install:
- pip install pytest-asyncio
- pip install cryptography
- pip install dataclasses
- pip install --editable .
# command to run tests
script: pytest -v -s
......@@ -5,6 +5,7 @@ import argparse
from datetime import datetime, timedelta
import math
import time
import concurrent.futures._base
try:
from IPython import embed
......@@ -15,7 +16,7 @@ except ImportError:
code.interact(local=dict(globals(), **locals()))
from asyncua import ua
from asyncua import Client
from asyncua import Client, Server
from asyncua import Node, uamethod
from asyncua import sync
from asyncua.ua.uaerrors import UaStatusCodeError
......@@ -278,8 +279,8 @@ async def _uawrite():
client = Client(args.url, timeout=args.timeout)
await _configure_client_with_args(client, args)
await client.connect()
try:
await client.connect()
node = await get_node(client, args)
val = _val_to_variant(args.value, args)
await node.write_attribute(args.attribute, ua.DataValue(val))
......@@ -312,18 +313,19 @@ async def _uals():
client = Client(args.url, timeout=args.timeout)
await _configure_client_with_args(client, args)
await client.connect()
try:
node = await get_node(client, args)
print(f"Browsing node {node} at {args.url}\n")
if args.long_format == 0:
await _lsprint_0(node, args.depth - 1)
elif args.long_format == 1:
await _lsprint_1(node, args.depth - 1)
else:
_lsprint_long(node, args.depth - 1)
finally:
await client.disconnect()
async with client:
node = await get_node(client, args)
print(f"Browsing node {node} at {args.url}\n")
if args.long_format == 0:
await _lsprint_0(node, args.depth - 1)
elif args.long_format == 1:
await _lsprint_1(node, args.depth - 1)
else:
_lsprint_long(node, args.depth - 1)
except (OSError, concurrent.futures._base.TimeoutError) as e:
print(e)
sys.exit(1)
async def _lsprint_0(node, depth, indent=""):
......@@ -477,6 +479,9 @@ def endpoint_to_strings(ep):
def uaclient():
run(_uaclient())
async def _uaclient():
parser = argparse.ArgumentParser(description="Connect to server and start python shell. root and objects nodes are available. Node specificed in command line is available as mynode variable")
add_common_args(parser)
parser.add_argument("-c",
......@@ -487,25 +492,25 @@ def uaclient():
help="set client private key")
args = parse_args(parser)
client = sync.Client(args.url, timeout=args.timeout)
_configure_client_with_args(client, args)
client = Client(args.url, timeout=args.timeout)
await _configure_client_with_args(client, args)
if args.certificate:
client.load_client_certificate(args.certificate)
if args.private_key:
client.load_private_key(args.private_key)
sync.start_thread_loop()
client.connect()
try:
mynode = get_node(client, args)
embed()
client.disconnect()
finally:
sync.stop_thread_loop()
async with client:
mynode = await get_node(client, args)
# embed()
except (OSError, concurrent.futures._base.TimeoutError) as e:
print(e)
sys.exit(1)
sys.exit(0)
def uaserver():
async def _uaserver():
parser = argparse.ArgumentParser(description="Run an example OPC-UA server. By importing xml definition and using uawrite command line, it is even possible to expose real data using this server")
# we setup a server, this is a bit different from other tool so we do not reuse common arguments
parser.add_argument("-u",
......@@ -542,8 +547,7 @@ def uaserver():
args = parser.parse_args()
logging.basicConfig(format="%(levelname)s: %(message)s", level=getattr(logging, args.loglevel))
sync.start_thread_loop()
server = sync.Server()
server = Server()
server.set_endpoint(args.url)
if args.certificate:
server.load_certificate(args.certificate)
......@@ -570,24 +574,30 @@ def uaserver():
myprop = myobj.add_property(idx, "MyProperty", "I am a property")
mymethod = myobj.add_method(idx, "MyMethod", multiply, [ua.VariantType.Double, ua.VariantType.Int64], [ua.VariantType.Double])
server.start()
try:
if args.shell:
embed()
elif args.populate:
count = 0
while True:
time.sleep(1)
myvar.write_value(math.sin(count / 10))
myarrayvar.write_value([math.sin(count / 10), math.sin(count / 100)])
count += 1
else:
while True:
time.sleep(1)
server.stop()
finally:
sync.stop_thread_loop()
await server.init()
async with server:
if args.shell:
embed()
elif args.populate:
count = 0
while True:
time.sleep(1)
myvar.write_value(math.sin(count / 10))
myarrayvar.write_value([math.sin(count / 10), math.sin(count / 100)])
count += 1
else:
while True:
time.sleep(1)
except OSError as e:
print(e)
sys.exit(1)
except KeyboardInterrupt:
pass
sys.exit(0)
def uaserver():
run(_uaserver())
def uadiscover():
......@@ -613,26 +623,30 @@ async def _uadiscover():
client = Client(args.url, timeout=args.timeout)
if args.network:
try:
if args.network:
print(f"Performing discovery at {args.url}\n")
for i, server in enumerate(await client.connect_and_find_servers_on_network(), start=1):
print(f'Server {i}:')
# for (n, v) in application_to_strings(server):
# print(' {}: {}'.format(n, v))
print('')
print(f"Performing discovery at {args.url}\n")
for i, server in enumerate(await client.connect_and_find_servers_on_network(), start=1):
for i, server in enumerate(await client.connect_and_find_servers(), start=1):
print(f'Server {i}:')
# for (n, v) in application_to_strings(server):
# print(' {}: {}'.format(n, v))
for (n, v) in application_to_strings(server):
print(f' {n}: {v}')
print('')
print(f"Performing discovery at {args.url}\n")
for i, server in enumerate(await client.connect_and_find_servers(), start=1):
print(f'Server {i}:')
for (n, v) in application_to_strings(server):
print(f' {n}: {v}')
print('')
for i, ep in enumerate(await client.connect_and_get_server_endpoints(), start=1):
print(f'Endpoint {i}:')
for (n, v) in endpoint_to_strings(ep):
print(f' {n}: {v}')
print('')
for i, ep in enumerate(await client.connect_and_get_server_endpoints(), start=1):
print(f'Endpoint {i}:')
for (n, v) in endpoint_to_strings(ep):
print(f' {n}: {v}')
print('')
except (OSError, concurrent.futures._base.TimeoutError) as e:
print(e)
sys.exit(1)
sys.exit(0)
......
import asyncio
import sys
sys.path.insert(0, "..")
# sys.path.insert(0, "..")
import logging
from asyncua import Client, Node, ua
......
import pytest
from threading import Thread
import time
from unittest.mock import patch
import sys
import subprocess
from asyncua.tools import uaread, uals, uawrite, uasubscribe, uahistoryread, uaserver, uaclient, uadiscover, uacall, uageneratestructs
@pytest.mark.parametrize("tool", [uaread, uals, uawrite, uasubscribe, uahistoryread, uaclient, uadiscover, uacall, uageneratestructs])
def test_that_tool_can_be_invoked_without_internal_error(tool):
# It's necessary to mock argv, else the tool is invoked with *pytest's* argv
with patch.object(sys, 'argv', [""]):
try:
tool()
except SystemExit:
pass
def test_that_server_can_be_invoked_without_internal_error():
proc = subprocess.Popen(['uaserver'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
time.sleep(2)
proc.send_signal(subprocess.signal.SIGINT)
proc.wait(timeout=2)
assert proc.returncode == 0
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment