Commit 4c314659 authored by oroulet's avatar oroulet Committed by oroulet

remove use of loop paramter in Client as of current asynio best practices

parent 623fa1d3
......@@ -29,7 +29,7 @@ class Client:
use UaClient object, available as self.uaclient
which offers the raw OPC-UA services interface.
"""
def __init__(self, url: str, timeout: int = 4, loop=None):
def __init__(self, url: str, timeout: int = 4):
"""
:param url: url of the server.
if you are unsure of url, write at least hostname
......@@ -41,7 +41,6 @@ class Client:
attributes on the constructed object:
See the source code for the exhaustive list.
"""
self.loop = loop or asyncio.get_event_loop()
self.server_url = urlparse(url)
# take initial username and password from the url
self._username = self.server_url.username
......@@ -55,7 +54,7 @@ class Client:
self.secure_channel_timeout = 3600000 # 1 hour
self.session_timeout = 3600000 # 1 hour
self._policy_ids = []
self.uaclient: UaClient = UaClient(timeout, loop=self.loop)
self.uaclient: UaClient = UaClient(timeout)
self.user_certificate = None
self.user_private_key = None
self._server_nonce = None
......@@ -381,7 +380,7 @@ class Client:
if self.session_timeout != response.RevisedSessionTimeout:
_logger.warning("Requested session timeout to be %dms, got %dms instead", self.secure_channel_timeout, response.RevisedSessionTimeout)
self.session_timeout = response.RevisedSessionTimeout
self._renew_channel_task = self.loop.create_task(self._renew_channel_loop())
self._renew_channel_task = asyncio.create_task(self._renew_channel_loop())
return response
async def _renew_channel_loop(self):
......
......@@ -20,14 +20,12 @@ class UASocketProtocol(asyncio.Protocol):
OPEN = 'open'
CLOSED = 'closed'
def __init__(self, timeout=1, security_policy=ua.SecurityPolicy(), loop=None):
def __init__(self, timeout=1, security_policy=ua.SecurityPolicy()):
"""
:param timeout: Timeout in seconds
:param security_policy: Security policy (optional)
:param loop: Event loop (optional)
"""
self.logger = logging.getLogger(f"{__name__}.UASocketProtocol")
self.loop = loop or asyncio.get_event_loop()
self.transport = None
self.receive_buffer: Optional[bytes] = None
self.is_receiving = False
......@@ -125,7 +123,7 @@ class UASocketProtocol(asyncio.Protocol):
self._request_handle -= 1
raise
self._request_id += 1
future = self.loop.create_future()
future = asyncio.get_running_loop().create_future()
self._callbackmap[self._request_id] = future
# Change to the new security token if the connection has been renewed.
......@@ -243,13 +241,11 @@ class UaClient:
In this Python implementation most of the structures are defined in
uaprotocol_auto.py and uaprotocol_hand.py available under asyncua.ua
"""
def __init__(self, timeout=1, loop=None):
def __init__(self, timeout=1):
"""
:param timeout: Timout in seconds
:param loop: Event loop (optional)
"""
self.logger = logging.getLogger(f'{__name__}.UaClient')
self.loop = loop or asyncio.get_event_loop()
self._subscription_callbacks = {}
self._timeout = timeout
self.security_policy = ua.SecurityPolicy()
......@@ -260,14 +256,14 @@ class UaClient:
self.security_policy = policy
def _make_protocol(self):
self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy, loop=self.loop)
self.protocol = UASocketProtocol(self._timeout, security_policy=self.security_policy)
return self.protocol
async def connect_socket(self, host: str, port: int):
"""Connect to server socket."""
self.logger.info("opening connection")
# Timeout the connection when the server isn't available
await asyncio.wait_for(self.loop.create_connection(self._make_protocol, host, port), self._timeout)
await asyncio.wait_for(asyncio.get_running_loop().create_connection(self._make_protocol, host, port), self._timeout)
def disconnect_socket(self):
if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
......@@ -458,7 +454,7 @@ class UaClient:
# The current strategy is to have only one open publish request per UaClient. This might not be enough
# in high latency networks or in case many subscriptions are created. A Set of Tasks of `_publish_loop`
# could be used if necessary.
self._publish_task = self.loop.create_task(self._publish_loop())
self._publish_task = asyncio.create_task(self._publish_loop())
return response.Parameters
async def delete_subscriptions(self, subscription_ids):
......
......@@ -3,6 +3,7 @@ from a list of nodes in the address space, build an XML file
format is the one from opc-ua specification
"""
import logging
import asyncio
import functools
from collections import OrderedDict
import xml.etree.ElementTree as Et
......@@ -123,7 +124,7 @@ class XmlExporter:
if pretty:
indent(self.etree.getroot())
func = functools.partial(self.etree.write, xmlpath, encoding='utf-8', xml_declaration=True)
await self.server.loop.run_in_executor(None, func)
await asyncio.get_running_loop().run_in_executor(None, func)
def dump_etree(self):
"""
......
......@@ -123,7 +123,7 @@ class XMLParser:
if xmlstring:
self.root = ET.fromstring(xmlstring)
else:
tree = await asyncio.get_event_loop().run_in_executor(None, ET.parse, xmlpath)
tree = await asyncio.get_running_loop().run_in_executor(None, ET.parse, xmlpath)
self.root = tree.getroot()
def parse_sync(self, xmlpath=None, xmlstring=None):
......
......@@ -161,7 +161,7 @@ class Client:
self.tloop = ThreadLoop()
self.tloop.start()
self.close_tloop = True
self.aio_obj = client.Client(url, timeout, loop=self.tloop.loop)
self.aio_obj = client.Client(url, timeout)
self.nodes = Shortcuts(self.tloop, self.aio_obj.uaclient)
def __str__(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment