Commit 3cc1cc37 authored by oroulet's avatar oroulet Committed by oroulet

run method calls in their own ThreadPool so users cannot block the entire OPCUA server

parent e09c1601
import asyncio
import pickle
import shelve
import logging
import collections
from asyncio import iscoroutinefunction
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from asyncua import ua
from .users import User
......@@ -451,11 +453,16 @@ class MethodService:
def __init__(self, aspace: "AddressSpace"):
self.logger = logging.getLogger(__name__)
self._aspace: "AddressSpace" = aspace
self._pool = ThreadPoolExecutor()
def stop(self):
self._pool.shutdown()
async def call(self, methods):
results = []
for method in methods:
results.append(await self._call(method))
res = await self._call(method)
results.append(res)
return results
async def _call(self, method):
......@@ -469,10 +476,11 @@ class MethodService:
res.StatusCode = ua.StatusCode(ua.StatusCodes.BadNothingToDo)
else:
try:
if iscoroutinefunction(node.call):
result = await node.call(method.ObjectId, *method.InputArguments)
else:
result = node.call(method.ObjectId, *method.InputArguments)
result = await self._run_method(node.call, method.ObjectId, *method.InputArguments)
except Exception:
self.logger.exception("Error executing method call %s, an exception was raised: ", method)
res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
else:
if isinstance(result, ua.CallMethodResult):
res = result
elif isinstance(result, ua.StatusCode):
......@@ -481,9 +489,13 @@ class MethodService:
res.OutputArguments = result
while len(res.InputArgumentResults) < len(method.InputArguments):
res.InputArgumentResults.append(ua.StatusCode())
except Exception:
self.logger.exception("Error executing method call %s, an exception was raised: ", method)
res.StatusCode = ua.StatusCode(ua.StatusCodes.BadUnexpectedError)
return res
async def _run_method(self, func, parent, *args):
if asyncio.iscoroutinefunction(func):
return await func(parent, *args)
p = partial(func, parent, *args)
res = await asyncio.get_event_loop().run_in_executor(self._pool, p)
return res
......
......@@ -167,6 +167,7 @@ class InternalServer:
async def stop(self):
self.logger.info('stopping internal server')
self.method_service.stop()
await self.isession.close_session()
await self.history_manager.stop()
......
......@@ -121,6 +121,10 @@ class Server:
async def __aexit__(self, exc_type, exc_value, traceback):
await self.stop()
def __str__(self):
return f"OPC UA Server({self.endpoint.geturl()})"
__repr__ = __str__
async def load_certificate(self, path: str):
"""
load server certificate from file, either pem or der
......
......@@ -11,32 +11,26 @@ _logger = logging.getLogger('asyncua')
async def main():
url = 'opc.tcp://localhost:4840/freeopcua/server/'
# url = 'opc.tcp://commsvr.com:51234/UA/CAS_UA_Server'
try:
async with Client(url=url) as client:
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
root = client.get_root_node()
_logger.info('Objects node is: %r', root)
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger.info('Children of root are: %r', await root.get_children())
uri = 'http://examples.freeopcua.github.io'
idx = await client.get_namespace_index(uri)
# get a specific node knowing its node id
# var = client.get_node(ua.NodeId(1002, 2))
# var = client.get_node("ns=3;i=2002")
var = await root.get_child(["0:Objects", f"{idx}:MyObject", f"{idx}:MyVariable"])
print("My variable", var, await var.get_value())
# print(var)
# var.get_data_value() # get value of node as a DataValue object
# var.get_value() # get value of node as a python builtin
# var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
# var.set_value(3.9) # set node value using implicit data type
# Now getting a variable node using its browse path
except Exception:
_logger.exception('error')
async with Client(url=url) as client:
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
root = client.get_root_node()
_logger.info('Objects node is: %r', root)
# Node objects have methods to read and write node attributes as well as browse or populate address space
_logger.info('Children of root are: %r', await root.get_children())
uri = 'http://examples.freeopcua.github.io'
idx = await client.get_namespace_index(uri)
# get a specific node knowing its node id
# var = client.get_node(ua.NodeId(1002, 2))
# var = client.get_node("ns=3;i=2002")
var = await root.get_child(["0:Objects", f"{idx}:MyObject", f"{idx}:MyVariable"])
print("My variable", var, await var.get_value())
# print(var)
# var.get_data_value() # get value of node as a DataValue object
# var.get_value() # get value of node as a python builtin
# var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
# var.set_value(3.9) # set node value using implicit data type
if __name__ == '__main__':
loop = asyncio.get_event_loop()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment