Commit 776c55e3 authored by Yuta Okamoto's avatar Yuta Okamoto Committed by oroulet

add translate_browsepaths() to Client

parent 8ce692c0
......@@ -911,3 +911,17 @@ class Client:
parameters.NodesToBrowse = nodestobrowse
results = await self.uaclient.browse(parameters)
return list(zip(nodes, results))
async def translate_browsepaths(self, starting_node: ua.NodeId, relative_paths: List[Union[ua.RelativePath, str]]) -> List[ua.BrowsePathResult]:
bpaths = []
for p in relative_paths:
try:
rpath = ua.RelativePath.from_string(p) if isinstance(p, str) else p
except ValueError as e:
raise ua.UaStringParsingError(f"Failed to parse one of RelativePath: {p}") from e
bpath = ua.BrowsePath()
bpath.StartingNode = starting_node
bpath.RelativePath = rpath
bpaths.append(bpath)
return await self.uaclient.translate_browsepaths_to_nodeids(bpaths)
......@@ -315,6 +315,10 @@ class Client:
def write_values(self, nodes, values, raise_on_partial_error=True):
pass
@syncmethod
def translate_browsepaths(self, starting_node: ua.NodeId, relative_paths: List[Union[ua.RelativePath, str]]) -> List[ua.BrowsePathResult]:
pass
def __enter__(self):
try:
self.connect()
......
......@@ -158,3 +158,27 @@ async def test_browse_nodes(server, client):
assert isinstance(results[1][0], Node)
assert isinstance(results[0][1], ua.BrowseResult)
assert isinstance(results[1][1], ua.BrowseResult)
async def test_translate_browsepaths(server, client: Client):
server_node = await client.nodes.objects.get_child("Server")
relative_paths = ["/0:ServiceLevel", "/0:ServerStatus/0:State"]
results = await client.translate_browsepaths(server_node.nodeid, relative_paths)
assert len(results) == 2
assert isinstance(results, list)
assert results[0].StatusCode.value == ua.StatusCodes.Good
assert results[0].Targets[0].TargetId == ua.NodeId.from_string("ns=0;i=2267")
assert results[1].StatusCode.value == ua.StatusCodes.Good
assert results[1].Targets[0].TargetId == ua.NodeId.from_string("ns=0;i=2259")
for result in results:
assert isinstance(result, ua.BrowsePathResult)
results2 = await client.translate_browsepaths(server_node.nodeid, ["/0:UnknownPath"])
assert len(results2) == 1
assert isinstance(results2, list)
assert results2[0].StatusCode.value == ua.StatusCodes.BadNoMatch
assert len(results2[0].Targets) == 0
with pytest.raises(ua.UaStringParsingError):
await client.translate_browsepaths(server_node.nodeid, ["/1:<Boiler"])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment