Commit 726bab3a authored by oroulet's avatar oroulet

try to fix loop stuff in Haclient.. probably broken

parent 6118b2e4
......@@ -108,7 +108,7 @@ class HaClient:
HEALTHY_STATE = ConnectionStates.HEALTHY
def __init__(
self, config: HaConfig, security: Optional[HaSecurityConfig] = None, loop=None
self, config: HaConfig, security: Optional[HaSecurityConfig] = None
) -> None:
self._config: HaConfig = config
self._keepalive_task: Dict[KeepAlive, asyncio.Task] = {}
......@@ -116,12 +116,11 @@ class HaClient:
self._reconciliator_task: Dict[Reconciliator, asyncio.Task] = {}
self._gen_sub: Generator[str, None, None] = self.generate_sub_name()
self.loop: asyncio.unix_events._UnixSelectorEventLoop = (
loop or asyncio.get_event_loop()
)
self._url_to_reset_lock = asyncio.Lock(loop=self.loop)
self._ideal_map_lock: asyncio.Lock = asyncio.Lock(loop=self.loop)
self._client_lock: asyncio.Lock = asyncio.Lock(loop=self.loop)
# The locks must be created in async method!
# caling get_running_loop just to make sure we crash if not called in async method
self._url_to_reset_lock = asyncio.Lock(loop=asyncio.get_running_loop())
self._ideal_map_lock: asyncio.Lock = asyncio.Lock()
self._client_lock: asyncio.Lock = asyncio.Lock()
self.clients: Dict[Client, ServerInfo] = {}
self.active_client: Optional[Client] = None
......@@ -140,7 +139,7 @@ class HaClient:
)
for url in self.urls:
c = Client(url, timeout=self._config.request_timeout, loop=self.loop)
c = Client(url, timeout=self._config.request_timeout)
# timeouts for the session and secure channel are in ms
c.session_timeout = self._config.session_timeout * 1000
c.secure_channel_timeout = self._config.secure_channel_timeout * 1000
......@@ -159,13 +158,13 @@ class HaClient:
async def start(self) -> None:
for client, server in self.clients.items():
keepalive = KeepAlive(client, server, self._config.keepalive_timer)
task = self.loop.create_task(keepalive.run())
task = asyncio.create_task(keepalive.run())
self._keepalive_task[keepalive] = task
task = self.loop.create_task(self.manager.run())
task = asyncio.create_task(self.manager.run())
self._manager_task[self.manager] = task
task = self.loop.create_task(self.reconciliator.run())
task = asyncio.create_task(self.reconciliator.run())
self._reconciliator_task[self.reconciliator] = task
self.is_running = True
......@@ -485,9 +484,8 @@ class HaManager:
def __init__(self, ha_client: HaClient, timer: Optional[int] = None) -> None:
self.ha_client = ha_client
self.loop = ha_client.loop
self.timer = self.set_loop_timer(timer)
self.stop_event = asyncio.Event(loop=self.loop)
self.stop_event = asyncio.Event()
self.is_running = False
def set_loop_timer(self, timer: Optional[int]):
......@@ -556,11 +554,11 @@ class HaManager:
tasks = []
for client in healthy:
task = self.loop.create_task(reco_resub(client, force=False))
task = asyncio.create_task(reco_resub(client, force=False))
task.add_done_callback(partial(log_exception, client))
tasks.append(task)
for client in unhealthy:
task = self.loop.create_task(reco_resub(client, force=True))
task = asyncio.create_task(reco_resub(client, force=True))
task.add_done_callback(partial(log_exception, client))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
......@@ -53,9 +53,8 @@ class Reconciliator:
def __init__(self, timer: int, ha_client: "HaClient") -> None:
self.timer = timer
self.ha_client = ha_client
self.loop = ha_client.loop
self.is_running = False
self.stop_event = asyncio.Event(loop=self.loop)
self.stop_event = asyncio.Event(loop=asyncio.get_running_loop())
self.real_map: Dict[str, SortedDict] = {}
for url in self.ha_client.urls:
......@@ -185,7 +184,7 @@ class Reconciliator:
_logger.info(f"Removing {len(sub_to_del)} subscriptions")
for sub_name in sub_to_del:
sub_handle = self.name_to_subscription[url][sub_name]
task = self.loop.create_task(sub_handle.delete())
task = asyncio.create_task(sub_handle.delete())
task.add_done_callback(
partial(self.del_from_map, url, Method.DEL_SUB, sub_name=sub_name)
)
......@@ -202,7 +201,7 @@ class Reconciliator:
client = self.ha_client.get_client_by_url(url)
for sub_name in sub_to_add:
vs = ideal_map[url][sub_name]
task = self.loop.create_task(
task = asyncio.create_task(
client.create_subscription(
vs.period, vs.handler, publishing=vs.publishing
)
......@@ -269,7 +268,7 @@ class Reconciliator:
for node_attr, nodes_obj in attr_to_nodes.items():
# some servers are sensitive to the number of MI per request
for batch_nodes_obj in batch(nodes_obj, self.BATCH_MI_SIZE):
task = self.loop.create_task(
task = asyncio.create_task(
real_sub.subscribe_data_change(
batch_nodes_obj,
*astuple(node_attr),
......@@ -308,7 +307,7 @@ class Reconciliator:
_logger.info(f"Removing {len(node_to_del)} Nodes")
for batch_nodes in batch(node_to_del, self.BATCH_MI_SIZE):
node_handles = [self.node_to_handle[url][node] for node in batch_nodes]
task = self.loop.create_task(real_sub.unsubscribe(node_handles))
task = asyncio.create_task(real_sub.unsubscribe(node_handles))
task.add_done_callback(
partial(
self.del_from_map,
......@@ -349,7 +348,7 @@ class Reconciliator:
if ideal_val != real_val:
_logger.info(f"Changing {attr} for {sub_name} to {ideal_val}")
set_func = getattr(real_sub, func)
task = self.loop.create_task(set_func(ideal_val))
task = asyncio.create_task(set_func(ideal_val))
task.add_done_callback(
partial(
self.change_mode,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment