# Copyright (C) 2023-2024  Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.

# XXX core-network - skip - verified by ors

# (*) here we verify only generated configuration because it is not possible to
#     run Amarisoft software on the testnodes due to licensing restrictions.
#
#     end-to-end testing complements unit-testing by verifying how LTE works
#     for real on dedicated hardware test setup.


import os
import json
import io
import yaml
import pcpp
import xmltodict

import sys
sys.path.insert(0, '../ru')
import xbuildout

import unittest
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass

setUpModule, _AmariTestCase = makeModuleSetUpAndTestCaseClass(
    os.path.abspath(
        os.path.join(os.path.dirname(__file__), '..', 'software.cfg')))


# ---- building blocks to construct cell/peer parameters ----
#
# - TDD/FDD indicate TDD/FDD mode.
# - LTE/NR  indicate LTE/NR cell with given downlink frequency.
# - BW      indicates specified bandwidth.
# - CENB    indicates a ENB-kind cell.
# - CUE     indicates an UE-kind cell.
# - TAC     indicates specified Traking Area Code.
# - LTE_PEER/NR_PEER indicate an LTE/NR ENB-PEER-kind cell.
# - X2_PEER/XN_PEER  indicate an LTE/NR ENB peer.

# TDD/FDD are basic parameters to indicate TDD/FDD mode.
TDD = {'rf_mode': 'tdd'}
FDD = {'rf_mode': 'fdd'}

# LTE/NR return basic parameters for an LTE/NR cell with given downlink frequency.
def LTE(dl_earfcn):
    return {
        'cell_type':    'lte',
        'dl_earfcn':    dl_earfcn,
    }
def NR(dl_nr_arfcn, nr_band):
    return {
        'cell_type':    'nr',
        'dl_nr_arfcn':  dl_nr_arfcn,
        'nr_band':      nr_band,
    }

# BW returns basic parameters to indicate specified bandwidth.
def BW(bandwidth):
    return {
        'bandwidth':    bandwidth,
    }

# CENB returns basic parameters to indicate a ENB-kind cell.
def CENB(cell_id, pci):
    return {
        'cell_kind':    'enb',
        'cell_id':      '0x%02x' % cell_id,
        'pci':          pci,
    }

# CUE indicates an UE-kind cell.
CUE = {'cell_kind': 'ue'}

#  TAC returns basic parameters to indicate specified Traking Area Code.
def TAC(tac):
    return {
        'tac':          '0x%x' % tac,
    }

# LTE_PEER/NR_PEER return basic parameters to indicate an LTE/NR ENB-PEER-kind cell.
def LTE_PEER(e_cell_id, pci, tac):
    return {
        'cell_kind':    'enb_peer',
        'e_cell_id':    '0x%07x' % e_cell_id,
        'pci':          pci,
        'tac':          '0x%x' % tac,
    }
def NR_PEER(nr_cell_id, gnb_id_bits, pci, tac):
    return {
        'cell_kind':    'enb_peer',
        'nr_cell_id':   '0x%09x' % nr_cell_id,
        'gnb_id_bits':  gnb_id_bits,
        'pci':          pci,
        'tac':          tac,
    }

# X2_PEER/XN_PEER return basic parameters to indicate an LTE/NR ENB peer.
def X2_PEER(x2_addr):
    return {
        'peer_type':    'lte',
        'x2_addr':      x2_addr,
    }
def XN_PEER(xn_addr):
    return {
        'peer_type':    'nr',
        'xn_addr':      xn_addr,
    }

# --------

# AmariTestCase is base class for all tests.
class AmariTestCase(_AmariTestCase):
    maxDiff = None  # show full diff in test run log on an error

    # stress correctness of ru_ref/cell_ref/... usage throughout all places in
    # buildout code - special characters should not lead to wrong templates or
    # code injection.
    default_partition_reference = _AmariTestCase.default_partition_reference + \
                                  ' ${a:b}\n[c]\n;'

    # faster edit/try cycle during development
    if 1:   # XXX disable by default
        instance_max_retry = 1
        report_max_retry = 1

    @classmethod
    def requestDefaultInstance(cls, state='started'):
        inst = super().requestDefaultInstance(state=state)
        cls.requestAllShared(inst)
        return inst

    # requestAllShared should add all shared instances of the testcase over imain.
    @classmethod
    def requestAllShared(cls, imain):
        raise NotImplementedError

    # requestShared requests one shared instance over imain with specified subreference and parameters.
    @classmethod
    def requestShared(cls, imain, subref, ctx):
        cls.slap.request(
            software_release=cls.getSoftwareURL(),
            software_type=cls.getInstanceSoftwareType(),
            partition_reference=cls.ref(subref),
            # XXX StandaloneSlapOS rejects filter_kw with "Can only request on embedded computer"
            #filter_kw = {'instance_guid': imain.getInstanceGuid()},
            partition_parameter_kw={'_': json.dumps(ctx)},
            shared=True)

    # ref returns full reference of shared instance with given subreference.
    #
    # for example if reference of main instance is 'MAIN-INSTANCE'
    #
    #   ref('RU') = 'MAIN-INSTANCE.RU'
    @classmethod
    def ref(cls, subref):
        return '%s.%s' % (cls.default_partition_reference, subref)

    # ipath returns path for a file inside main instance.
    @classmethod
    def ipath(cls, path):
        assert path[:1] != '/', path
        return '%s/%s' % (cls.computer_partition_root_path, path)


# ---- eNB + base class for similar services that do radio ----

# RFTestCase4 is base class for tests of all services that do radio.
#
# It instantiates a service with several Radio Units and Cells attached.
#
# 4 RU x 4 CELL are requested to verify all {FDD,TDD}·{LTE,NR} combinations.
#
# In requested instances mostly non-overlapping range of numbers are
# assigned to parameters according to the following scheme:
#
#   0+          cell_id
#   0x10+       pci
#   0x100+      tac
#   10+         tx_gain
#   20+         rx_gain
#   xxx+i·100   dl_arfcn
#   5,10,15,20  bandwidth
#   100+        root_sequence_index
#   1000+       inactivity_timer
#
# this allows to quickly see offhand to which cell/ru and parameter a
# particular number belongs to.
#
# Subclasses should define:
#
# - RUcfg(i) to return primary parameters specific for i'th RU configuration
#   like ru_type - to verify particular RU driver, sdr_dev, sfp_port and so on.
# - CELLcfg(i) to tune parameters of i'th cell, for example cell_kind.
# - .rf_cfg with loaded service config
class RFTestCase4(AmariTestCase):
    @classmethod
    def requestAllShared(cls, imain):
        def RU(i):
            ru = cls.RUcfg(i)
            ru |= {'n_antenna_dl': 4, 'n_antenna_ul': 2}
            ru |= {'tx_gain': 10+i, 'rx_gain':  20+i, 'txrx_active': 'INACTIVE'}
            cls.requestShared(imain, 'RU%d' % i, ru)

        def CELL(i, ctx):
            cell = {
                'ru': {
                    'ru_type': 'ru_ref',
                    'ru_ref':   cls.ref('RU%d' % i),
                }
            }
            cell |= cls.CELLcfg(i)
            cell |= ctx
            cls.requestShared(imain, 'RU%d.CELL' % i, cell)

        RU(1);  CELL(1, FDD | LTE(   100)    | BW( 5))
        RU(2);  CELL(2, TDD | LTE( 40200)    | BW(10))
        RU(3);  CELL(3, FDD | NR (300300,74) | BW(15))
        RU(4);  CELL(4, TDD | NR (470400,40) | BW(20))

    def test_conf_txrx_gain(t):
        # NOTE even though setting tx_gain/rx_gain does not make any difference
        #      for CPRI case, we still do set it there for consistency. For the
        #      reference: for CPRI case the real tx/rx gain is set in RU
        #      configuration and is verified by RU tests.
        t.assertEqual(t.rf_cfg['tx_gain'], [11]*4 + [12]*4 + [13]*4 + [14]*4)
        t.assertEqual(t.rf_cfg['rx_gain'], [21]*2 + [22]*2 + [23]*2 + [24]*2)


# ENBTestCase4 provides base class for unit-testing eNB service.
#
# It instantiates enb with 4 Radio Units x 4 Cells and verifies generated
# enb.cfg to match what is expected.
class ENBTestCase4(RFTestCase4):
    @classmethod
    def getInstanceSoftwareType(cls):
        return "enb"

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.enb_cfg = cls.rf_cfg = yamlpp_load(cls.ipath('etc/enb.cfg'))

    @classmethod
    def getInstanceParameterDict(cls):
        return {'_': json.dumps({
            'testing':      True,
            'enb_id':       '0x17',
            'gnb_id':       '0x23',
            'gnb_id_bits':  30,
        })}

    @classmethod
    def requestAllShared(cls, imain):
        super().requestAllShared(imain)

        def _(subref, ctx):
            return cls.requestShared(imain, subref, ctx)
        _('PEER4',      X2_PEER('44.1.1.1'))
        _('PEER5',      XN_PEER('55.1.1.1'))

        _('PEERCELL4',  LTE(700)      | LTE_PEER(0x12345,    35, 0x123))
        _('PEERCELL5',  NR(520000,38) |  NR_PEER(0x77712,22, 75, 0x321))
        cls.ho_inter = [
            dict(rat='eutra', cell_id=0x12345, n_id_cell=35, dl_earfcn=  700, tac=0x123),
            dict(rat='nr',    nr_cell_id=0x77712, gnb_id_bits=22, n_id_cell=75,
                 dl_nr_arfcn=520000, ul_nr_arfcn=520000, ssb_nr_arfcn=520090, band=38,
                 tac = 0x321),
        ]

    def CELLcfg(i):
        return CENB(i, 0x10+i) | TAC(0x100+i) | {
                 'root_sequence_index': 100+i,
                 'inactivity_timer':    1000+i}

    # basic enb parameters
    def test_enb_conf_basic(t):
        assertMatch(t, t.enb_cfg, dict(
            enb_id=0x17, gnb_id=0x23, gnb_id_bits=30,
            x2_peers=['44.1.1.1'], xn_peers=['55.1.1.1'],
        ))

    # basic cell parameters
    def test_enb_conf_cell(t):
        assertMatch(t, t.enb_cfg['cell_list'],  [
          dict( # CELL1
            uldl_config=NO,   rf_port=0,        n_antenna_dl=4,  n_antenna_ul=2,
            dl_earfcn=100,    ul_earfcn=18100,
            n_rb_dl=25,
            cell_id=0x1,      n_id_cell=0x11,   tac=0x101,
            root_sequence_index=101,  inactivity_timer=1001,
          ),
          dict( # CELL2
            uldl_config=2,    rf_port=1,        n_antenna_dl=4,  n_antenna_ul=2,
            dl_earfcn=40200,  ul_earfcn=40200,
            n_rb_dl=50,
            cell_id=0x2,      n_id_cell=0x12,   tac=0x102,
            root_sequence_index=102,  inactivity_timer=1002,
          ),
        ])

        assertMatch(t, t.enb_cfg['nr_cell_list'],  [
          dict( # CELL3
            tdd_ul_dl_config=NO, rf_port=2,           n_antenna_dl=4,       n_antenna_ul=2,
            dl_nr_arfcn=300300,  ul_nr_arfcn=290700,  ssb_nr_arfcn=300270,  band=74,
            bandwidth=15,
            cell_id=0x3,         n_id_cell=0x13,      tac=NO,
            root_sequence_index=103,  inactivity_timer=1003,
          ),

          dict( # CELL4
            tdd_ul_dl_config={'pattern1': dict(
                period=5, dl_slots=7, dl_symbols=6, ul_slots=2, ul_symbols=4,
            )},
                                 rf_port=3,           n_antenna_dl=4,       n_antenna_ul=2,
            dl_nr_arfcn=470400,  ul_nr_arfcn=470400,  ssb_nr_arfcn=470430,  band=40,
            bandwidth=20,
            cell_id=0x4,         n_id_cell=0x14,      tac=NO,
            root_sequence_index=104,  inactivity_timer=1004,
          ),
        ])


    # Carrier Aggregation
    def test_enb_conf_ca(t):
        assertMatch(t, t.enb_cfg['cell_list'],  [
          { # CELL1
            'scell_list':           [{'cell_id': 2}],                   # LTE + LTE
            'en_dc_scg_cell_list':  [{'cell_id': 3}, {'cell_id': 4}],   # LTE + NR
          },
          { # CELL2
            'scell_list':           [{'cell_id': 1}],                   # LTE + LTE
            'en_dc_scg_cell_list':  [{'cell_id': 3}, {'cell_id': 4}],   # LTE + NR
          },
        ])

        assertMatch(t, t.enb_cfg['nr_cell_list'], [
          { # CELL3
            'scell_list':           [{'cell_id': 4}],                   # NR  + NR
          },
          { # CELL4
            'scell_list':           [{'cell_id': 3}],                   # NR  + NR
          },
        ])


    # Handover
    def test_enb_conf_ho(t):
        assertMatch(t, t.enb_cfg['cell_list'],  [
          { # CELL1
            'ncell_list':   [
              dict(rat='eutra', cell_id= 0x1702, n_id_cell=0x12, dl_earfcn=40200, tac=0x102), # CELL2
              dict(rat='nr',    cell_id=      3),                                             # CELL3
              dict(rat='nr',    cell_id=      4),                                             # CELL4
            ] + t.ho_inter,
          },
          { # CELL2
            'ncell_list':   [
              dict(rat='eutra', cell_id= 0x1701, n_id_cell=0x11, dl_earfcn=  100, tac=0x101), # CELL1
              dict(rat='nr',    cell_id=      3),                                             # CELL3
              dict(rat='nr',    cell_id=      4),                                             # CELL4
            ] + t.ho_inter,
          },
        ])
        assertMatch(t, t.enb_cfg['nr_cell_list'], [
          { # CELL3
            'ncell_list':   [
              dict(rat='eutra', cell_id= 0x1701, n_id_cell=0x11, dl_earfcn=  100, tac=0x101), # CELL1
              dict(rat='eutra', cell_id= 0x1702, n_id_cell=0x12, dl_earfcn=40200, tac=0x102), # CELL2
              dict(rat='nr',    cell_id=      4),                                             # CELL4
            ] + t.ho_inter,
          },
          { # CELL4
            'ncell_list':   [
              dict(rat='eutra', cell_id= 0x1701, n_id_cell=0x11, dl_earfcn=  100, tac=0x101), # CELL1
              dict(rat='eutra', cell_id= 0x1702, n_id_cell=0x12, dl_earfcn=40200, tac=0x102), # CELL2
              dict(rat='nr',    cell_id=      3),                                             # CELL3
            ] + t.ho_inter,
          },
        ])


# ---- RU mixins to be used with RFTestCase4 ----

# SDR4 is mixin to verify SDR driver wrt all LTE/NR x FDD/TDD modes.
class SDR4:
    @classmethod
    def RUcfg(cls, i):
        return {
            'ru_type':      'sdr',
            'ru_link_type': 'sdr',
            'sdr_dev_list': [2*i,2*i+1],
        }

    # radio units configuration
    def test_enb_conf_ru(t):    # XXX enb/ue ?
        assertMatch(t, t.rf_cfg['rf_driver'],  dict(
          args='dev0=/dev/sdr2,dev1=/dev/sdr3,dev2=/dev/sdr4,dev3=/dev/sdr5,' +
               'dev4=/dev/sdr6,dev5=/dev/sdr7,dev6=/dev/sdr8,dev7=/dev/sdr9',
          cpri_mapping=NO,
          cpri_mult=NO,
          cpri_rx_delay=NO,
          cpri_tx_delay=NO,
          cpri_tx_dbm=NO,
        ))


# Lopcomm4 is mixin to verify Lopcomm driver wrt all LTE/NR x FDD/TDD modes.
class Lopcomm4:
    @classmethod
    def RUcfg(cls, i):
        return {
            'ru_type':      'lopcomm',
            'ru_link_type': 'cpri',
            'cpri_link':    {
                'sdr_dev':  0,
                'sfp_port': i,
                'mult':     4,
                'mapping':  'hw',
                'rx_delay': 40+i,
                'tx_delay': 50+i,
                'tx_dbm':   60+i
            },
            'mac_addr':     '00:0A:45:00:00:%02x' % i,
        }

    # radio units configuration in enb.cfg
    def test_enb_conf_ru(t):
        assertMatch(t, t.rf_cfg['rf_driver'],  dict(
          args='dev0=/dev/sdr0@1,dev1=/dev/sdr0@2,dev2=/dev/sdr0@3,dev3=/dev/sdr0@4',
          cpri_mapping='hw,hw,hw,hw',
          cpri_mult='4,4,4,4',
          cpri_rx_delay='41,42,43,44',
          cpri_tx_delay='51,52,53,54',
          cpri_tx_dbm='61,62,63,64',
        ))

    # RU configuration in cu_config.xml
    def test_ru_cu_cfg(t):
        def uctx(rf_mode, cell_type, dl_arfcn, ul_arfcn, bw, dl_freq, ul_freq, tx_gain, rx_gain):
            return {
                'tx-array-carriers': {
                  'rw-duplex-scheme':              rf_mode,
                  'rw-type':                       cell_type,
                  'absolute-frequency-center':    '%d' % dl_arfcn,
                  'center-of-channel-bandwidth':  '%d' % dl_freq,
                  'channel-bandwidth':            '%d' % bw,
                  'gain':                         '%d' % tx_gain,
                  'active':                       'INACTIVE',
                },
                'rx-array-carriers': {
                  'absolute-frequency-center':    '%d' % ul_arfcn,
                  'center-of-channel-bandwidth':  '%d' % ul_freq,
                  'channel-bandwidth':            '%d' % bw,
                  # XXX no rx_gain
                  'active':                       'INACTIVE',
                },
            }

        _ = t._test_ru_cu_cfg

        #       rf_mode  ctype dl_arfcn ul_arfcn   bw      dl_freq     ul_freq     txg rxg
        _(1, uctx('FDD', 'LTE',    100,   18100,  5000000, 2120000000, 1930000000, 11, 21))
        _(2, uctx('TDD', 'LTE',  40200,   40200, 10000000, 2551000000, 2551000000, 12, 22))
        _(3, uctx('FDD',  'NR', 300300,  290700, 15000000, 1501500000, 1453500000, 13, 23))
        _(4, uctx('TDD',  'NR', 470400,  470400, 20000000, 2352000000, 2352000000, 14, 24))

    def _test_ru_cu_cfg(t, i, uctx):
        cu_xml = t.ipath('etc/%s' % xbuildout.encode('%s-cu_config.xml' % t.ref('RU%d' % i)))
        with open(cu_xml, 'r') as f:
            cu = f.read()
        cu = xmltodict.parse(cu)

        assertMatch(t, cu, {
          'xc:config': {
            'user-plane-configuration': {
              'tx-endpoints': [
                {'name': 'TXA0P00C00', 'e-axcid': {'eaxc-id': '0'}},
                {'name': 'TXA0P00C01', 'e-axcid': {'eaxc-id': '1'}},
                {'name': 'TXA0P01C00', 'e-axcid': {'eaxc-id': '2'}},
                {'name': 'TXA0P01C01', 'e-axcid': {'eaxc-id': '3'}},
              ],
              'tx-links': [
                {'name': 'TXA0P00C00', 'tx-endpoint': 'TXA0P00C00'},
                {'name': 'TXA0P00C01', 'tx-endpoint': 'TXA0P00C01'},
                {'name': 'TXA0P01C00', 'tx-endpoint': 'TXA0P01C00'},
                {'name': 'TXA0P01C01', 'tx-endpoint': 'TXA0P01C01'},
              ],
              'rx-endpoints': [
                {'name': 'RXA0P00C00',   'e-axcid': {'eaxc-id': '0'}},
                {'name': 'PRACH0P00C00', 'e-axcid': {'eaxc-id': '8'}},
                {'name': 'RXA0P00C01',   'e-axcid': {'eaxc-id': '1'}},
                {'name': 'PRACH0P00C01', 'e-axcid': {'eaxc-id': '24'}},
              ],
              'rx-links': [
                {'name': 'RXA0P00C00',   'rx-endpoint': 'RXA0P00C00'},
                {'name': 'PRACH0P00C00', 'rx-endpoint': 'PRACH0P00C00'},
                {'name': 'RXA0P00C01',   'rx-endpoint': 'RXA0P00C01'},
                {'name': 'PRACH0P00C01', 'rx-endpoint': 'PRACH0P00C01'},
              ],
            } | uctx
          }
        })


# Sunwave4 is mixin to verify Sunwave driver wrt all LTE/NR x FDD/TDD modes.
class Sunwave4:
    @classmethod
    def RUcfg(cls, i):
        return {
            'ru_type':      'sunwave',
            'ru_link_type': 'cpri',
            'cpri_link':    {
                'sdr_dev':  1,
                'sfp_port': i,
                'mult':     5,
                'mapping':  'bf1',
                'rx_delay': 140+i,
                'tx_delay': 150+i,
                'tx_dbm':   160+i
            },
            'mac_addr':     '00:FA:FE:00:00:%02x' % i,
        }

    # radio units configuration in enb.cfg
    def test_enb_conf_ru(t):
        assertMatch(t, t.rf_cfg['rf_driver'],  dict(
          args='dev0=/dev/sdr1@1,dev1=/dev/sdr1@2,dev2=/dev/sdr1@3,dev3=/dev/sdr1@4',
          cpri_mapping='bf1,bf1,bf1,bf1',
          cpri_mult='5,5,5,5',
          cpri_rx_delay='141,142,143,144',
          cpri_tx_delay='151,152,153,154',
          cpri_tx_dbm='161,162,163,164',
        ))

# RUMultiType4 is mixin to verify that different RU types can be used at the same time.
class RUMultiType4:
    # ENB does not support mixing SDR + CPRI - verify only with CPRI-based units
    # see https://support.amarisoft.com/issues/26021 for details
    @classmethod
    def RUcfg(cls, i):
        assert 1 <= i <= 4, i
        if i in (1,2):
            return Lopcomm4.RUcfg(i)
        else:
            return Sunwave4.RUcfg(i)

    # radio units configuration in enb.cfg
    def test_enb_conf_ru(t):
        assertMatch(t, t.rf_cfg['rf_driver'],  dict(
          args='dev0=/dev/sdr0@1,dev1=/dev/sdr0@2,dev2=/dev/sdr1@3,dev3=/dev/sdr1@4',
          cpri_mapping='hw,hw,bf1,bf1',
          cpri_mult='4,4,5,5',
          cpri_rx_delay='41,42,143,144',
          cpri_tx_delay='51,52,153,154',
          cpri_tx_dbm='61,62,163,164',
        ))


# instantiate eNB tests
class TestENB_SDR4        (ENBTestCase4, SDR4):         pass
class TestENB_Lopcomm4    (ENBTestCase4, Lopcomm4):     pass
class TestENB_Sunwave4    (ENBTestCase4, Sunwave4):     pass
class TestENB_RUMultiType4(ENBTestCase4, RUMultiType4): pass


# ---- UEsim ----

# UEsimTestCase4 provides base class for unit-testing UEsim service.
#
# It is similar to ENBTestCase4 but configures UE cells instead of eNB cells.
class UEsimTestCase4(RFTestCase4):
    @classmethod
    def getInstanceSoftwareType(cls):
        return "ue"

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.ue_cfg = cls.rf_cfg = yamlpp_load(cls.ipath('etc/ue.cfg'))

    @classmethod
    def getInstanceParameterDict(cls):
        return {'_': json.dumps({
            'testing':      True,
        })}

    @classmethod
    def CELLcfg(cls, i):
        return CUE

    @classmethod
    def requestAllShared(cls, imain):
        super().requestAllShared(imain)

        def UE(i):
            ue = {
                'ue_type':  ('lte', 'nr') [(i-1) % 2],
                'rue_addr': 'host%d'    % i,
                'sim_algo': ('xor', 'milenage', 'tuak') [i-1],
                'imsi':     '%015d'     % i,
                'opc':      '%032x'     % i,
                'amf':      '0x%04x'    % (0x9000+i),
                'sqn':      '%012x'     % i,
                'k':        'FFFF%028x' % i,
                'impi':     'impi%d@rapid.space' % i,
            }
            cls.requestShared(imain, 'UE%d' % i, ue)

        UE(1)
        UE(2)
        UE(3)

    # ue parameters
    def test_uesim_ue(t):
        assertMatch(t, t.ue_cfg['ue_list'], [
          dict(
            as_release=13,  ue_category=13,   rue_addr='host1',
            sim_algo='xor', amf =0x9001,      impi='impi1@rapid.space',
            sqn ='000000000001',
            imsi='000000000000001',
            opc ='00000000000000000000000000000001',
            K   ='FFFF0000000000000000000000000001',
          ),
          dict(
            as_release=15,  ue_category='nr', rue_addr='host2',
            sim_algo='milenage', amf =0x9002, impi='impi2@rapid.space',
            sqn ='000000000002',
            imsi='000000000000002',
            opc ='00000000000000000000000000000002',
            K   ='FFFF0000000000000000000000000002',
          ),
          dict(
            as_release=13,  ue_category=13,   rue_addr='host3',
            sim_algo='tuak', amf =0x9003,     impi='impi3@rapid.space',
            sqn ='000000000003',
            imsi='000000000000003',
            opc ='00000000000000000000000000000003',
            K   ='FFFF0000000000000000000000000003',
          ),
        ])

    # cells
    def test_uesim_conf_cell(t):
        assertMatch(t, t.ue_cfg['cell_groups'], [
          dict(
            group_type='lte',
            cells=[
              dict( # CELL1
                rf_port=0,        n_antenna_dl=4,  n_antenna_ul=2,
                dl_earfcn=100,    ul_earfcn=18100,
                bandwidth=5,
              ),
              dict( # CELL2
                rf_port=1,        n_antenna_dl=4,  n_antenna_ul=2,
                dl_earfcn=40200,  ul_earfcn=40200,
                bandwidth=10,
              ),
            ]
          ),
          dict(
            group_type='nr',
            cells=[
              dict( # CELL3
                rf_port=2,           n_antenna_dl=4,      n_antenna_ul=2,
                dl_nr_arfcn=300300,  ul_nr_arfcn=290700,  ssb_nr_arfcn=300270,  band=74,
                bandwidth=15,
              ),
              dict( # CELL4
                rf_port=3,           n_antenna_dl=4,      n_antenna_ul=2,
                dl_nr_arfcn=470400,  ul_nr_arfcn=470400,  ssb_nr_arfcn=470430,  band=40,
                bandwidth=20,
              ),
            ]
          )
        ])


# instantiate UEsim tests
class TestUEsim_SDR4        (UEsimTestCase4, SDR4):         pass
class TestUEsim_Lopcomm4    (UEsimTestCase4, Lopcomm4):     pass
class TestUEsim_Sunwave4    (UEsimTestCase4, Sunwave4):     pass
class TestUEsim_RUMultiType4(UEsimTestCase4, RUMultiType4): pass


# ---- misc ----

# yamlpp_load loads yaml config file after preprocessing it.
#
# preprocessing is needed to e.g. remove // and /* comments.
def yamlpp_load(path):
    with open(path, 'r') as f:
        data = f.read()     # original input
    p = pcpp.Preprocessor()
    p.parse(data)
    f = io.StringIO()
    p.write(f)
    data_ = f.getvalue()    # preprocessed input
    return yaml.load(data_, Loader=yaml.Loader)


# assertMatch recursively matches data structure against specified pattern.
#
# - dict match by verifying v[k] == vok[k] for keys from the pattern.
#   vok[k]=NO means v[k] must be absent
# - list match by matching all elements individually
# - atomic types like int and str match by equality
class NOClass:
    def __repr__(self):
        return 'ø'
NO = NOClass()
def assertMatch(t: unittest.TestCase, v, vok):
    v_ = _matchCollect(v, vok)
    t.assertEqual(v_, vok)

def _matchCollect(v, vok):
    if type(v) is not type(vok):
        return v
    if type(v) is dict:
        v_ = {}
        for k in vok:
            #v_[k] = v.get(k, NO)
            v_[k] = _matchCollect(v.get(k, NO), vok[k])
        return v_
    if type(v) is list:
        v_ = []
        for i in range(max(len(v), len(vok))):
            e   = NO
            eok = NO
            if i < len(v):
                e = v[i]
            if i < len(vok):
                eok = vok[i]

            if e is not NO:
                if eok is not NO:
                    v_.append(_matchCollect(e, eok))
                else:
                    v_.append(e)
        return v_

    # other types, e.g. atomic int/str/... - return as is
    assert type(v) is not tuple, v
    return v


# XXX test for assertMatch