Commit b7a7f421 authored by Christian Bergmiller's avatar Christian Bergmiller

test refactoring wip

parent fc60e03c
...@@ -523,8 +523,8 @@ class Client(object): ...@@ -523,8 +523,8 @@ class Client(object):
uries = await self.get_namespace_array() uries = await self.get_namespace_array()
return uries.index(uri) return uries.index(uri)
def delete_nodes(self, nodes, recursive=False): async def delete_nodes(self, nodes, recursive=False):
return delete_nodes(self.uaclient, nodes, recursive) return await delete_nodes(self.uaclient, nodes, recursive)
def import_xml(self, path=None, xmlstring=None): def import_xml(self, path=None, xmlstring=None):
""" """
......
...@@ -103,7 +103,7 @@ class UASocketProtocol(asyncio.Protocol): ...@@ -103,7 +103,7 @@ class UASocketProtocol(asyncio.Protocol):
self.transport.write(msg) self.transport.write(msg)
return future return future
async def send_request(self, request, callback=None, timeout=1000, message_type=ua.MessageType.SecureMessage): async def send_request(self, request, callback=None, timeout=10, message_type=ua.MessageType.SecureMessage):
""" """
Send a request to the server. Send a request to the server.
Timeout is the timeout written in ua header. Timeout is the timeout written in ua header.
......
...@@ -14,7 +14,7 @@ async def copy_node(parent, node, nodeid=None, recursive=True): ...@@ -14,7 +14,7 @@ async def copy_node(parent, node, nodeid=None, recursive=True):
rdesc = await _rdesc_from_node(parent, node) rdesc = await _rdesc_from_node(parent, node)
if nodeid is None: if nodeid is None:
nodeid = ua.NodeId(namespaceidx=node.nodeid.NamespaceIndex) nodeid = ua.NodeId(namespaceidx=node.nodeid.NamespaceIndex)
added_nodeids = _copy_node(parent.server, parent.nodeid, rdesc, nodeid, recursive) added_nodeids = await _copy_node(parent.server, parent.nodeid, rdesc, nodeid, recursive)
return [Node(parent.server, nid) for nid in added_nodeids] return [Node(parent.server, nid) for nid in added_nodeids]
...@@ -29,12 +29,12 @@ async def _copy_node(server, parent_nodeid, rdesc, nodeid, recursive): ...@@ -29,12 +29,12 @@ async def _copy_node(server, parent_nodeid, rdesc, nodeid, recursive):
node_to_copy = Node(server, rdesc.NodeId) node_to_copy = Node(server, rdesc.NodeId)
attr_obj = getattr(ua, rdesc.NodeClass.name + "Attributes") attr_obj = getattr(ua, rdesc.NodeClass.name + "Attributes")
await _read_and_copy_attrs(node_to_copy, attr_obj(), addnode) await _read_and_copy_attrs(node_to_copy, attr_obj(), addnode)
res = await server.add_nodes([addnode])[0] res = (await server.add_nodes([addnode]))[0]
added_nodes = [res.AddedNodeId] added_nodes = [res.AddedNodeId]
if recursive: if recursive:
descs = await node_to_copy.get_children_descriptions() descs = await node_to_copy.get_children_descriptions()
for desc in descs: for desc in descs:
nodes = _copy_node(server, res.AddedNodeId, desc, nodeid=ua.NodeId(namespaceidx=desc.NodeId.NamespaceIndex), recursive=True) nodes = await _copy_node(server, res.AddedNodeId, desc, nodeid=ua.NodeId(namespaceidx=desc.NodeId.NamespaceIndex), recursive=True)
added_nodes.extend(nodes) added_nodes.extend(nodes)
return added_nodes return added_nodes
......
...@@ -406,14 +406,14 @@ def _guess_datatype(variant): ...@@ -406,14 +406,14 @@ def _guess_datatype(variant):
return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name)) return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
def delete_nodes(server, nodes, recursive=False, delete_target_references=True): async def delete_nodes(server, nodes, recursive=False, delete_target_references=True):
""" """
Delete specified nodes. Optionally delete recursively all nodes with a Delete specified nodes. Optionally delete recursively all nodes with a
downward hierachic references to the node downward hierachic references to the node
""" """
nodestodelete = [] nodestodelete = []
if recursive: if recursive:
nodes += _add_childs(nodes) nodes += await _add_childs(nodes)
for mynode in nodes: for mynode in nodes:
it = ua.DeleteNodesItem() it = ua.DeleteNodesItem()
it.NodeId = mynode.nodeid it.NodeId = mynode.nodeid
...@@ -421,11 +421,11 @@ def delete_nodes(server, nodes, recursive=False, delete_target_references=True): ...@@ -421,11 +421,11 @@ def delete_nodes(server, nodes, recursive=False, delete_target_references=True):
nodestodelete.append(it) nodestodelete.append(it)
params = ua.DeleteNodesParameters() params = ua.DeleteNodesParameters()
params.NodesToDelete = nodestodelete params.NodesToDelete = nodestodelete
return server.delete_nodes(params) return await server.delete_nodes(params)
def _add_childs(nodes): async def _add_childs(nodes):
results = [] results = []
for mynode in nodes[:]: for mynode in nodes[:]:
results += mynode.get_children() results += await mynode.get_children()
return results return results
...@@ -6,7 +6,7 @@ from opcua import ua ...@@ -6,7 +6,7 @@ from opcua import ua
from opcua.common import node from opcua.common import node
def call_method(parent, methodid, *args): async def call_method(parent, methodid, *args):
""" """
Call an OPC-UA method. methodid is browse name of child method or the Call an OPC-UA method. methodid is browse name of child method or the
nodeid of method as a NodeId object nodeid of method as a NodeId object
...@@ -14,7 +14,7 @@ def call_method(parent, methodid, *args): ...@@ -14,7 +14,7 @@ def call_method(parent, methodid, *args):
which may be of different types which may be of different types
returns a list of values or a single value depending on the output of the method returns a list of values or a single value depending on the output of the method
""" """
result = call_method_full(parent, methodid, *args) result = await call_method_full(parent, methodid, *args)
if len(result.OutputArguments) == 0: if len(result.OutputArguments) == 0:
return None return None
...@@ -24,7 +24,7 @@ def call_method(parent, methodid, *args): ...@@ -24,7 +24,7 @@ def call_method(parent, methodid, *args):
return result.OutputArguments return result.OutputArguments
def call_method_full(parent, methodid, *args): async def call_method_full(parent, methodid, *args):
""" """
Call an OPC-UA method. methodid is browse name of child method or the Call an OPC-UA method. methodid is browse name of child method or the
nodeid of method as a NodeId object nodeid of method as a NodeId object
...@@ -33,7 +33,7 @@ def call_method_full(parent, methodid, *args): ...@@ -33,7 +33,7 @@ def call_method_full(parent, methodid, *args):
returns a CallMethodResult object with converted OutputArguments returns a CallMethodResult object with converted OutputArguments
""" """
if isinstance(methodid, (str, ua.uatypes.QualifiedName)): if isinstance(methodid, (str, ua.uatypes.QualifiedName)):
methodid = parent.get_child(methodid).nodeid methodid = (await parent.get_child(methodid)).nodeid
elif isinstance(methodid, node.Node): elif isinstance(methodid, node.Node):
methodid = methodid.nodeid methodid = methodid.nodeid
......
...@@ -405,15 +405,11 @@ class Node: ...@@ -405,15 +405,11 @@ class Node:
Since address space may have circular references, a max length is specified Since address space may have circular references, a max length is specified
""" """
path = [] path = await self._get_path(max_length)
for ref in await self._get_path(max_length): path = [Node(self.server, ref.NodeId) for ref in path]
path.append(Node(self.server, ref.NodeId))
path.append(self) path.append(self)
if as_string: if as_string:
str_path = [] path = [(await el.get_browse_name()).to_string() for el in path]
for el in path:
name = await el.get_browse_name()
str_path.append(name.to_string())
return path return path
async def _get_path(self, max_length=20): async def _get_path(self, max_length=20):
...@@ -598,7 +594,7 @@ class Node: ...@@ -598,7 +594,7 @@ class Node:
else: else:
raise ua.UaStatusCodeError(ua.StatusCodes.BadNotFound) raise ua.UaStatusCodeError(ua.StatusCodes.BadNotFound)
ditem = self._fill_delete_reference_item(rdesc, bidirectional) ditem = self._fill_delete_reference_item(rdesc, bidirectional)
await self.server.delete_references([ditem])[0].check() (await self.server.delete_references([ditem]))[0].check()
async def add_reference(self, target, reftype, forward=True, bidirectional=True): async def add_reference(self, target, reftype, forward=True, bidirectional=True):
""" """
...@@ -660,6 +656,7 @@ class Node: ...@@ -660,6 +656,7 @@ class Node:
return opcua.common.manage_nodes.create_method(self, *args) return opcua.common.manage_nodes.create_method(self, *args)
def add_reference_type(self, nodeid, bname, symmetric=True, inversename=None): def add_reference_type(self, nodeid, bname, symmetric=True, inversename=None):
"""COROUTINE"""
return opcua.common.manage_nodes.create_reference_type(self, nodeid, bname, symmetric, inversename) return opcua.common.manage_nodes.create_reference_type(self, nodeid, bname, symmetric, inversename)
def call_method(self, methodid, *args): def call_method(self, methodid, *args):
......
...@@ -141,7 +141,7 @@ class InternalServer(object): ...@@ -141,7 +141,7 @@ class InternalServer(object):
def add_endpoint(self, endpoint): def add_endpoint(self, endpoint):
self.endpoints.append(endpoint) self.endpoints.append(endpoint)
def get_endpoints(self, params=None, sockname=None): async def get_endpoints(self, params=None, sockname=None):
self.logger.info('get endpoint') self.logger.info('get endpoint')
if sockname: if sockname:
# return to client the ip address it has access to # return to client the ip address it has access to
...@@ -261,10 +261,10 @@ class InternalSession(object): ...@@ -261,10 +261,10 @@ class InternalSession(object):
return 'InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})'.format( return 'InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})'.format(
self.name, self.user, self.session_id, self.authentication_token) self.name, self.user, self.session_id, self.authentication_token)
def get_endpoints(self, params=None, sockname=None): async def get_endpoints(self, params=None, sockname=None):
return self.iserver.get_endpoints(params, sockname) return await self.iserver.get_endpoints(params, sockname)
def create_session(self, params, sockname=None): async def create_session(self, params, sockname=None):
self.logger.info('Create session request') self.logger.info('Create session request')
result = ua.CreateSessionResult() result = ua.CreateSessionResult()
...@@ -274,7 +274,7 @@ class InternalSession(object): ...@@ -274,7 +274,7 @@ class InternalSession(object):
result.MaxRequestMessageSize = 65536 result.MaxRequestMessageSize = 65536
self.nonce = utils.create_nonce(32) self.nonce = utils.create_nonce(32)
result.ServerNonce = self.nonce result.ServerNonce = self.nonce
result.ServerEndpoints = self.get_endpoints(sockname=sockname) result.ServerEndpoints = await self.get_endpoints(sockname=sockname)
return result return result
......
...@@ -23,13 +23,9 @@ from opcua.common.structures import load_type_definitions ...@@ -23,13 +23,9 @@ from opcua.common.structures import load_type_definitions
from opcua.common.xmlexporter import XmlExporter from opcua.common.xmlexporter import XmlExporter
from opcua.common.xmlimporter import XmlImporter from opcua.common.xmlimporter import XmlImporter
from opcua.common.ua_utils import get_nodes_of_namespace from opcua.common.ua_utils import get_nodes_of_namespace
from opcua.crypto import uacrypto
use_crypto = True _logger = logging.getLogger(__name__)
try:
from opcua.crypto import uacrypto
except ImportError:
logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
use_crypto = False
class Server: class Server:
...@@ -159,7 +155,7 @@ class Server: ...@@ -159,7 +155,7 @@ class Server:
uries.append(uri) uries.append(uri)
await ns_node.set_value(uries) await ns_node.set_value(uries)
def find_servers(self, uris=None): async def find_servers(self, uris=None):
""" """
find_servers. mainly implemented for symmetry with client find_servers. mainly implemented for symmetry with client
""" """
...@@ -211,8 +207,8 @@ class Server: ...@@ -211,8 +207,8 @@ class Server:
def set_endpoint(self, url): def set_endpoint(self, url):
self.endpoint = urlparse(url) self.endpoint = urlparse(url)
def get_endpoints(self): async def get_endpoints(self):
return self.iserver.get_endpoints() return await self.iserver.get_endpoints()
def set_security_policy(self, security_policy): def set_security_policy(self, security_policy):
""" """
...@@ -545,8 +541,8 @@ class Server: ...@@ -545,8 +541,8 @@ class Server:
nodes = await get_nodes_of_namespace(self, namespaces) nodes = await get_nodes_of_namespace(self, namespaces)
self.export_xml(nodes, path) self.export_xml(nodes, path)
def delete_nodes(self, nodes, recursive=False): async def delete_nodes(self, nodes, recursive=False):
return delete_nodes(self.iserver.isession, nodes, recursive) return await delete_nodes(self.iserver.isession, nodes, recursive)
async def historize_node_data_change(self, node, period=timedelta(days=7), count=0): async def historize_node_data_change(self, node, period=timedelta(days=7), count=0):
""" """
......
...@@ -4,13 +4,14 @@ from threading import RLock, Lock ...@@ -4,13 +4,14 @@ from threading import RLock, Lock
import time import time
from opcua import ua from opcua import ua
from opcua.server.internal_server import InternalServer, InternalSession
from opcua.ua.ua_binary import nodeid_from_binary, struct_from_binary from opcua.ua.ua_binary import nodeid_from_binary, struct_from_binary
from opcua.ua.ua_binary import struct_to_binary, uatcp_to_binary from opcua.ua.ua_binary import struct_to_binary, uatcp_to_binary
from opcua.common import utils from opcua.common import utils
from opcua.common.connection import SecureConnection from opcua.common.connection import SecureConnection
class PublishRequestData(object): class PublishRequestData:
def __init__(self): def __init__(self):
self.requesthdr = None self.requesthdr = None
...@@ -19,14 +20,14 @@ class PublishRequestData(object): ...@@ -19,14 +20,14 @@ class PublishRequestData(object):
self.timestamp = time.time() self.timestamp = time.time()
class UaProcessor(object): class UaProcessor:
def __init__(self, internal_server, socket): def __init__(self, internal_server: InternalServer, socket):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.iserver = internal_server self.iserver: InternalServer = internal_server
self.name = socket.get_extra_info('peername') self.name = socket.get_extra_info('peername')
self.sockname = socket.get_extra_info('sockname') self.sockname = socket.get_extra_info('sockname')
self.session = None self.session: InternalSession = None
self.socket = socket self.socket = socket
self._datalock = RLock() self._datalock = RLock()
self._publishdata_queue = [] self._publishdata_queue = []
...@@ -119,7 +120,7 @@ class UaProcessor(object): ...@@ -119,7 +120,7 @@ class UaProcessor(object):
# create the session on server # create the session on server
self.session = self.iserver.create_session(self.name, external=True) self.session = self.iserver.create_session(self.name, external=True)
# get a session creation result to send back # get a session creation result to send back
sessiondata = self.session.create_session(params, sockname=self.sockname) sessiondata = await self.session.create_session(params, sockname=self.sockname)
response = ua.CreateSessionResponse() response = ua.CreateSessionResponse()
response.Parameters = sessiondata response.Parameters = sessiondata
response.Parameters.ServerCertificate = self._connection.security_policy.client_certificate response.Parameters.ServerCertificate = self._connection.security_policy.client_certificate
...@@ -188,7 +189,7 @@ class UaProcessor(object): ...@@ -188,7 +189,7 @@ class UaProcessor(object):
elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary): elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
self.logger.info("get endpoints request") self.logger.info("get endpoints request")
params = struct_from_binary(ua.GetEndpointsParameters, body) params = struct_from_binary(ua.GetEndpointsParameters, body)
endpoints = self.iserver.get_endpoints(params, sockname=self.sockname) endpoints = await self.iserver.get_endpoints(params, sockname=self.sockname)
response = ua.GetEndpointsResponse() response = ua.GetEndpointsResponse()
response.Endpoints = endpoints response.Endpoints = endpoints
self.logger.info("sending get endpoints response") self.logger.info("sending get endpoints response")
......
...@@ -22,7 +22,7 @@ async def opc(request): ...@@ -22,7 +22,7 @@ async def opc(request):
await srv.start() await srv.start()
# start client # start client
# long timeout since travis (automated testing) can be really slow # long timeout since travis (automated testing) can be really slow
clt = Client(f'opc.tcp://127.0.0.1:{port_num}', timeout=10) clt = Client(f'opc.tcp://admin@127.0.0.1:{port_num}', timeout=10)
await clt.connect() await clt.connect()
yield clt yield clt
await clt.disconnect() await clt.disconnect()
......
...@@ -7,7 +7,7 @@ from opcua import Server ...@@ -7,7 +7,7 @@ from opcua import Server
from opcua import ua from opcua import ua
from .test_common import add_server_methods from .test_common import add_server_methods
from .tests_enum_struct import add_server_custom_enum_struct from .util_enum_struct import add_server_custom_enum_struct
port_num1 = 48510 port_num1 = 48510
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
......
# encoding: utf-8 # encoding: utf-8
from concurrent.futures import Future, TimeoutError
"""
Tests that will be run twice. Once on server side and once on
client side since we have been carefull to have the exact
same api on server and client side
"""
import pytest import pytest
from datetime import datetime from datetime import datetime
from datetime import timedelta from datetime import timedelta
import math import math
from contextlib import contextmanager
from opcua import ua from opcua import ua
from opcua import Node
from opcua import uamethod from opcua import uamethod
from opcua import instantiate from opcua import instantiate
from opcua import copy_node from opcua import copy_node
from opcua.common import ua_utils from opcua.common import ua_utils
from opcua.common.methods import call_method_full from opcua.common.methods import call_method_full
pytestmark = pytest.mark.asyncio
async def add_server_methods(srv): async def add_server_methods(srv):
@uamethod @uamethod
...@@ -70,22 +76,17 @@ async def add_server_methods(srv): ...@@ -70,22 +76,17 @@ async def add_server_methods(srv):
return 1, 2, 3 return 1, 2, 3
o = srv.get_objects_node() o = srv.get_objects_node()
await o.add_method(ua.NodeId("ServerMethodTuple", 2), ua.QualifiedName('ServerMethodTuple', 2), func5, [], await o.add_method(
[ua.VariantType.Int64, ua.VariantType.Int64, ua.VariantType.Int64]) ua.NodeId("ServerMethodTuple", 2), ua.QualifiedName('ServerMethodTuple', 2), func5, [],
[ua.VariantType.Int64, ua.VariantType.Int64, ua.VariantType.Int64]
)
"""
Tests that will be run twice. Once on server side and once on
client side since we have been carefull to have the exact
same api on server and client side
"""
async def test_find_servers(opc): async def test_find_servers(opc):
servers = await opc.find_servers() servers = await opc.find_servers()
# FIXME : finish # FIXME : finish
async def test_add_node_bad_args(opc): async def test_add_node_bad_args(opc):
obj = opc.get_objects_node() obj = opc.get_objects_node()
...@@ -104,6 +105,7 @@ async def test_add_node_bad_args(opc): ...@@ -104,6 +105,7 @@ async def test_add_node_bad_args(opc):
with pytest.raises(ua.UaError): with pytest.raises(ua.UaError):
fold = await obj.add_folder("i=0;s='oooo'", "tt:oioi") fold = await obj.add_folder("i=0;s='oooo'", "tt:oioi")
async def test_delete_nodes(opc): async def test_delete_nodes(opc):
obj = opc.get_objects_node() obj = opc.get_objects_node()
fold = await obj.add_folder(2, "FolderToDelete") fold = await obj.add_folder(2, "FolderToDelete")
...@@ -116,7 +118,8 @@ async def test_delete_nodes(opc): ...@@ -116,7 +118,8 @@ async def test_delete_nodes(opc):
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
await obj.get_child(["2:FolderToDelete", "2:VarToDelete"]) await obj.get_child(["2:FolderToDelete", "2:VarToDelete"])
childs = await fold.get_children() childs = await fold.get_children()
assert var not in childs assert var not in childs
async def test_delete_nodes_recursive(opc): async def test_delete_nodes_recursive(opc):
obj = opc.get_objects_node() obj = opc.get_objects_node()
...@@ -128,6 +131,7 @@ async def test_delete_nodes_recursive(opc): ...@@ -128,6 +131,7 @@ async def test_delete_nodes_recursive(opc):
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
await obj.get_child(["2:FolderToDelete", "2:VarToDelete"]) await obj.get_child(["2:FolderToDelete", "2:VarToDelete"])
async def test_delete_nodes_recursive2(opc): async def test_delete_nodes_recursive2(opc):
obj = opc.get_objects_node() obj = opc.get_objects_node()
fold = await obj.add_folder(2, "FolderToDeleteRoot") fold = await obj.add_folder(2, "FolderToDeleteRoot")
...@@ -148,731 +152,794 @@ async def test_delete_nodes_recursive2(opc): ...@@ -148,731 +152,794 @@ async def test_delete_nodes_recursive2(opc):
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
await node.get_browse_name() await node.get_browse_name()
def test_delete_references(opc):
newtype = opc.get_node(ua.ObjectIds.HierarchicalReferences).add_reference_type(0, "HasSuperSecretVariable") async def test_delete_references(opc):
newtype = await opc.get_node(ua.ObjectIds.HierarchicalReferences).add_reference_type(0, "HasSuperSecretVariable")
obj = opc.get_objects_node() obj = opc.get_objects_node()
fold = obj.add_folder(2, "FolderToRef") fold = await obj.add_folder(2, "FolderToRef")
var = fold.add_variable(2, "VarToRef", 42) var = await fold.add_variable(2, "VarToRef", 42)
fold.add_reference(var, newtype) await fold.add_reference(var, newtype)
assert [fold] == var.get_referenced_nodes(newtype) assert [fold] == await var.get_referenced_nodes(newtype)
assert [var] == fold.get_referenced_nodes(newtype) assert [var] == await fold.get_referenced_nodes(newtype)
fold.delete_reference(var, newtype) await fold.delete_reference(var, newtype)
assert [] == var.get_referenced_nodes(newtype) assert [] == await var.get_referenced_nodes(newtype)
assert [] == fold.get_referenced_nodes(newtype) assert [] == await fold.get_referenced_nodes(newtype)
fold.add_reference(var, newtype, bidirectional=False) await fold.add_reference(var, newtype, bidirectional=False)
assert [] == var.get_referenced_nodes(newtype) assert [] == await var.get_referenced_nodes(newtype)
assert [var] == fold.get_referenced_nodes(newtype) assert [var] == await fold.get_referenced_nodes(newtype)
fold.delete_reference(var, newtype) await fold.delete_reference(var, newtype)
assert [] == var.get_referenced_nodes(newtype) assert [] == await var.get_referenced_nodes(newtype)
assert [] == fold.get_referenced_nodes(newtype) assert [] == await fold.get_referenced_nodes(newtype)
var.add_reference(fold, newtype, forward=False, bidirectional=False) await var.add_reference(fold, newtype, forward=False, bidirectional=False)
assert [fold] == var.get_referenced_nodes(newtype) assert [fold] == await var.get_referenced_nodes(newtype)
assert [] == fold.get_referenced_nodes(newtype) assert [] == await fold.get_referenced_nodes(newtype)
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
fold.delete_reference(var, newtype) await fold.delete_reference(var, newtype)
assert [fold] == var.get_referenced_nodes(newtype) assert [fold] == await var.get_referenced_nodes(newtype)
assert [] == fold.get_referenced_nodes(newtype) assert [] == await fold.get_referenced_nodes(newtype)
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
var.delete_reference(fold, newtype) await var.delete_reference(fold, newtype)
assert [fold] == var.get_referenced_nodes(newtype) assert [fold] == await var.get_referenced_nodes(newtype)
assert [] == fold.get_referenced_nodes(newtype) assert [] == await fold.get_referenced_nodes(newtype)
var.delete_reference(fold, newtype, forward=False) await var.delete_reference(fold, newtype, forward=False)
assert [] == var.get_referenced_nodes(newtype) assert [] == await var.get_referenced_nodes(newtype)
assert [] == fold.get_referenced_nodes(newtype) assert [] == await fold.get_referenced_nodes(newtype)
# clean-up # clean-up
opc.delete_nodes([fold, newtype], recursive=True) await opc.delete_nodes([fold, newtype], recursive=True)
def test_server_node(opc): async def test_server_node(opc):
node = opc.get_server_node() node = opc.get_server_node()
assert ua.QualifiedName('Server', 0) == node.get_browse_name() assert ua.QualifiedName('Server', 0) == await node.get_browse_name()
def test_root(opc):
async def test_root(opc):
root = opc.get_root_node() root = opc.get_root_node()
assert ua.QualifiedName('Root', 0) == root.get_browse_name() assert ua.QualifiedName('Root', 0) == await root.get_browse_name()
assert ua.LocalizedText('Root') == root.get_display_name() assert ua.LocalizedText('Root') == await root.get_display_name()
nid = ua.NodeId(84, 0) nid = ua.NodeId(84, 0)
assert nid == root.nodeid assert nid == root.nodeid
def test_objects(opc):
async def test_objects(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
assert ua.QualifiedName('Objects', 0) == objects.get_browse_name() assert ua.QualifiedName('Objects', 0) == await objects.get_browse_name()
nid = ua.NodeId(85, 0) nid = ua.NodeId(85, 0)
assert nid == objects.nodeid assert nid == objects.nodeid
def test_browse(opc):
async def test_browse(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
obj = objects.add_object(4, "browsetest") obj = await objects.add_object(4, "browsetest")
folder = obj.add_folder(4, "folder") folder = await obj.add_folder(4, "folder")
prop = obj.add_property(4, "property", 1) prop = await obj.add_property(4, "property", 1)
prop2 = obj.add_property(4, "property2", 2) prop2 = await obj.add_property(4, "property2", 2)
var = obj.add_variable(4, "variable", 3) var = await obj.add_variable(4, "variable", 3)
obj2 = obj.add_object(4, "obj") obj2 = await obj.add_object(4, "obj")
alle = obj.get_children() alle = await obj.get_children()
assert prop in alle assert prop in alle
assert prop2 in alle assert prop2 in alle
assert var in alle assert var in alle
assert folder in alle assert folder in alle
assert obj not in alle assert obj not in alle
props = obj.get_children(refs=ua.ObjectIds.HasProperty) props = await obj.get_children(refs=ua.ObjectIds.HasProperty)
assert prop in props assert prop in props
assert prop2 in props assert prop2 in props
assert var not in props assert var not in props
assert folder not in props assert folder not in props
assert obj2 not in props assert obj2 not in props
all_vars = obj.get_children(nodeclassmask=ua.NodeClass.Variable) all_vars = await obj.get_children(nodeclassmask=ua.NodeClass.Variable)
assert prop in all_vars assert prop in all_vars
assert var in all_vars assert var in all_vars
assert folder not in props assert folder not in props
assert obj2 not in props assert obj2 not in props
all_objs = obj.get_children(nodeclassmask=ua.NodeClass.Object) all_objs = await obj.get_children(nodeclassmask=ua.NodeClass.Object)
assert folder in all_objs assert folder in all_objs
assert obj2 in all_objs assert obj2 in all_objs
assert var not in all_objs assert var not in all_objs
def test_browse_references(opc):
async def test_browse_references(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
folder = objects.add_folder(4, "folder") folder = await objects.add_folder(4, "folder")
childs = objects.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Forward, childs = await objects.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Forward, includesubtypes=False
)
assert folder in childs assert folder in childs
childs = objects.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Both, childs = await objects.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Both, includesubtypes=False
)
assert folder in childs assert folder in childs
childs = objects.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, childs = await objects.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, includesubtypes=False
)
assert folder not in childs assert folder not in childs
parents = folder.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, parents = await folder.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, includesubtypes=False
)
assert objects in parents assert objects in parents
parents = folder.get_referenced_nodes(refs=ua.ObjectIds.HierarchicalReferences, parents = await folder.get_referenced_nodes(
direction=ua.BrowseDirection.Inverse, includesubtypes=False) refs=ua.ObjectIds.HierarchicalReferences, direction=ua.BrowseDirection.Inverse, includesubtypes=False
)
assert objects in parents assert objects in parents
assert await folder.get_parent() == objects
parent = folder.get_parent()
assert parent == objects
def test_browsename_with_spaces(opc): async def test_browsename_with_spaces(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
v = o.add_variable(3, 'BNVariable with spaces and %&+?/', 1.3) v = await o.add_variable(3, 'BNVariable with spaces and %&+?/', 1.3)
v2 = o.get_child("3:BNVariable with spaces and %&+?/") v2 = await o.get_child("3:BNVariable with spaces and %&+?/")
assert v == v2 assert v == v2
def test_non_existing_path(opc):
async def test_non_existing_path(opc):
root = opc.get_root_node() root = opc.get_root_node()
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
server_time_node = root.get_child(['0:Objects', '0:Server', '0:nonexistingnode']) await root.get_child(['0:Objects', '0:Server', '0:nonexistingnode'])
def test_bad_attribute(opc): async def test_bad_attribute(opc):
root = opc.get_root_node() root = opc.get_root_node()
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
root.set_value(99) await root.set_value(99)
def test_get_node_by_nodeid(opc): async def test_get_node_by_nodeid(opc):
root = opc.get_root_node() root = opc.get_root_node()
server_time_node = root.get_child(['0:Objects', '0:Server', '0:ServerStatus', '0:CurrentTime']) server_time_node = await root.get_child(['0:Objects', '0:Server', '0:ServerStatus', '0:CurrentTime'])
correct = opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime)) correct = opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
assert server_time_node == correct assert server_time_node == correct
def test_datetime_read(opc):
async def test_datetime_read(opc):
time_node = opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime)) time_node = opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
dt = time_node.get_value() dt = await time_node.get_value()
utcnow = datetime.utcnow() utcnow = datetime.utcnow()
delta = utcnow - dt delta = utcnow - dt
assert delta < timedelta(seconds=1) assert delta < timedelta(seconds=1)
def test_datetime_write(opc):
async def test_datetime_write(opc):
time_node = opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime)) time_node = opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
now = datetime.utcnow() now = datetime.utcnow()
objects = opc.get_objects_node() objects = opc.get_objects_node()
v1 = objects.add_variable(4, "test_datetime", now) v1 = await objects.add_variable(4, "test_datetime", now)
tid = v1.get_value() tid = await v1.get_value()
assert now == tid assert now == tid
def test_variant_array_dim(opc):
async def test_variant_array_dim(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
l = [[[1.0, 1.0, 1.0, 1.0], [2.0, 2.0, 2.0, 2.0], [3.0, 3.0, 3.0, 3.0]], l = [[[1.0, 1.0, 1.0, 1.0], [2.0, 2.0, 2.0, 2.0], [3.0, 3.0, 3.0, 3.0]],
[[5.0, 5.0, 5.0, 5.0], [7.0, 8.0, 9.0, 01.0], [1.0, 1.0, 1.0, 1.0]]] [[5.0, 5.0, 5.0, 5.0], [7.0, 8.0, 9.0, 01.0], [1.0, 1.0, 1.0, 1.0]]]
v = objects.add_variable(3, 'variableWithDims', l) v = await objects.add_variable(3, 'variableWithDims', l)
v.set_array_dimensions([0, 0, 0]) await v.set_array_dimensions([0, 0, 0])
dim = v.get_array_dimensions() dim = await v.get_array_dimensions()
assert [0, 0, 0] == dim assert [0, 0, 0] == dim
v.set_value_rank(0) await v.set_value_rank(0)
rank = v.get_value_rank() rank = await v.get_value_rank()
assert 0 == rank assert 0 == rank
v2 = v.get_value() v2 = await v.get_value()
assert l == v2 assert l == v2
dv = v.get_data_value() dv = await v.get_data_value()
assert [2, 3, 4] == dv.Value.Dimensions assert [2, 3, 4] == dv.Value.Dimensions
l = [[[], [], []], [[], [], []]] l = [[[], [], []], [[], [], []]]
variant = ua.Variant(l, ua.VariantType.UInt32) variant = ua.Variant(l, ua.VariantType.UInt32)
v = objects.add_variable(3, 'variableWithDimsEmpty', variant) v = await objects.add_variable(3, 'variableWithDimsEmpty', variant)
v2 = v.get_value() v2 = await v.get_value()
assert l == v2 assert l == v2
dv = v.get_data_value() dv = await v.get_data_value()
assert [2, 3, 0] == dv.Value.Dimensions assert [2, 3, 0] == dv.Value.Dimensions
def test_add_numeric_variable(opc):
async def test_add_numeric_variable(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
v = objects.add_variable('ns=3;i=888;', '3:numericnodefromstring', 99) v = await objects.add_variable('ns=3;i=888;', '3:numericnodefromstring', 99)
nid = ua.NodeId(888, 3) nid = ua.NodeId(888, 3)
qn = ua.QualifiedName('numericnodefromstring', 3) qn = ua.QualifiedName('numericnodefromstring', 3)
assert nid == v.nodeid assert nid == v.nodeid
assert qn == v.get_browse_name() assert qn == await v.get_browse_name()
def test_add_string_variable(opc): async def test_add_string_variable(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
v = objects.add_variable('ns=3;s=stringid;', '3:stringnodefromstring', [68]) v = await objects.add_variable('ns=3;s=stringid;', '3:stringnodefromstring', [68])
nid = ua.NodeId('stringid', 3) nid = ua.NodeId('stringid', 3)
qn = ua.QualifiedName('stringnodefromstring', 3) qn = ua.QualifiedName('stringnodefromstring', 3)
assert nid == v.nodeid assert nid == v.nodeid
assert qn == v.get_browse_name() assert qn == await v.get_browse_name()
def test_utf8(opc): async def test_utf8(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
utf_string = "æøå@%&" utf_string = "æøå@%&"
bn = ua.QualifiedName(utf_string, 3) bn = ua.QualifiedName(utf_string, 3)
nid = ua.NodeId("æølå", 3) nid = ua.NodeId("æølå", 3)
val = "æøå" val = "æøå"
v = objects.add_variable(nid, bn, val) v = await objects.add_variable(nid, bn, val)
assert nid == v.nodeid assert nid == v.nodeid
val2 = v.get_value() val2 = await v.get_value()
assert val == val2 assert val == val2
bn2 = v.get_browse_name() bn2 = await v.get_browse_name()
assert bn == bn2 assert bn == bn2
def test_null_variable(opc):
async def test_null_variable(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
var = objects.add_variable(3, 'nullstring', "a string") var = await objects.add_variable(3, 'nullstring', "a string")
var.set_value(None) await var.set_value(None)
val = var.get_value() val = await var.get_value()
assert val is None assert val is None
var.set_value("") await var.set_value("")
val = var.get_value() val = await var.get_value()
assert val is not None assert val is not None
assert "" == val assert "" == val
def test_variable_data_type(opc):
async def test_variable_data_type(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
var = objects.add_variable(3, 'stringfordatatype', "a string") var = await objects.add_variable(3, 'stringfordatatype', "a string")
val = var.get_data_type_as_variant_type() val = await var.get_data_type_as_variant_type()
assert ua.VariantType.String == val assert ua.VariantType.String == val
var = objects.add_variable(3, 'stringarrayfordatatype', ["a", "b"]) var = await objects.add_variable(3, 'stringarrayfordatatype', ["a", "b"])
val = var.get_data_type_as_variant_type() val = await var.get_data_type_as_variant_type()
assert ua.VariantType.String == val assert ua.VariantType.String == val
def test_add_string_array_variable(opc):
async def test_add_string_array_variable(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
v = objects.add_variable('ns=3;s=stringarrayid;', '9:stringarray', ['l', 'b']) v = await objects.add_variable('ns=3;s=stringarrayid;', '9:stringarray', ['l', 'b'])
nid = ua.NodeId('stringarrayid', 3) nid = ua.NodeId('stringarrayid', 3)
qn = ua.QualifiedName('stringarray', 9) qn = ua.QualifiedName('stringarray', 9)
assert nid == v.nodeid assert nid == v.nodeid
assert qn == v.get_browse_name() assert qn == await v.get_browse_name()
val = v.get_value() val = await v.get_value()
assert ['l', 'b'] == val assert ['l', 'b'] == val
def test_add_numeric_node(opc):
async def test_add_numeric_node(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
nid = ua.NodeId(9999, 3) nid = ua.NodeId(9999, 3)
qn = ua.QualifiedName('AddNodeVar1', 3) qn = ua.QualifiedName('AddNodeVar1', 3)
v1 = objects.add_variable(nid, qn, 0) v1 = await objects.add_variable(nid, qn, 0)
assert nid == v1.nodeid assert nid == v1.nodeid
assert qn == v1.get_browse_name() assert qn == await v1.get_browse_name()
def test_add_string_node(opc): async def test_add_string_node(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
qn = ua.QualifiedName('AddNodeVar2', 3) qn = ua.QualifiedName('AddNodeVar2', 3)
nid = ua.NodeId('AddNodeVar2Id', 3) nid = ua.NodeId('AddNodeVar2Id', 3)
v2 = objects.add_variable(nid, qn, 0) v2 = await objects.add_variable(nid, qn, 0)
assert nid == v2.nodeid assert nid == v2.nodeid
assert qn == v2.get_browse_name() assert qn == await v2.get_browse_name()
def test_add_find_node_(opc): async def test_add_find_node_(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
o = objects.add_object('ns=2;i=101;', '2:AddFindObject') o = await objects.add_object('ns=2;i=101;', '2:AddFindObject')
o2 = objects.get_child('2:AddFindObject') o2 = await objects.get_child('2:AddFindObject')
assert o == o2 assert o == o2
def test_node_path(opc):
async def test_node_path(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
o = objects.add_object('ns=2;i=105;', '2:NodePathObject') o = await objects.add_object('ns=2;i=105;', '2:NodePathObject')
root = opc.get_root_node() root = opc.get_root_node()
o2 = root.get_child(['0:Objects', '2:NodePathObject']) o2 = await root.get_child(['0:Objects', '2:NodePathObject'])
assert o == o2 assert o == o2
def test_add_read_node(opc):
async def test_add_read_node(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
o = objects.add_object('ns=2;i=102;', '2:AddReadObject') o = await objects.add_object('ns=2;i=102;', '2:AddReadObject')
nid = ua.NodeId(102, 2) nid = ua.NodeId(102, 2)
assert nid == o.nodeid assert nid == o.nodeid
qn = ua.QualifiedName('AddReadObject', 2) qn = ua.QualifiedName('AddReadObject', 2)
assert qn == o.get_browse_name() assert qn == await o.get_browse_name()
def test_simple_value(opc): async def test_simple_value(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
v = o.add_variable(3, 'VariableTestValue', 4.32) v = await o.add_variable(3, 'VariableTestValue', 4.32)
val = v.get_value() val = await v.get_value()
assert 4.32 == val assert 4.32 == val
def test_add_exception(opc):
async def test_add_exception(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
o = objects.add_object('ns=2;i=103;', '2:AddReadObject') await objects.add_object('ns=2;i=103;', '2:AddReadObject')
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
o2 = objects.add_object('ns=2;i=103;', '2:AddReadObject') await objects.add_object('ns=2;i=103;', '2:AddReadObject')
def test_negative_value(opc): async def test_negative_value(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
v = o.add_variable(3, 'VariableNegativeValue', 4) v = await o.add_variable(3, 'VariableNegativeValue', 4)
v.set_value(-4.54) await v.set_value(-4.54)
val = v.get_value() assert -4.54 == await v.get_value()
assert -4.54 == val
def test_read_server_state(opc): async def test_read_server_state(opc):
statenode = opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_State)) statenode = opc.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_State))
state = statenode.get_value() assert 0 == await statenode.get_value()
assert 0 == state
def test_bad_node(opc):
async def test_bad_node(opc):
bad = opc.get_node(ua.NodeId(999, 999)) bad = opc.get_node(ua.NodeId(999, 999))
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
bad.get_browse_name() await bad.get_browse_name()
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
bad.set_value(89) await bad.set_value(89)
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
bad.add_object(0, "0:myobj") await bad.add_object(0, "0:myobj")
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
bad.get_child("0:myobj") await bad.get_child("0:myobj")
def test_value(opc): async def test_value(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
var = ua.Variant(1.98, ua.VariantType.Double) var = ua.Variant(1.98, ua.VariantType.Double)
v = o.add_variable(3, 'VariableValue', var) v = await o.add_variable(3, 'VariableValue', var)
val = v.get_value() assert 1.98 == await v.get_value()
assert 1.98 == val
dvar = ua.DataValue(var) dvar = ua.DataValue(var)
dv = v.get_data_value() dv = await v.get_data_value()
assert ua.DataValue == type(dv) assert ua.DataValue == type(dv)
assert dvar.Value == dv.Value assert dvar.Value == dv.Value
assert dvar.Value == var assert dvar.Value == var
def test_set_value(opc):
async def test_set_value(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
var = ua.Variant(1.98, ua.VariantType.Double) var = ua.Variant(1.98, ua.VariantType.Double)
dvar = ua.DataValue(var) dvar = ua.DataValue(var)
v = o.add_variable(3, 'VariableValue', var) v = await o.add_variable(3, 'VariableValue', var)
v.set_value(var.Value) await v.set_value(var.Value)
v1 = v.get_value() v1 = await v.get_value()
assert v1 == var.Value assert v1 == var.Value
v.set_value(var) await v.set_value(var)
v2 = v.get_value() v2 = await v.get_value()
assert v2 == var.Value assert v2 == var.Value
v.set_data_value(dvar) await v.set_data_value(dvar)
v3 = v.get_data_value() v3 = await v.get_data_value()
assert v3.Value == dvar.Value assert v3.Value == dvar.Value
def test_array_value(opc):
async def test_array_value(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
v = o.add_variable(3, 'VariableArrayValue', [1, 2, 3]) v = await o.add_variable(3, 'VariableArrayValue', [1, 2, 3])
val = v.get_value() assert [1, 2, 3] == await v.get_value()
assert [1, 2, 3] == val
def test_bool_variable(opc): async def test_bool_variable(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
v = o.add_variable(3, 'BoolVariable', True) v = await o.add_variable(3, 'BoolVariable', True)
dt = v.get_data_type_as_variant_type() dt = await v.get_data_type_as_variant_type()
assert ua.VariantType.Boolean == dt assert ua.VariantType.Boolean == dt
val = v.get_value() val = await v.get_value()
assert val is True assert val is True
v.set_value(False) await v.set_value(False)
val = v.get_value() val = await v.get_value()
assert val is False assert val is False
def test_array_size_one_value(opc):
async def test_array_size_one_value(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
v = o.add_variable(3, 'VariableArrayValue', [1, 2, 3]) v = await o.add_variable(3, 'VariableArrayValue', [1, 2, 3])
v.set_value([1]) await v.set_value([1])
val = v.get_value() assert [1] == await v.get_value()
assert [1] == val
def test_use_namespace(opc): async def test_use_namespace(opc):
idx = opc.get_namespace_index("urn:freeopcua:python:server") idx = await opc.get_namespace_index("urn:freeopcua:python:server")
assert 1 == idx assert 1 == idx
root = opc.get_root_node() root = opc.get_root_node()
myvar = root.add_variable(idx, 'var_in_custom_namespace', [5]) myvar = await root.add_variable(idx, 'var_in_custom_namespace', [5])
myid = myvar.nodeid myid = myvar.nodeid
assert idx == myid.NamespaceIndex assert idx == myid.NamespaceIndex
def test_method(opc):
async def test_method(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
m = o.get_child("2:ServerMethod") await o.get_child("2:ServerMethod")
result = o.call_method("2:ServerMethod", 2.1) result = await o.call_method("2:ServerMethod", 2.1)
assert 4.2 == result assert 4.2 == result
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
# FIXME: we should raise a more precise exception # FIXME: we should raise a more precise exception
result = o.call_method("2:ServerMethod", 2.1, 89, 9) await o.call_method("2:ServerMethod", 2.1, 89, 9)
with pytest.raises(ua.UaStatusCodeError): with pytest.raises(ua.UaStatusCodeError):
result = o.call_method(ua.NodeId(999), 2.1) # non existing method await o.call_method(ua.NodeId(999), 2.1) # non existing method
def test_method_array(opc):
async def test_method_array(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
m = o.get_child("2:ServerMethodArray") m = await o.get_child("2:ServerMethodArray")
result = o.call_method(m, "sin", ua.Variant(math.pi)) result = await o.call_method(m, "sin", ua.Variant(math.pi))
assert result < 0.01 assert result < 0.01
with pytest.raises(ua.UaStatusCodeError) as cm: with pytest.raises(ua.UaStatusCodeError) as cm:
result = o.call_method(m, "cos", ua.Variant(math.pi)) await o.call_method(m, "cos", ua.Variant(math.pi))
assert ua.StatusCodes.BadInvalidArgument == cm.exception.code assert ua.StatusCodes.BadInvalidArgument == cm.exception.code
with pytest.raises(ua.UaStatusCodeError) as cm: with pytest.raises(ua.UaStatusCodeError) as cm:
result = o.call_method(m, "panic", ua.Variant(math.pi)) await o.call_method(m, "panic", ua.Variant(math.pi))
assert ua.StatusCodes.BadOutOfMemory == cm.exception.code assert ua.StatusCodes.BadOutOfMemory == cm.exception.code
def test_method_array2(opc):
async def test_method_array2(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
m = o.get_child("2:ServerMethodArray2") m = o.get_child("2:ServerMethodArray2")
result = o.call_method(m, [1.1, 3.4, 9]) result = await o.call_method(m, [1.1, 3.4, 9])
assert [2.2, 6.8, 18] == result assert [2.2, 6.8, 18] == result
result = call_method_full(o, m, [1.1, 3.4, 9]) result = await call_method_full(o, m, [1.1, 3.4, 9])
assert [[2.2, 6.8, 18]] == result.OutputArguments assert [[2.2, 6.8, 18]] == result.OutputArguments
def test_method_tuple(opc):
async def test_method_tuple(opc):
o = opc.get_objects_node() o = opc.get_objects_node()
m = o.get_child("2:ServerMethodTuple") m = await o.get_child("2:ServerMethodTuple")
result = o.call_method(m) result = await o.call_method(m)
assert [1, 2, 3] == result assert [1, 2, 3] == result
result = call_method_full(o, m) result = await call_method_full(o, m)
assert [1, 2, 3] == result.OutputArguments assert [1, 2, 3] == result.OutputArguments
def test_method_none(opc):
async def test_method_none(opc):
# this test calls the function linked to the type's method.. # this test calls the function linked to the type's method..
o = opc.get_node(ua.ObjectIds.BaseObjectType).get_child("2:ObjectWithMethodsType") o = await opc.get_node(ua.ObjectIds.BaseObjectType).get_child("2:ObjectWithMethodsType")
m = o.get_child("2:ServerMethodDefault") m = await o.get_child("2:ServerMethodDefault")
result = o.call_method(m) result = await o.call_method(m)
assert result is None assert result is None
result = call_method_full(o, m) result = await call_method_full(o, m)
assert [] == result.OutputArguments assert [] == result.OutputArguments
def test_add_nodes(opc):
async def test_add_nodes(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
f = objects.add_folder(3, 'MyFolder') f = await objects.add_folder(3, 'MyFolder')
child = objects.get_child("3:MyFolder") child = await objects.get_child("3:MyFolder")
assert child == f assert child == f
o = f.add_object(3, 'MyObject') o = await f.add_object(3, 'MyObject')
child = f.get_child("3:MyObject") child = await f.get_child("3:MyObject")
assert child == o assert child == o
v = f.add_variable(3, 'MyVariable', 6) v = await f.add_variable(3, 'MyVariable', 6)
child = f.get_child("3:MyVariable") child = await f.get_child("3:MyVariable")
assert child == v assert child == v
p = f.add_property(3, 'MyProperty', 10) p = await f.add_property(3, 'MyProperty', 10)
child = f.get_child("3:MyProperty") child = await f.get_child("3:MyProperty")
assert child == p assert child == p
childs = f.get_children() childs = await f.get_children()
assert o in childs assert o in childs
assert v in childs assert v in childs
assert p in childs assert p in childs
def test_modelling_rules(opc):
obj = opc.nodes.base_object_type.add_object_type(2, 'MyFooObjectType')
v = obj.add_variable(2, "myvar", 1.1)
v.set_modelling_rule(True)
p = obj.add_property(2, "myvar", 1.1)
p.set_modelling_rule(False)
refs = obj.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule) async def test_modelling_rules(opc):
obj = await opc.nodes.base_object_type.add_object_type(2, 'MyFooObjectType')
v = await obj.add_variable(2, "myvar", 1.1)
await v.set_modelling_rule(True)
p = await obj.add_property(2, "myvar", 1.1)
await p.set_modelling_rule(False)
refs = await obj.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule)
assert 0 == len(refs) assert 0 == len(refs)
refs = v.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule) refs = await v.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule)
assert opc.get_node(ua.ObjectIds.ModellingRule_Mandatory) == refs[0] assert opc.get_node(ua.ObjectIds.ModellingRule_Mandatory) == refs[0]
refs = p.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule) refs = await p.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule)
assert opc.get_node(ua.ObjectIds.ModellingRule_Optional) == refs[0] assert opc.get_node(ua.ObjectIds.ModellingRule_Optional) == refs[0]
p.set_modelling_rule(None) await p.set_modelling_rule(None)
refs = p.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule) refs = await p.get_referenced_nodes(refs=ua.ObjectIds.HasModellingRule)
assert 0 == len(refs) assert 0 == len(refs)
def test_incl_subtypes(opc):
base_type = opc.get_root_node().get_child(["0:Types", "0:ObjectTypes", "0:BaseObjectType"]) async def test_incl_subtypes(opc):
descs = base_type.get_children_descriptions(includesubtypes=True) base_type = await opc.get_root_node().get_child(["0:Types", "0:ObjectTypes", "0:BaseObjectType"])
descs = await base_type.get_children_descriptions(includesubtypes=True)
assert len(descs) > 10 assert len(descs) > 10
descs = base_type.get_children_descriptions(includesubtypes=False) descs = await base_type.get_children_descriptions(includesubtypes=False)
assert 0 == len(descs) assert 0 == len(descs)
def test_add_node_with_type(opc):
async def test_add_node_with_type(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
f = objects.add_folder(3, 'MyFolder_TypeTest') f = await objects.add_folder(3, 'MyFolder_TypeTest')
o = f.add_object(3, 'MyObject1', ua.ObjectIds.BaseObjectType) o = await f.add_object(3, 'MyObject1', ua.ObjectIds.BaseObjectType)
assert ua.ObjectIds.BaseObjectType == o.get_type_definition().Identifier assert ua.ObjectIds.BaseObjectType == (await o.get_type_definition()).Identifier
o = f.add_object(3, 'MyObject2', ua.NodeId(ua.ObjectIds.BaseObjectType, 0)) o = await f.add_object(3, 'MyObject2', ua.NodeId(ua.ObjectIds.BaseObjectType, 0))
assert ua.ObjectIds.BaseObjectType == o.get_type_definition().Identifier assert ua.ObjectIds.BaseObjectType == (await o.get_type_definition()).Identifier
base_otype = opc.get_node(ua.ObjectIds.BaseObjectType) base_otype = opc.get_node(ua.ObjectIds.BaseObjectType)
custom_otype = base_otype.add_object_type(2, 'MyFooObjectType') custom_otype = await base_otype.add_object_type(2, 'MyFooObjectType')
o = f.add_object(3, 'MyObject3', custom_otype.nodeid) o = await f.add_object(3, 'MyObject3', custom_otype.nodeid)
assert custom_otype.nodeid.Identifier == o.get_type_definition().Identifier assert custom_otype.nodeid.Identifier == (await o.get_type_definition()).Identifier
references = o.get_references(refs=ua.ObjectIds.HasTypeDefinition, direction=ua.BrowseDirection.Forward) references = await o.get_references(refs=ua.ObjectIds.HasTypeDefinition, direction=ua.BrowseDirection.Forward)
assert 1 == len(references) assert 1 == len(references)
assert custom_otype.nodeid == references[0].NodeId assert custom_otype.nodeid == references[0].NodeId
def test_references_for_added_nodes(opc):
async def test_references_for_added_nodes(opc):
objects = opc.get_objects_node() objects = opc.get_objects_node()
o = objects.add_object(3, 'MyObject') o = await objects.add_object(3, 'MyObject')
nodes = objects.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Forward, nodes = await objects.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Forward, includesubtypes=False
)
assert o in nodes assert o in nodes
nodes = o.get_referenced_nodes(refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, nodes = await o.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.Organizes, direction=ua.BrowseDirection.Inverse, includesubtypes=False
)
assert objects in nodes assert objects in nodes
assert objects == o.get_parent() assert objects == await o.get_parent()
assert ua.ObjectIds.BaseObjectType == o.get_type_definition().Identifier assert ua.ObjectIds.BaseObjectType == (await o.get_type_definition()).Identifier
assert [] == o.get_references(ua.ObjectIds.HasModellingRule) assert [] == await o.get_references(ua.ObjectIds.HasModellingRule)
o2 = o.add_object(3, 'MySecondObject') o2 = await o.add_object(3, 'MySecondObject')
nodes = o.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Forward, nodes = await o.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Forward, includesubtypes=False
)
assert o2 in nodes assert o2 in nodes
nodes = o2.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Inverse, nodes = await o2.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Inverse, includesubtypes=False
)
assert o in nodes assert o in nodes
assert o == o2.get_parent() assert o == await o2.get_parent()
assert ua.ObjectIds.BaseObjectType == o2.get_type_definition().Identifier assert ua.ObjectIds.BaseObjectType == (await o2.get_type_definition()).Identifier
assert [] == o2.get_references(ua.ObjectIds.HasModellingRule) assert [] == await o2.get_references(ua.ObjectIds.HasModellingRule)
v = o.add_variable(3, 'MyVariable', 6) v = await o.add_variable(3, 'MyVariable', 6)
nodes = o.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Forward, nodes = await o.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Forward, includesubtypes=False
)
assert v in nodes assert v in nodes
nodes = v.get_referenced_nodes(refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Inverse, nodes = await v.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.HasComponent, direction=ua.BrowseDirection.Inverse, includesubtypes=False
)
assert o in nodes assert o in nodes
assert o == v.get_parent() assert o == await v.get_parent()
assert ua.ObjectIds.BaseDataVariableType == v.get_type_definition().Identifier assert ua.ObjectIds.BaseDataVariableType == (await v.get_type_definition()).Identifier
assert [] == v.get_references(ua.ObjectIds.HasModellingRule) assert [] == await v.get_references(ua.ObjectIds.HasModellingRule)
p = o.add_property(3, 'MyProperty', 2) p = await o.add_property(3, 'MyProperty', 2)
nodes = o.get_referenced_nodes(refs=ua.ObjectIds.HasProperty, direction=ua.BrowseDirection.Forward, nodes = await o.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.HasProperty, direction=ua.BrowseDirection.Forward, includesubtypes=False
)
assert p in nodes assert p in nodes
nodes = p.get_referenced_nodes(refs=ua.ObjectIds.HasProperty, direction=ua.BrowseDirection.Inverse, nodes = await p.get_referenced_nodes(
includesubtypes=False) refs=ua.ObjectIds.HasProperty, direction=ua.BrowseDirection.Inverse, includesubtypes=False
)
assert o in nodes assert o in nodes
assert 0 == p.get_parent() assert 0 == await p.get_parent()
assert ua.ObjectIds.PropertyType == p.get_type_definition().Identifier assert ua.ObjectIds.PropertyType == (await p.get_type_definition()).Identifier
assert [] == p.get_references(ua.ObjectIds.HasModellingRule) assert [] == await p.get_references(ua.ObjectIds.HasModellingRule)
m = await objects.get_child("2:ServerMethod")
assert [] == await m.get_references(ua.ObjectIds.HasModellingRule)
m = objects.get_child("2:ServerMethod")
assert [] == m.get_references(ua.ObjectIds.HasModellingRule)
def test_path_string(opc): async def test_path_string(opc):
o = opc.nodes.objects.add_folder(1, "titif").add_object(3, "opath") o = await (await opc.nodes.objects.add_folder(1, "titif")).add_object(3, "opath")
path = o.get_path(as_string=True) path = await o.get_path(as_string=True)
assert ["0:Root", "0:Objects", "1:titif", "3:opath"] == path assert ["0:Root", "0:Objects", "1:titif", "3:opath"] == path
path = o.get_path(2, as_string=True) path = await o.get_path(2, as_string=True)
assert ["1:titif", "3:opath"] == path assert ["1:titif", "3:opath"] == path
def test_path(opc):
of = opc.nodes.objects.add_folder(1, "titif") async def test_path(opc):
op = of.add_object(3, "opath") of = await opc.nodes.objects.add_folder(1, "titif")
path = op.get_path() op = await of.add_object(3, "opath")
path = await op.get_path()
assert [opc.nodes.root, opc.nodes.objects, of, op] == path assert [opc.nodes.root, opc.nodes.objects, of, op] == path
path = op.get_path(2) path = await op.get_path(2)
assert [of, op] == path assert [of, op] == path
target = opc.get_node("i=13387") target = opc.get_node("i=13387")
path = target.get_path() path = await target.get_path()
assert [opc.nodes.root, opc.nodes.types, opc.nodes.object_types, opc.nodes.base_object_type, assert [
opc.nodes.folder_type, opc.get_node(ua.ObjectIds.FileDirectoryType), target] == path opc.nodes.root, opc.nodes.types, opc.nodes.object_types, opc.nodes.base_object_type,
opc.nodes.folder_type, opc.get_node(ua.ObjectIds.FileDirectoryType), target
] == path
def test_get_endpoints(opc): async def test_get_endpoints(opc):
endpoints = opc.get_endpoints() endpoints = await opc.get_endpoints()
assert len(endpoints) > 0 assert len(endpoints) > 0
assert endpoints[0].EndpointUrl.startswith("opc.tcp://") assert endpoints[0].EndpointUrl.startswith("opc.tcp://")
def test_copy_node(opc):
dev_t = opc.nodes.base_data_type.add_object_type(0, "MyDevice")
v_t = dev_t.add_variable(0, "sensor", 1.0)
p_t = dev_t.add_property(0, "sensor_id", "0340")
ctrl_t = dev_t.add_object(0, "controller")
prop_t = ctrl_t.add_property(0, "state", "Running")
# Create device sutype
devd_t = dev_t.add_object_type(0, "MyDeviceDervived")
v_t = devd_t.add_variable(0, "childparam", 1.0)
p_t = devd_t.add_property(0, "sensorx_id", "0340")
nodes = copy_node(opc.nodes.objects, dev_t) async def test_copy_node(opc):
dev_t = await opc.nodes.base_data_type.add_object_type(0, "MyDevice")
v_t = await dev_t.add_variable(0, "sensor", 1.0)
p_t = await dev_t.add_property(0, "sensor_id", "0340")
ctrl_t = await dev_t.add_object(0, "controller")
prop_t = await ctrl_t.add_property(0, "state", "Running")
# Create device sutype
devd_t = await dev_t.add_object_type(0, "MyDeviceDervived")
v_t = await devd_t.add_variable(0, "childparam", 1.0)
p_t = await devd_t.add_property(0, "sensorx_id", "0340")
nodes = await copy_node(opc.nodes.objects, dev_t)
mydevice = nodes[0] mydevice = nodes[0]
assert ua.NodeClass.ObjectType == await mydevice.get_node_class()
assert ua.NodeClass.ObjectType == mydevice.get_node_class() assert 4 == len(await mydevice.get_children())
assert 4 == len(mydevice.get_children()) obj = await mydevice.get_child(["0:controller"])
obj = mydevice.get_child(["0:controller"]) prop = await mydevice.get_child(["0:controller", "0:state"])
prop = mydevice.get_child(["0:controller", "0:state"]) assert ua.ObjectIds.PropertyType == (await prop.get_type_definition()).Identifier
assert ua.ObjectIds.PropertyType == prop.get_type_definition().Identifier assert "Running" == await prop.get_value()
assert "Running" == prop.get_value()
assert prop.nodeid != prop_t.nodeid assert prop.nodeid != prop_t.nodeid
def test_instantiate_1(opc):
async def test_instantiate_1(opc):
# Create device type # Create device type
dev_t = opc.nodes.base_object_type.add_object_type(0, "MyDevice") dev_t = await opc.nodes.base_object_type.add_object_type(0, "MyDevice")
v_t = dev_t.add_variable(0, "sensor", 1.0) v_t = await dev_t.add_variable(0, "sensor", 1.0)
v_t.set_modelling_rule(True) await v_t.set_modelling_rule(True)
p_t = dev_t.add_property(0, "sensor_id", "0340") p_t = await dev_t.add_property(0, "sensor_id", "0340")
p_t.set_modelling_rule(True) await p_t.set_modelling_rule(True)
ctrl_t = dev_t.add_object(0, "controller") ctrl_t = await dev_t.add_object(0, "controller")
ctrl_t.set_modelling_rule(True) await ctrl_t.set_modelling_rule(True)
v_opt_t = dev_t.add_variable(0, "vendor", 1.0) v_opt_t = await dev_t.add_variable(0, "vendor", 1.0)
v_opt_t.set_modelling_rule(False) await v_opt_t.set_modelling_rule(False)
v_none_t = dev_t.add_variable(0, "model", 1.0) v_none_t = await dev_t.add_variable(0, "model", 1.0)
v_none_t.set_modelling_rule(None) await v_none_t.set_modelling_rule(None)
prop_t = ctrl_t.add_property(0, "state", "Running") prop_t = await ctrl_t.add_property(0, "state", "Running")
prop_t.set_modelling_rule(True) await prop_t.set_modelling_rule(True)
# Create device sutype # Create device sutype
devd_t = dev_t.add_object_type(0, "MyDeviceDervived") devd_t = await dev_t.add_object_type(0, "MyDeviceDervived")
v_t = devd_t.add_variable(0, "childparam", 1.0) v_t = await devd_t.add_variable(0, "childparam", 1.0)
v_t.set_modelling_rule(True) await v_t.set_modelling_rule(True)
p_t = devd_t.add_property(0, "sensorx_id", "0340") p_t = await devd_t.add_property(0, "sensorx_id", "0340")
p_t.set_modelling_rule(True) await p_t.set_modelling_rule(True)
# instanciate device # instanciate device
nodes = instantiate(opc.nodes.objects, dev_t, bname="2:Device0001") nodes = await instantiate(opc.nodes.objects, dev_t, bname="2:Device0001")
mydevice = nodes[0] mydevice = nodes[0]
assert ua.NodeClass.Object == mydevice.get_node_class() assert ua.NodeClass.Object == await mydevice.get_node_class()
assert dev_t.nodeid == mydevice.get_type_definition() assert dev_t.nodeid == await mydevice.get_type_definition()
obj = mydevice.get_child(["0:controller"]) obj = await mydevice.get_child(["0:controller"])
prop = mydevice.get_child(["0:controller", "0:state"]) prop = await mydevice.get_child(["0:controller", "0:state"])
with pytest.raises(ua.UaError): with pytest.raises(ua.UaError):
mydevice.get_child(["0:controller", "0:vendor"]) await mydevice.get_child(["0:controller", "0:vendor"])
with pytest.raises(ua.UaError): with pytest.raises(ua.UaError):
mydevice.get_child(["0:controller", "0:model"]) await mydevice.get_child(["0:controller", "0:model"])
assert ua.ObjectIds.PropertyType == prop.get_type_definition().Identifier assert ua.ObjectIds.PropertyType == (await prop.get_type_definition()).Identifier
assert "Running" == prop.get_value() assert "Running" == await prop.get_value()
assert prop.nodeid != prop_t.nodeid assert prop.nodeid != prop_t.nodeid
# instanciate device subtype # instanciate device subtype
nodes = instantiate(opc.nodes.objects, devd_t, bname="2:Device0002") nodes = await instantiate(opc.nodes.objects, devd_t, bname="2:Device0002")
mydevicederived = nodes[0] mydevicederived = nodes[0]
prop1 = mydevicederived.get_child(["0:sensorx_id"]) prop1 = await mydevicederived.get_child(["0:sensorx_id"])
var1 = mydevicederived.get_child(["0:childparam"]) var1 = await mydevicederived.get_child(["0:childparam"])
var_parent = mydevicederived.get_child(["0:sensor"]) var_parent = await mydevicederived.get_child(["0:sensor"])
prop_parent = mydevicederived.get_child(["0:sensor_id"]) prop_parent = await mydevicederived.get_child(["0:sensor_id"])
def test_instantiate_string_nodeid(opc):
async def test_instantiate_string_nodeid(opc):
# Create device type # Create device type
dev_t = opc.nodes.base_object_type.add_object_type(0, "MyDevice2") dev_t = await opc.nodes.base_object_type.add_object_type(0, "MyDevice2")
v_t = dev_t.add_variable(0, "sensor", 1.0) v_t = await dev_t.add_variable(0, "sensor", 1.0)
v_t.set_modelling_rule(True) await v_t.set_modelling_rule(True)
p_t = dev_t.add_property(0, "sensor_id", "0340") p_t = await dev_t.add_property(0, "sensor_id", "0340")
p_t.set_modelling_rule(True) await p_t.set_modelling_rule(True)
ctrl_t = dev_t.add_object(0, "controller") ctrl_t = await dev_t.add_object(0, "controller")
ctrl_t.set_modelling_rule(True) await ctrl_t.set_modelling_rule(True)
prop_t = ctrl_t.add_property(0, "state", "Running") prop_t = await ctrl_t.add_property(0, "state", "Running")
prop_t.set_modelling_rule(True) await prop_t.set_modelling_rule(True)
# instanciate device # instanciate device
nodes = instantiate(opc.nodes.objects, dev_t, nodeid=ua.NodeId("InstDevice", 2, ua.NodeIdType.String), nodes = await instantiate(opc.nodes.objects, dev_t, nodeid=ua.NodeId("InstDevice", 2, ua.NodeIdType.String),
bname="2:InstDevice") bname="2:InstDevice")
mydevice = nodes[0] mydevice = nodes[0]
assert ua.NodeClass.Object == mydevice.get_node_class() assert ua.NodeClass.Object == await mydevice.get_node_class()
assert dev_t.nodeid == mydevice.get_type_definition() assert dev_t.nodeid == await mydevice.get_type_definition()
obj = mydevice.get_child(["0:controller"]) obj = await mydevice.get_child(["0:controller"])
obj_nodeid_ident = obj.nodeid.Identifier obj_nodeid_ident = obj.nodeid.Identifier
prop = mydevice.get_child(["0:controller", "0:state"]) prop = await mydevice.get_child(["0:controller", "0:state"])
assert "InstDevice.controller" == obj_nodeid_ident assert "InstDevice.controller" == obj_nodeid_ident
assert ua.ObjectIds.PropertyType == prop.get_type_definition().Identifier assert ua.ObjectIds.PropertyType == (await prop.get_type_definition()).Identifier
assert "Running" == prop.get_value() assert "Running" == await prop.get_value()
assert prop.nodeid != prop_t.nodeid assert prop.nodeid != prop_t.nodeid
def test_variable_with_datatype(opc):
v1 = opc.nodes.objects.add_variable(3, 'VariableEnumType1', ua.ApplicationType.ClientAndServer, async def test_variable_with_datatype(opc):
datatype=ua.NodeId(ua.ObjectIds.ApplicationType)) v1 = await opc.nodes.objects.add_variable(
tp1 = v1.get_data_type() 3, 'VariableEnumType1', ua.ApplicationType.ClientAndServer, datatype=ua.NodeId(ua.ObjectIds.ApplicationType)
)
tp1 = await v1.get_data_type()
assert tp1 == ua.NodeId(ua.ObjectIds.ApplicationType) assert tp1 == ua.NodeId(ua.ObjectIds.ApplicationType)
v2 = opc.nodes.objects.add_variable(3, 'VariableEnumType2', ua.ApplicationType.ClientAndServer, v2 = await opc.nodes.objects.add_variable(
datatype=ua.NodeId(ua.ObjectIds.ApplicationType)) 3, 'VariableEnumType2', ua.ApplicationType.ClientAndServer, datatype=ua.NodeId(ua.ObjectIds.ApplicationType)
tp2 = v2.get_data_type() )
tp2 = await v2.get_data_type()
assert tp2 == ua.NodeId(ua.ObjectIds.ApplicationType) assert tp2 == ua.NodeId(ua.ObjectIds.ApplicationType)
def test_enum(opc):
async def test_enum(opc):
# create enum type # create enum type
enums = opc.get_root_node().get_child(["0:Types", "0:DataTypes", "0:BaseDataType", "0:Enumeration"]) enums = await opc.get_root_node().get_child(["0:Types", "0:DataTypes", "0:BaseDataType", "0:Enumeration"])
myenum_type = enums.add_data_type(0, "MyEnum") myenum_type = await enums.add_data_type(0, "MyEnum")
es = myenum_type.add_variable(0, "EnumStrings", es = await myenum_type.add_variable(
[ua.LocalizedText("String0"), ua.LocalizedText("String1"), 0, "EnumStrings", [ua.LocalizedText("String0"), ua.LocalizedText("String1"), ua.LocalizedText("String2")],
ua.LocalizedText("String2")], ua.VariantType.LocalizedText
ua.VariantType.LocalizedText) )
# es.set_value_rank(1) # es.set_value_rank(1)
# instantiate # instantiate
o = opc.get_objects_node() o = opc.get_objects_node()
myvar = o.add_variable(2, "MyEnumVar", ua.LocalizedText("String1"), datatype=myenum_type.nodeid) myvar = await o.add_variable(2, "MyEnumVar", ua.LocalizedText("String1"), datatype=myenum_type.nodeid)
# myvar.set_writable(True) # myvar.set_writable(True)
# tests # tests
assert myenum_type.nodeid == myvar.get_data_type() assert myenum_type.nodeid == await myvar.get_data_type()
myvar.set_value(ua.LocalizedText("String2")) await myvar.set_value(ua.LocalizedText("String2"))
def test_supertypes(opc):
async def test_supertypes(opc):
nint32 = opc.get_node(ua.ObjectIds.Int32) nint32 = opc.get_node(ua.ObjectIds.Int32)
node = ua_utils.get_node_supertype(nint32) node = await ua_utils.get_node_supertype(nint32)
assert opc.get_node(ua.ObjectIds.Integer) == node assert opc.get_node(ua.ObjectIds.Integer) == node
nodes = ua_utils.get_node_supertypes(nint32) nodes = await ua_utils.get_node_supertypes(nint32)
assert opc.get_node(ua.ObjectIds.Number) == nodes[1] assert opc.get_node(ua.ObjectIds.Number) == nodes[1]
assert opc.get_node(ua.ObjectIds.Integer) == nodes[0] assert opc.get_node(ua.ObjectIds.Integer) == nodes[0]
# test custom # test custom
dtype = nint32.add_data_type(0, "MyCustomDataType") dtype = await nint32.add_data_type(0, "MyCustomDataType")
node = ua_utils.get_node_supertype(dtype) node = await ua_utils.get_node_supertype(dtype)
assert nint32 == node assert nint32 == node
dtype2 = dtype.add_data_type(0, "MyCustomDataType2") dtype2 = await dtype.add_data_type(0, "MyCustomDataType2")
node = ua_utils.get_node_supertype(dtype2) node = await ua_utils.get_node_supertype(dtype2)
assert dtype == node assert dtype == node
def test_base_data_type(opc):
async def test_base_data_type(opc):
nint32 = opc.get_node(ua.ObjectIds.Int32) nint32 = opc.get_node(ua.ObjectIds.Int32)
dtype = nint32.add_data_type(0, "MyCustomDataType") dtype = await nint32.add_data_type(0, "MyCustomDataType")
dtype2 = dtype.add_data_type(0, "MyCustomDataType2") dtype2 = await dtype.add_data_type(0, "MyCustomDataType2")
assert nint32 == ua_utils.get_base_data_type(dtype) assert nint32 == await ua_utils.get_base_data_type(dtype)
assert nint32 == ua_utils.get_base_data_type(dtype2) assert nint32 == await ua_utils.get_base_data_type(dtype2)
ext = opc.nodes.objects.add_variable(0, "MyExtensionObject", ua.Argument()) ext = await opc.nodes.objects.add_variable(0, "MyExtensionObject", ua.Argument())
d = ext.get_data_type() d = await ext.get_data_type()
d = opc.get_node(d) d = opc.get_node(d)
assert opc.get_node(ua.ObjectIds.Structure) == ua_utils.get_base_data_type(d) assert opc.get_node(ua.ObjectIds.Structure) == await ua_utils.get_base_data_type(d)
assert ua.VariantType.ExtensionObject == ua_utils.data_type_to_variant_type(d) assert ua.VariantType.ExtensionObject == await ua_utils.data_type_to_variant_type(d)
def test_data_type_to_variant_type(opc): async def test_data_type_to_variant_type(opc):
test_data = { test_data = {
ua.ObjectIds.Boolean: ua.VariantType.Boolean, ua.ObjectIds.Boolean: ua.VariantType.Boolean,
ua.ObjectIds.Byte: ua.VariantType.Byte, ua.ObjectIds.Byte: ua.VariantType.Byte,
...@@ -888,4 +955,4 @@ def test_data_type_to_variant_type(opc): ...@@ -888,4 +955,4 @@ def test_data_type_to_variant_type(opc):
ua.ObjectIds.AxisScaleEnumeration: ua.VariantType.Int32 # enumeration ua.ObjectIds.AxisScaleEnumeration: ua.VariantType.Int32 # enumeration
} }
for dt, vdt in test_data.items(): for dt, vdt in test_data.items():
assert vdt == ua_utils.data_type_to_variant_type(opc.get_node(ua.NodeId(dt))) assert vdt == await ua_utils.data_type_to_variant_type(opc.get_node(ua.NodeId(dt)))
import unittest import pytest
from opcua import Client from opcua import Client
from opcua import Server from opcua import Server
...@@ -13,10 +13,110 @@ except ImportError: ...@@ -13,10 +13,110 @@ except ImportError:
else: else:
disable_crypto_tests = False disable_crypto_tests = False
pytestmark = pytest.mark.asyncio
port_num1 = 48515 port_num1 = 48515
port_num2 = 48512 port_num2 = 48512
uri_crypto = 'opc.tcp://127.0.0.1:{0:d}'.format(port_num1)
uri_no_crypto = 'opc.tcp://127.0.0.1:{0:d}'.format(port_num2)
@pytest.fixture()
async def srv_crypto():
# start our own server
srv = Server()
await srv.init()
srv.set_endpoint(uri_crypto)
await srv.load_certificate("examples/certificate-example.der")
await srv.load_private_key("examples/private-key-example.pem")
await srv.start()
yield srv
# stop the server
await srv.stop()
@pytest.fixture()
async def srv_no_crypto():
# start our own server
srv = Server()
await srv.init()
srv.set_endpoint(uri_no_crypto)
await srv.start()
yield srv
# stop the server
await srv.stop()
async def test_nocrypto(srv_no_crypto):
clt = Client(uri_no_crypto)
async with clt:
await clt.get_objects_node().get_children()
async def test_nocrypto_fail():
clt = Client(uri_no_crypto)
with pytest.raises(ua.UaError):
await clt.set_security_string("Basic256,Sign,examples/certificate-example.der,examples/private-key-example.pem")
async def test_basic256(srv_crypto):
clt = Client(uri_crypto)
await clt.set_security_string("Basic256,Sign,examples/certificate-example.der,examples/private-key-example.pem")
async with clt:
assert await clt.get_objects_node().get_children()
async def test_basic256_encrypt():
clt = Client(uri_crypto)
await clt.set_security_string(
"Basic256,SignAndEncrypt,examples/certificate-example.der,examples/private-key-example.pem")
async with clt:
assert await clt.get_objects_node().get_children()
async def test_basic128Rsa15():
clt = Client(uri_crypto)
await clt.set_security_string("Basic128Rsa15,Sign,examples/certificate-example.der,examples/private-key-example.pem")
async with clt:
assert await clt.get_objects_node().get_children()
async def test_basic128Rsa15_encrypt():
clt = Client(uri_crypto)
await clt.set_security_string(
"Basic128Rsa15,SignAndEncrypt,examples/certificate-example.der,examples/private-key-example.pem"
)
async with clt:
assert await clt.get_objects_node().get_children()
async def test_basic256_encrypt_success():
clt = Client(uri_crypto)
await clt.set_security(
security_policies.SecurityPolicyBasic256,
'examples/certificate-example.der',
'examples/private-key-example.pem',
None,
ua.MessageSecurityMode.SignAndEncrypt
)
async with clt:
assert await clt.get_objects_node().get_children()
async def test_basic256_encrypt_feil():
# FIXME: how to make it feil???
clt = Client(uri_crypto)
with pytest.raises(ua.UaError):
await clt.set_security(
security_policies.SecurityPolicyBasic256,
'examples/certificate-example.der',
'examples/private-key-example.pem',
None,
ua.MessageSecurityMode.None_
)
"""
@unittest.skipIf(disable_crypto_tests, "crypto not available") @unittest.skipIf(disable_crypto_tests, "crypto not available")
class TestCryptoConnect(unittest.TestCase): class TestCryptoConnect(unittest.TestCase):
...@@ -48,27 +148,6 @@ class TestCryptoConnect(unittest.TestCase): ...@@ -48,27 +148,6 @@ class TestCryptoConnect(unittest.TestCase):
cls.srv_no_crypto.stop() cls.srv_no_crypto.stop()
cls.srv_crypto.stop() cls.srv_crypto.stop()
def test_nocrypto(self):
clt = Client(self.uri_no_crypto)
clt.connect()
try:
clt.get_objects_node().get_children()
finally:
clt.disconnect()
def test_nocrypto_feil(self):
clt = Client(self.uri_no_crypto)
with self.assertRaises(ua.UaError):
clt.set_security_string("Basic256,Sign,examples/certificate-example.der,examples/private-key-example.pem")
def test_basic256(self):
clt = Client(self.uri_crypto)
try:
clt.set_security_string("Basic256,Sign,examples/certificate-example.der,examples/private-key-example.pem")
clt.connect()
self.assertTrue(clt.get_objects_node().get_children())
finally:
clt.disconnect()
def test_basic256_encrypt(self): def test_basic256_encrypt(self):
clt = Client(self.uri_crypto) clt = Client(self.uri_crypto)
...@@ -121,3 +200,4 @@ class TestCryptoConnect(unittest.TestCase): ...@@ -121,3 +200,4 @@ class TestCryptoConnect(unittest.TestCase):
None, None,
ua.MessageSecurityMode.None_ ua.MessageSecurityMode.None_
) )
"""
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment