Commit f874f7db authored by Julien PRIGENT, committed by GitHub

[HaClient] Asyncua ha client (#367)

* [HaClient] Asyncua client wrapper

It exposes an interface similar to Client. You'll find some examples
and tests to guide you through this new feature.

Under the hood, it starts a keepalive task to monitor the status and
service level for each server.

It also includes an HaManager task which promotes a primary and
reconnects unhealthy clients. Unhealthy clients are detected based
on the socket status and the keepalive data collected.

The requests issued by users, as well as the HaManager decisions,
produce an ideal state called the "ideal_map". At regular intervals, the reconciliator task
kicks in, locks this configuration, and applies the "ideal_map" configuration.
It then stores the actual subscription status in the "real_map", using the
lower-level components of the library.
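
For illustration, both maps share the same shape; this is a sketch inferred
from the VirtualSubscription dataclass and the tests in this PR (the URL,
subscription name and handler are made-up placeholders):

    # {server_url: {subscription_name: VirtualSubscription}}
    ideal_map = {
        "opc.tcp://127.0.0.1:4840": {
            "sub1": VirtualSubscription(
                period=1000,
                handler=handler,
                publishing=True,
                monitoring=ua.MonitoringMode.Reporting,
            ),
        },
    }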

Current limits:

- We only support the HA WARM mode and datachange notifications (no event/status_change).
- We support multiple subscriptions, but a node should only be subscribed to once.

Component details:

- HaClient:
    - Configuration based on dataclasses: HaConfig (required) and HaConfigSecurity (optional)
    - Responsible for starting the side tasks: keepalive, hamanager, reconciliator
    - Mutates the ideal_map via VirtualSubscription
    - Generic hooks (e.g. to monitor subscription performance)

- KeepAlive (task):
    - Regularly hits the server to check its service_level / status.

- HaManager (task):
    - Promotes the primary / secondaries
    - Reconnects disconnected/unhealthy clients based on the keepalive feedback.

- Reconciliator (task):
    - Applies the ideal configuration
    - Performs checks on the call responses
    - Mutates the real_map

- VirtualSubscription:
    - Key component of the ideal and real maps.
    - Exposes an interface similar to the subscription's, but only
    stores the settings.
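
Putting it together, a minimal usage sketch mirroring the example shipped
with this PR (the URLs, timers, handler and node are placeholders):

    ha = HaClient(HaConfig(
        HaMode.WARM,
        keepalive_timer=15,
        manager_timer=15,
        reconciliator_timer=15,
        urls=["opc.tcp://127.0.0.1:4840", "opc.tcp://127.0.0.1:4841"],
        session_timeout=30,
    ))
    await ha.start()
    sub = await ha.create_subscription(1000, handler)
    await ha.subscribe_data_change(sub, [node])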

* [HaClient] fix test for py3.8/py3.9

CancelledError and TimeoutError have moved from concurrent.futures to
asyncio. Note that CancelledError now inherits from BaseException and
no longer from Exception. See https://bugs.python.org/issue32528 for the
details.
Finally, fix the pytest_yield_fixture deprecation.

* [HaClient] address review comments (utils + reconciliator refactor)
Co-authored-by: oroulet <oroulet@users.noreply.github.com>
parent cc379565
import asyncio
import hashlib
import logging
import pickle
from itertools import chain, islice

_logger = logging.getLogger(__name__)


class ClientNotFound(Exception):
    pass


async def event_wait(evt, timeout) -> bool:
    """Wait up to `timeout` seconds for the event; return whether it is set."""
    try:
        await asyncio.wait_for(evt.wait(), timeout)
    except asyncio.TimeoutError:
        pass
    return evt.is_set()


def get_digest(conf) -> str:
    """Return the md5 hexdigest of the pickled (picklable) configuration."""
    return hashlib.md5(pickle.dumps(conf)).hexdigest()


def batch(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        try:
            batchiter = islice(iterator, size)
            yield list(chain([next(batchiter)], batchiter))
        except StopIteration:
            break
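
# Illustrative usage of the helpers above (a sketch, not part of this diff;
# `evt` and `conf` are assumed to exist):
#
#     list(batch(range(5), 2))    # -> [[0, 1], [2, 3], [4]]
#     get_digest(conf)            # -> md5 hexdigest of the pickled config
#     await event_wait(evt, 1.0)  # -> True if `evt` was set within 1 second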
from dataclasses import dataclass, field
from typing import Any, Iterable, Optional, Set

from asyncua import ua
from sortedcontainers import SortedDict

TypeSubHandler = Any


@dataclass(frozen=True)
class NodeAttr:
    attr: Optional[ua.AttributeIds] = None
    queuesize: int = 0


@dataclass
class VirtualSubscription:
    period: int
    handler: TypeSubHandler
    publishing: bool
    monitoring: ua.MonitoringMode
    # type annotation (not supported yet): SortedDict[str, NodeAttr]
    # see: https://github.com/grantjenks/python-sortedcontainers/pull/107
    nodes: SortedDict = field(default_factory=SortedDict)

    def subscribe_data_change(
        self, nodes: Iterable[str], attr: ua.AttributeIds, queuesize: int
    ) -> None:
        for node in nodes:
            self.nodes[node] = NodeAttr(attr, queuesize)

    def unsubscribe(self, nodes: Iterable[str]) -> None:
        for node in nodes:
            if self.nodes.get(node):
                self.nodes.pop(node)

    def set_monitoring_mode(self, mode: ua.MonitoringMode) -> None:
        self.monitoring = mode

    def set_publishing_mode(self, mode: bool) -> None:
        self.publishing = mode

    def get_nodes(self) -> Set[str]:
        return set(self.nodes)
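
# Illustrative usage (a sketch with a made-up node id, not part of this diff):
#
#     vs = VirtualSubscription(
#         period=1000, handler=None, publishing=True,
#         monitoring=ua.MonitoringMode.Reporting,
#     )
#     vs.subscribe_data_change(["ns=2;i=2"], ua.AttributeIds.Value, queuesize=0)
#     vs.get_nodes()  # -> {"ns=2;i=2"}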
import sys
import asyncio
import logging
import time

sys.path.insert(0, "..")
from asyncua import Server, ua
from asyncua.client.ha.ha_client import HaClient, HaMode, HaConfig

# set up logging
root = logging.getLogger()
root.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)
# disable debug logging for the servers
logging.getLogger("asyncua.server").setLevel(logging.WARNING)


class SubHandler:
    """
    Basic subscription handler to support datachange_notification.
    No need to implement the other handler methods since the
    HaClient only supports datachange for now.
    """

    def datachange_notification(self, node, val, data):
        """
        Called for every datachange notification from the server.
        """
        print(f"Node: {node} has value: {val}\n")


async def start_servers():
    """Spin up two servers with identical configurations."""
    ports = [4840, 4841]
    urls = []
    loop = asyncio.get_event_loop()
    for port in ports:
        server = Server()
        await server.init()
        url = f"opc.tcp://0.0.0.0:{port}/freeopcua/server/"
        urls.append(url)
        server.set_endpoint(url)
        server.set_server_name(f"FreeOpcUa Example Server {port}")
        # set up our own namespace
        uri = "http://examples.freeopcua.github.io"
        idx = await server.register_namespace(uri)
        myobj = await server.nodes.objects.add_object(idx, "MyObject")
        myvar = await myobj.add_variable(idx, "MyVariable", 6.7)
        await server.start()
        loop.create_task(server_var_update(server, myvar))
    return urls, myvar


async def server_var_update(server, myvar):
    """
    Constantly increment the variable with epoch time
    to simulate data notifications.
    """
    while True:
        await asyncio.sleep(1)
        await server.write_attribute_value(myvar.nodeid, ua.DataValue(time.time()))


async def main():
    # start the servers
    urls, node = await start_servers()
    # set up the ha_client with the server urls
    ha_config = HaConfig(
        HaMode.WARM,
        keepalive_timer=15,
        manager_timer=15,
        reconciliator_timer=15,
        urls=urls,
        session_timeout=30,
    )
    ha = HaClient(ha_config)
    await ha.start()

    publish_interval = 1000
    handler = SubHandler()

    # create a subscription and subscribe to the node
    sub1 = await ha.create_subscription(publish_interval, handler)
    await ha.subscribe_data_change(sub1, [node])

    # Watch the debug log and check what's happening in the background.
    # A basic check could be to `iptables -A OUTPUT -p tcp --dport 4840 -j DROP`
    # and observe the failover in action.
    await asyncio.sleep(60)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
@@ -15,7 +15,7 @@ setup(
    packages=find_packages(),
    provides=["asyncua"],
    license="GNU Lesser General Public License v3 or later",
-   install_requires=["aiofiles", "aiosqlite", "python-dateutil", "pytz", 'cryptography'],
    install_requires=["aiofiles", "aiosqlite", "python-dateutil", "pytz", "cryptography", "sortedcontainers"],
    classifiers=[
        "Programming Language :: Python :: 3.6",
        "Programming Language :: Python :: 3.7",
import asyncio
import pytest
import operator
import os
import socket
from collections import namedtuple
from asyncua import ua, Client, Server
from asyncua.client.ha.ha_client import HaClient, HaConfig, HaMode
from asyncua.server.history import HistoryDict
from asyncua.server.history_sql import HistorySQLite
@@ -9,12 +14,34 @@ from .test_common import add_server_methods
from .util_enum_struct import add_server_custom_enum_struct
from threading import Thread
-port_num = 48540
-port_num1 = 48510
-port_discovery = 48550
from asyncio import get_event_loop_policy, sleep
from contextlib import closing
import pytest
from asyncua import Server, ua
RETRY = 20
SLEEP = 0.4
PORTS_USED = set()
Opc = namedtuple('opc', ['opc', 'server'])
def find_free_port():
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.bind(("", 0))
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        port = s.getsockname()[1]
        if port not in PORTS_USED:
            PORTS_USED.add(port)
            return port
        else:
            return find_free_port()
port_num = find_free_port()
port_num1 = find_free_port()
port_discovery = find_free_port()
def pytest_generate_tests(metafunc):
    mark = metafunc.definition.get_closest_marker('parametrize')
@@ -29,10 +56,10 @@ def pytest_generate_tests(metafunc):
    metafunc.parametrize('history_server', ['dict', 'sqlite'], indirect=True)


-@pytest.yield_fixture(scope='module')
@pytest.fixture(scope='module')
def event_loop(request):
    """Create an instance of the default event loop for each test case."""
-    loop = asyncio.get_event_loop_policy().new_event_loop()
    loop = get_event_loop_policy().new_event_loop()
    loop.set_debug(True)
    yield loop
    loop.close()
@@ -221,3 +248,184 @@ async def history_server(request):
    srv = await create_history_server(sqlite=True)
    yield srv
    await srv.srv.stop()
@pytest.fixture(scope="session")
def client_key_and_cert(request):
base_dir = os.path.dirname(os.path.dirname(__file__))
cert_dir = os.path.join(base_dir, "examples/certificates") + os.sep
key = f"{cert_dir}peer-private-key-example-1.pem"
cert = f"{cert_dir}peer-certificate-example-1.der"
return key, cert
@pytest.fixture(scope="session")
def server_key_and_cert(request):
base_dir = os.path.dirname(os.path.dirname(__file__))
cert_dir = os.path.join(base_dir, "examples") + os.sep
key = f"{cert_dir}private-key-example.pem"
cert = f"{cert_dir}certificate-example.der"
return key, cert
@pytest.fixture(scope="function")
def ha_config():
"""
Factory method to return a HaConfig
"""
def _ha_config(srv_port1, srv_port2, ha_mode=HaMode.WARM):
return HaConfig(
ha_mode,
keepalive_timer=1,
manager_timer=1,
reconciliator_timer=1,
urls=[
f"opc.tcp://127.0.0.1:{srv_port1}",
f"opc.tcp://127.0.0.1:{srv_port2}",
],
session_timeout=30,
)
return _ha_config
@pytest.fixture(scope="module")
async def ha_servers(server_key_and_cert):
# start our own server
srvs = []
key, cert = server_key_and_cert
async def create_srv(port, key, cert):
srv = Server()
srvs.append(srv)
await srv.init()
await srv.set_application_uri("urn:freeopcua:python:discovery")
srv.set_endpoint(f"opc.tcp://127.0.0.1:{port}")
srv.set_security_policy(
[
ua.SecurityPolicyType.NoSecurity,
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
]
)
await srv.load_certificate(cert)
await srv.load_private_key(key)
await srv.start()
# default the service level to 255 once started
slevel = srv.get_node(ua.NodeId(ua.ObjectIds.Server_ServiceLevel))
await slevel.write_value(ua.Variant(255, ua.VariantType.Byte))
return srv
port_1 = find_free_port()
port_2 = find_free_port()
srv1 = await create_srv(port_1, key, cert)
srv2 = await create_srv(port_2, key, cert)
yield srv1, srv2
# stop the servers
for srv in srvs:
await srv.stop()
PORTS_USED.remove(port_1)
PORTS_USED.remove(port_2)
@pytest.fixture(scope="module")
async def srv_variables(ha_servers):
"""
This fixture returns client like ha_client
server within the module scope.
"""
clts = []
# add the variables to the client
node_to_var = {}
for srv in ha_servers:
for var in ("V1", "V2"):
url = f"{srv.endpoint.scheme}://admin@{srv.endpoint.netloc}"
c = Client(url, timeout=10)
clts.append(c)
await c.connect()
values = [1, 2, 3]
o = c.nodes.objects
node = await o.add_variable(3, f"SubscriptionVariable{var}", values)
node_to_var[node] = values
yield node_to_var
# disconnect admin@clients used to write the custom variables
for c in clts:
await c.disconnect()
@pytest.fixture(scope="function")
async def ha_client(ha_config, ha_servers):
"""
This fixture returns everytime a new
HaClient but configured with the same
server within the module scope.
"""
srv1, srv2 = ha_servers
srv1_port = srv1.endpoint.port
srv2_port = srv2.endpoint.port
ha_config = ha_config(srv1_port, srv2_port)
ha_client = HaClient(ha_config)
yield ha_client
if ha_client.is_running:
await ha_client.stop()
async def wait_clients_socket(ha_client, state):
    for client in ha_client.get_clients():
        for _ in range(RETRY):
            if client.uaclient.protocol and client.uaclient.protocol.state == state:
                break
            await sleep(SLEEP)
        assert client.uaclient.protocol.state == state


async def wait_sub_in_real_map(ha_client, sub, negation=False):
    reconciliator = ha_client.reconciliator
    oper = operator.not_ if negation else operator.truth
    for client in ha_client.get_clients():
        url = client.server_url.geturl()
        for _ in range(RETRY):
            if oper(
                reconciliator.real_map.get(url) and reconciliator.real_map[url].get(sub)
            ):
                break
            await sleep(SLEEP)
        assert oper(reconciliator.real_map[url].get(sub))


async def wait_node_in_real_map(ha_client, sub, node_str, negation=False):
    oper = operator.not_ if negation else operator.truth
    reconciliator = ha_client.reconciliator
    for client in ha_client.get_clients():
        url = client.server_url.geturl()
        for _ in range(RETRY):
            # the virtual subscription must already exist
            if reconciliator.real_map.get(url):
                vs = reconciliator.real_map[url][sub]
                if oper(node_str in vs.nodes):
                    break
            await sleep(SLEEP)
        assert oper(node_str in vs.nodes)


async def wait_mode_in_real_map(ha_client, client, sub, mode, value):
    reconciliator = ha_client.reconciliator
    await wait_sub_in_real_map(ha_client, sub)
    url = client.server_url.geturl()
    vsub = reconciliator.real_map[url][sub]
    for _ in range(RETRY):
        option = getattr(vsub, mode)
        if option == value:
            break
        await sleep(SLEEP)
    assert option == value


async def wait_for_status_change(ha_client, client, status):
    # wait for the KeepAlive task to update its client status
    srv_info = ha_client.clients[client]
    for _ in range(RETRY):
        if srv_info.status == status:
            break
        await sleep(SLEEP)
    assert srv_info.status == status