Commit 56cf68df authored by oroulet's avatar oroulet Committed by oroulet

make sure the correct event loop is used in the entire application

parent ef9e89f1
......@@ -11,7 +11,6 @@ from ..crypto import uacrypto, security_policies
__all__ = ["Client"]
_logger = logging.getLogger(__name__)
asyncio.get_event_loop().set_debug(True)
class Client:
......
......@@ -639,7 +639,7 @@ class Node:
results = await self.server.add_references(params)
_check_results(results, len(params))
async def set_modelling_rule(self, mandatory):
async def set_modelling_rule(self, mandatory: bool):
"""
Add a modelling rule reference to Node.
When creating a new object type, its variable and child nodes will not
......
......@@ -80,7 +80,6 @@ class Subscription:
"""
def __init__(self, server, params, handler):
self.loop = asyncio.get_event_loop()
self.logger = logging.getLogger(__name__)
self.server = server
self._client_handle = 200
......
......@@ -126,7 +126,7 @@ class XmlExporter:
if pretty:
indent(self.etree.getroot())
func = functools.partial(self.etree.write, xmlpath, encoding='utf-8', xml_declaration=True)
await asyncio.get_event_loop().run_in_executor(None, func)
await self.server.loop.run_in_executor(None, func)
def dump_etree(self):
"""
......
......@@ -50,7 +50,7 @@ class XmlImporter:
import xml and return added nodes
"""
self.logger.info("Importing XML file %s", xmlpath)
self.parser = XMLParser()
self.parser = XMLParser(loop=self.server.loop)
await self.parser.parse(xmlpath, xmlstring)
self.namespaces = await self._map_namespaces(self.parser.get_used_namespaces())
self.aliases = self._map_aliases(self.parser.get_aliases())
......
......@@ -94,7 +94,8 @@ class ExtObj:
class XMLParser:
def __init__(self):
def __init__(self, loop=None):
self.loop = None or asyncio.get_event_loop()
self.logger = logging.getLogger(__name__)
self._retag = re.compile(r"(\{.*\})(.*)")
self.root = None
......
......@@ -17,8 +17,7 @@ class OPCUAProtocol(asyncio.Protocol):
Instantiated for every connection.
"""
def __init__(self, iserver=None, policies=None, clients=None):
self.loop = asyncio.get_event_loop()
def __init__(self, iserver, policies, clients):
self.peer_name = None
self.transport = None
self.processor = None
......@@ -42,13 +41,13 @@ class OPCUAProtocol(asyncio.Protocol):
self.processor.set_policies(self.policies)
self.iserver.asyncio_transports.append(transport)
self.clients.append(self)
self._task = self.loop.create_task(self._process_received_message_loop())
self._task = self.iserver.loop.create_task(self._process_received_message_loop())
def connection_lost(self, ex):
logger.info('Lost connection from %s, %s', self.peer_name, ex)
self.transport.close()
self.iserver.asyncio_transports.remove(self.transport)
self.loop.create_task(self.processor.close())
self.iserver.loop.create_task(self.processor.close())
if self in self.clients:
self.clients.remove(self)
self.messages.put_nowait((None, None))
......@@ -105,7 +104,6 @@ class BinaryServer:
self.hostname = hostname
self.port = port
self.iserver = internal_server
self.loop = asyncio.get_event_loop()
self._server = None
self._policies = []
self.clients = []
......@@ -118,7 +116,7 @@ class BinaryServer:
return OPCUAProtocol(iserver=self.iserver, policies=self._policies, clients=self.clients)
async def start(self):
self._server = await self.loop.create_server(self._make_protocol, self.hostname, self.port)
self._server = await self.iserver.loop.create_server(self._make_protocol, self.hostname, self.port)
# get the port and the hostname from the created server socket
# only relevant for dynamic port asignment (when self.port == 0)
if self.port == 0 and len(self._server.sockets) == 1:
......@@ -134,5 +132,5 @@ class BinaryServer:
for transport in self.iserver.asyncio_transports:
transport.close()
if self._server:
self.loop.call_soon(self._server.close)
self.iserver.loop.call_soon(self._server.close)
await self._server.wait_closed()
......@@ -185,9 +185,9 @@ class HistoryDict(HistoryStorageInterface):
class SubHandler:
def __init__(self, storage):
def __init__(self, storage, loop):
self.storage = storage
self.loop = asyncio.get_event_loop()
self.loop = loop
def datachange_notification(self, node, val, data):
self.loop.create_task(
......@@ -234,7 +234,7 @@ class HistoryManager:
Subscribe to the nodes' data changes and store the data in the active storage.
"""
if not self._sub:
self._sub = await self._create_subscription(SubHandler(self.storage))
self._sub = await self._create_subscription(SubHandler(self.storage, self.iserver.loop))
if node in self._handlers:
raise ua.UaError("Node {0} is already historized".format(node))
await self.storage.new_historized_node(node.nodeid, period, count)
......@@ -255,7 +255,7 @@ class HistoryManager:
must be deleted manually so that a new table with the custom event fields can be created.
"""
if not self._sub:
self._sub = await self._create_subscription(SubHandler(self.storage))
self._sub = await self._create_subscription(SubHandler(self.storage, self.iserver.loop))
if source in self._handlers:
raise ua.UaError("Events from {0} are already historized".format(source))
......
......@@ -22,7 +22,7 @@ class HistorySQLite(HistoryStorageInterface):
note that PARSE_DECLTYPES is active so certain data types (such as datetime) will not be BLOBs
"""
def __init__(self, path="history.db"):
def __init__(self, path="history.db", loop=None):
self.logger = logging.getLogger(__name__)
self._datachanges_period = {}
self._db_file = path
......@@ -30,7 +30,7 @@ class HistorySQLite(HistoryStorageInterface):
self._event_fields = {}
self._conn: sqlite3.Connection = None
self._cur: sqlite3.Cursor = None
self._loop = get_event_loop()
self._loop = loop or get_event_loop()
async def init(self):
self._conn = sqlite3.connect(self._db_file, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
......
......@@ -44,7 +44,8 @@ class ServerDesc:
class InternalServer:
def __init__(self):
def __init__(self, loop):
self.loop = loop
self.logger = logging.getLogger(__name__)
self.server_callback_dispatcher = CallbackDispatcher()
self.endpoints = []
......@@ -59,7 +60,6 @@ class InternalServer:
self.view_service = ViewService(self.aspace)
self.method_service = MethodService(self.aspace)
self.node_mgt_service = NodeManagementService(self.aspace)
self.loop = asyncio.get_event_loop()
self.asyncio_transports = []
self.subscription_service: SubscriptionService = SubscriptionService(self.loop, self.aspace)
self.history_manager = HistoryManager(self)
......
......@@ -4,6 +4,7 @@ server side implementation of a subscription object
import logging
import asyncio
import inspect
from asyncua import ua
......@@ -280,6 +281,9 @@ class InternalSubscription:
self.publish_results()
except asyncio.CancelledError:
pass
except Exception:
# seems this except is necessary to print errors
self.logger.exception("Exception in subscription loop")
def has_published_results(self):
if self._startup or self._triggered_datachanges or self._triggered_events:
......@@ -303,8 +307,10 @@ class InternalSubscription:
self._publish_cycles_count += 1
result = self._pop_publish_result()
if result is not None:
self.subservice.loop.create_task(self.callback(result))
#await self.callback(result)
if inspect.iscoroutinefunction(self.callback):
self.subservice.loop.create_task(self.callback(result))
else:
self.callback(result)
def _pop_publish_result(self):
result = ua.PublishResult()
......
......@@ -71,7 +71,7 @@ class Server:
self.manufacturer_name = "FreeOpcUa"
self.application_type = ua.ApplicationType.ClientAndServer
self.default_timeout = 60 * 60 * 1000
self.iserver = iserver if iserver else InternalServer()
self.iserver = iserver if iserver else InternalServer(self.loop)
self.bserver: BinaryServer = None
self._discovery_clients = {}
self._discovery_period = 60
......
......@@ -159,8 +159,9 @@ class Server:
def get_node(self, nodeid):
return Node(server.Server.get_node(self, nodeid))
@syncmethod
def import_xml(self, path=None, xmlstring=None):
return self.aio_obj.import_xml(path=None, xmlstring=None)
pass
def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
return self.aio_obj.set_attribute_value(nodeid, datavalue, attr)
......
......@@ -124,9 +124,10 @@ async def main():
await myarrayvar.set_value(var)
await mydevice_var.set_value("Running")
myevgen.trigger(message="This is BaseEvent")
server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9)) # Server side write method which is a but faster than using set_value
server.set_attribute_value(myvar.nodeid, ua.DataValue(0.9)) # Server side write method which is a but faster than using set_value
while True:
await asyncio.sleep(1)
await asyncio.sleep(0.1)
server.set_attribute_value(myvar.nodeid, ua.DataValue(sin(time.time())))
finally:
......
......@@ -37,10 +37,9 @@ async def main():
async with server:
count = 0
while True:
await asyncio.sleep(1000)
print("UPDATE")
await asyncio.sleep(1)
count += 0.1
print("SET VALUE", myvar, count)
await myvar.set_value(count)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment