Commit 4221be7d authored by Christian Bergmiller's avatar Christian Bergmiller

refactored tests and async fixes

parent 006689dc
......@@ -112,7 +112,7 @@ class Event(object):
async def get_filter_from_event_type(eventtypes):
evfilter = ua.EventFilter()
evfilter.SelectClauses = await select_clauses_from_evtype(eventtypes)
evfilter.WhereClause = where_clause_from_evtype(eventtypes)
evfilter.WhereClause = await where_clause_from_evtype(eventtypes)
return evfilter
......@@ -121,19 +121,19 @@ async def select_clauses_from_evtype(evtypes):
selected_paths = []
for evtype in evtypes:
for prop in await get_event_properties_from_type_node(evtype):
if prop.get_browse_name() not in selected_paths:
browse_name = await prop.get_browse_name()
if browse_name not in selected_paths:
op = ua.SimpleAttributeOperand()
op.AttributeId = ua.AttributeIds.Value
op.BrowsePath = [prop.get_browse_name()]
op.BrowsePath = [browse_name]
clauses.append(op)
selected_paths.append(prop.get_browse_name())
selected_paths.append(browse_name)
return clauses
def where_clause_from_evtype(evtypes):
async def where_clause_from_evtype(evtypes):
cf = ua.ContentFilter()
el = ua.ContentFilterElement()
# operands can be ElementOperand, LiteralOperand, AttributeOperand, SimpleAttribute
# Create a clause where the generate event type property EventType
# must be a subtype of events in evtypes argument
......@@ -144,20 +144,18 @@ def where_clause_from_evtype(evtypes):
op.BrowsePath.append(ua.QualifiedName("EventType", 0))
op.AttributeId = ua.AttributeIds.Value
el.FilterOperands.append(op)
# now create a list of all subtypes we want to accept
subtypes = []
for evtype in evtypes:
subtypes += [st.nodeid for st in ua_utils.get_node_subtypes(evtype)]
for st in await ua_utils.get_node_subtypes(evtype):
subtypes.append(st.nodeid)
subtypes = list(set(subtypes)) # remove duplicates
for subtypeid in subtypes:
op = ua.LiteralOperand()
op.Value = ua.Variant(subtypeid)
el.FilterOperands.append(op)
el.FilterOperator = ua.FilterOperator.InList
cf.Elements.append(el)
return cf
......@@ -165,7 +163,7 @@ async def get_event_properties_from_type_node(node):
properties = []
curr_node = node
while True:
properties.extend(curr_node.get_properties())
properties.extend(await curr_node.get_properties())
if curr_node.nodeid.Identifier == ua.ObjectIds.BaseEventType:
break
parents = await curr_node.get_referenced_nodes(
......
import pytest
from collections import namedtuple
from opcua import Client
from opcua import Server
from .test_common import add_server_methods
from .util_enum_struct import add_server_custom_enum_struct
port_num1 = 48510
port_num = 48540
port_num1 = 48510
port_discovery = 48550
Opc = namedtuple('opc', ['opc', 'server'])
def pytest_generate_tests(metafunc):
if 'opc' in metafunc.fixturenames:
metafunc.parametrize('opc', ['client', 'server'], indirect=True)
@pytest.fixture()
async def server():
# start our own server
srv = Server()
await srv.init()
srv.set_endpoint(f'opc.tcp://127.0.0.1:{port_num}')
await add_server_methods(srv)
await srv.start()
yield srv
# stop the server
await srv.stop()
@pytest.fixture()
async def discovery_server():
# start our own server
srv = Server()
await srv.init()
await srv.set_application_uri('urn:freeopcua:python:discovery')
srv.set_endpoint(f'opc.tcp://127.0.0.1:{port_discovery}')
await srv.start()
yield srv
# stop the server
await srv.stop()
@pytest.fixture()
async def admin_client():
# start admin client
# long timeout since travis (automated testing) can be really slow
clt = Client(f'opc.tcp://admin@127.0.0.1:{port_num1}', timeout=10)
await clt.connect()
yield clt
await clt.disconnect()
@pytest.fixture()
async def client():
# start anonymous client
ro_clt = Client(f'opc.tcp://127.0.0.1:{port_num1}')
await ro_clt.connect()
yield ro_clt
await ro_clt.disconnect()
@pytest.fixture()
async def opc(request):
"""
Fixture for tests that should run for both `Server` and `Client`
:param request:
:return:
"""
if request.param == 'client':
srv = Server()
await srv.init()
......@@ -24,7 +78,7 @@ async def opc(request):
# long timeout since travis (automated testing) can be really slow
clt = Client(f'opc.tcp://admin@127.0.0.1:{port_num}', timeout=10)
await clt.connect()
yield clt
yield Opc(clt, srv)
await clt.disconnect()
await srv.stop()
elif request.param == 'server':
......@@ -32,10 +86,10 @@ async def opc(request):
# start our own server
srv = Server()
await srv.init()
srv.set_endpoint(f'opc.tcp://127.0.0.1:{port_num}')
srv.set_endpoint(f'opc.tcp://127.0.0.1:{port_num1}')
await add_server_methods(srv)
await srv.start()
yield srv
yield Opc(srv, srv)
# stop the server
await srv.stop()
else:
......
......@@ -6,47 +6,10 @@ from opcua import Client
from opcua import Server
from opcua import ua
from .test_common import add_server_methods
from .util_enum_struct import add_server_custom_enum_struct
port_num1 = 48510
_logger = logging.getLogger(__name__)
pytestmark = pytest.mark.asyncio
@pytest.fixture()
async def admin_client():
# start admin client
# long timeout since travis (automated testing) can be really slow
clt = Client(f'opc.tcp://admin@127.0.0.1:{port_num1}', timeout=10)
await clt.connect()
yield clt
await clt.disconnect()
@pytest.fixture()
async def client():
# start anonymous client
ro_clt = Client(f'opc.tcp://127.0.0.1:{port_num1}')
await ro_clt.connect()
yield ro_clt
await ro_clt.disconnect()
@pytest.fixture()
async def server():
# start our own server
srv = Server()
await srv.init()
srv.set_endpoint(f'opc.tcp://127.0.0.1:{port_num1}')
await add_server_methods(srv)
await add_server_custom_enum_struct(srv)
await srv.start()
yield srv
# stop the server
await srv.stop()
async def test_service_fault(server, admin_client):
request = ua.ReadRequest()
request.TypeId = ua.FourByteNodeId(999) # bad type!
......
This diff is collapsed.
......@@ -7,12 +7,7 @@ import pytest
import logging
import os
import shelve
from .test_common import add_server_methods
from .tests_xml import XmlTests
from .tests_subscriptions import SubscriptionTests
from datetime import timedelta, datetime
from tempfile import NamedTemporaryFile
from datetime import timedelta
import opcua
from opcua import Server
......@@ -23,38 +18,10 @@ from opcua.common.event_objects import BaseEvent, AuditEvent, AuditChannelEvent,
AuditOpenSecureChannelEvent
from opcua.common import ua_utils
port_num = 48540
port_discovery = 48550
pytestmark = pytest.mark.asyncio
_logger = logging.getLogger(__name__)
@pytest.fixture()
async def server():
# start our own server
srv = Server()
await srv.init()
srv.set_endpoint(f'opc.tcp://127.0.0.1:{port_num}')
await add_server_methods(srv)
await srv.start()
yield srv
# stop the server
await srv.stop()
@pytest.fixture()
async def discovery_server():
# start our own server
srv = Server()
await srv.init()
await srv.set_application_uri('urn:freeopcua:python:discovery')
srv.set_endpoint(f'opc.tcp://127.0.0.1:{port_discovery}')
await srv.start()
yield srv
# stop the server
await srv.stop()
async def test_discovery(server, discovery_server):
client = Client(discovery_server.endpoint.geturl())
async with client:
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment