# -*- coding: utf-8 -*-
# Copyright (C) 2022-2024  Nexedi SA and Contributors.
#                          Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package amari.kpi provides driver for KPI-related measurements for Amarisoft LTE stack.

Use LogMeasure to convert enb.xlog (TODO and enb.log) to Measurements.
The KPIs themselves can be computed from Measurements via package xlte.kpi .
"""

from __future__ import print_function, division, absolute_import

from xlte import kpi
from xlte.amari import xlog
from golang import func


# LogMeasure takes enb.xlog (TODO and enb.log) as input, and produces kpi.Measurements on output.
#
#     enb.xlog     ─────────
#     ─────────>  │   Log   │
#                 │         │ ────> []kpi.Measurement
#     ─────────>  │ Measure │
#     enb.log      ─────────
#
# Use LogMeasure(rxlog, rlog) to create it.
# Use .read() to retrieve Measurements.
# Use .close() when done.
# LogError is the error type used by LogMeasure to represent problems with
# input log data - e.g. parse failures, inconsistent (decreasing) counters -
# and EOF (see LogError.EOF below).
class LogError(RuntimeError):
    # .timestamp    s | None for invalid input
    pass
# LogMeasure is the driver that turns log input into kpi.Measurement's.
# (stray review-UI line-number artifacts that broke the class body are removed)
class LogMeasure:
    # ._rxlog       IO reader for enb.xlog
    # ._rlog        IO reader for enb.log
    #
    # ._estats      \/ last xlog.Message with read stats result
    #               \/ last xlog.Event\sync | LogError
    #               \/ None
    # ._m           kpi.Measurement being prepared covering [_estats_prev, _estats) | None
    # ._m_next      kpi.Measurement being prepared covering [_estats, _estats_next) | None
    #
    # ._drb_stats   last xlog.Message with x.drb_stats | None   ; reset on error|event
    pass


# LogMeasure(rxlog, rlog) creates new LogMeasure object that will read
# enb.xlog and enb.log data from IO readers rxlog and rlog.
#
# The readers must provide .readline() and .read() methods.
# The ownership of rxlog and rlog is transferred to LogMeasure.
@func(LogMeasure)
def __init__(logm, rxlog, rlog):
    # wrap rxlog into xlog.Reader which parses enb.xlog entries; rlog is kept
    # as-is (TODO: enb.log parsing is not implemented yet).
    logm._rxlog = xlog.Reader(rxlog)
    logm._rlog  = rlog
    # measurement pipeline state - see LogMeasure class comments for details
    logm._estats = None
    logm._m = None
    logm._m_next = None
    logm._drb_stats = None

# close releases resources associated with LogMeasure and closes underlying readers.
@func(LogMeasure)
def close(logm):
    # close in the same order the readers were accepted: xlog first, then log.
    for r in (logm._rxlog, logm._rlog):
        r.close()


# read retrieves and returns next Measurement or None at EOF.
#
# It reads data from enb.xlog (TODO and enb.log) as needed.
@func(LogMeasure)
def read(logm):  # -> kpi.Measurement | None
    # thin tracing wrapper around _read which does the actual work.
    _trace('\n\n  LogMeasure.read')
    m = logm._read()
    _trace('  <-', m)
    return m

@func(LogMeasure)
def _read(logm):
    # read log data organizing periods around stats queries.
    #
    # we emit measurement X after reading stats X+2 - i.e. we emit measurement
    # for a period after reading data covering _next_ period. It is organized
    # this way to account for init/fini correction(*):
    #
    #              fini adjust
    #             -------------
    #            '             '
    #      Sx    v     Sx+1    '   Sx+2
    #   ────|───────────|───────────|────
    #        Measurement Measurement
    #             X          X+1
    #
    #
    # (*) see kpi.Measurement documentation for more details about init/fini correction.
    m = None  # kpi.Measurement to return
    while 1:
        _trace()
        _trace('m:       \t', m)
        _trace('._m:     \t', logm._m)
        _trace('._estats:\t', logm._estats)
        _trace('._m_next:\t', logm._m_next)
        _trace('._drb_stats:\t', logm._drb_stats)

        if m is not None:
            return m

        # flush the queue at an error or an event, e.g. at "service detach".
        estats = logm._estats
        if isinstance(estats, (xlog.Event, LogError)):
            # <- M for [estats_prev, estats)
            m = logm._m
            if m is not None:
                logm._m = None
                return m
            # note ._m_next is not flushed:
            # if ._m_next != None - it remains initialized with X.Tstart = estats.timestamp

            # <- error|EOF
            if isinstance(estats, LogError):
                logm._estats = None
                if estats is LogError.EOF:
                    return None
                raise estats

            # queue should be flushed now till including estats with
            # event remaining non-none, e.g. "service detach", but not an error
            assert logm._m is None
            assert isinstance(logm._estats, xlog.Event)
            assert isinstance(logm._m_next, kpi.Measurement)
            assert logm._m_next['X.Tstart'] == logm._estats.timestamp


        # fetch next entry from xlog
        try:
            x = logm._rxlog.read()
        except Exception as e:
            x = LogError(None, str(e)) # e.g. it was xlog.ParseError(...)
        _trace('  xlog:', x)

        if x is None:
            x = LogError.EOF # represent EOF as LogError

        # ignore sync events
        if isinstance(x, xlog.SyncEvent):
            continue

        # handle messages that update current Measurement
        if isinstance(x, xlog.Message):
            if x.message == "x.drb_stats":
                logm._handle_drb_stats(x)
                continue
            if x.message != "stats":
                continue    # ignore other messages


        # it is an error, event\sync or stats.
        # if it is an event or stats -> finalize timestamp for _m_next.
        # start building next _m_next covering [x, x_next).
        # shift m <- ._m <- ._m_next <- (new Measurement | None for LogError)
        # a LogError throws away preceding Measurement and does not start a new one after it
        if logm._m_next is not None:
            if not isinstance(x, LogError):
                logm._m_next['X.δT'] = x.timestamp - logm._m_next['X.Tstart']
            else:
                logm._m_next = None # throw it away on seeing e.g. "stats, error"
        m = logm._m
        logm._m = logm._m_next
        if not isinstance(x, LogError):
            logm._m_next = kpi.Measurement()
            logm._m_next['X.Tstart'] = x.timestamp # note X.δT remains NA until next stats|event
        else:
            logm._m_next = None

        if isinstance(x, (xlog.Event, LogError)):
            logm._estats = x # it is ok to forget previous event after e.g. bad line with ParseError
            logm._drb_stats = None # reset ._drb_stats at an error or event
            continue         # flush the queue

        assert isinstance(x, xlog.Message)
        assert x.message == "stats"
        logm._handle_stats(x, m)
        # NOTE _handle_stats indicates logic error in x by setting ._estats to
        # LogError instead of stats. However those LogErrors come with
        # timestamp and are thus treated similarly to events: we do not throw
        # away neither ._m, nor ._m_next like we do with LogErrors that
        # represent errors at the log parsing level.
        continue


# _handle_stats handles next stats xlog entry upon _read request.
@func(LogMeasure)
def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
    # build Measurement from stats' counters.
    #
    # we take δ(stats_prev, stat) and process it mapping Amarisoft counters to
    # 3GPP ones specified by kpi.Measurement. This approach has following limitations:
    #
    # - for most of the counters there is no direct mapping in between
    #   Amarisoft and 3GPP. For example we currently use s1_erab_setup_request for
    #   ERAB.EstabAddAtt.sum, but this mapping is not strictly correct and will
    #   break if corresponding S1 E-RAB SETUP REQUEST message contains multiple
    #   ERABs. The code has corresponding FIXME marks where such approximations
    #   are used.
    #
    # - it is not possible to implement init/fini correction precisely. From
    #   aggregated statistics we only get total amount for a fini value for a
    #   period - without knowing which part of it corresponds to init events
    #   from previous period, and which part to init events from current one.
    #   With that it is only possible to make a reasonable guess and try to
    #   preserve statistical properties, but not more. See m_initfini below for
    #   details.
    #
    # - it is not easy to produce per-cell measurements. This limitation
    #   comes from the fact that in Amarisoft LTE stack S1-related counters
    #   come as "globals" ones, while e.g. RRC-related counters are "per-cell".
    #   It is thus hard to see how much S1 connection establishments are associated
    #   with one particular cell if there are several of them. One S1 connection could
    #   be even related to multiple cells simultaneously when carriers are aggregated.
    #
    # TODO also parse enb.log to fix those issues.

    # check if new stats follows required structure.
    # handle it as an error event if it is not.
    try:
        _stats_check(stats)
    except LogError as e:
        logm._estats = e  # stays M(ø) for [estats_prev, bad_stats)
        return

    # stats is pre-checked to be good. push it to the queue.
    estats_prev = logm._estats
    logm._estats = stats

    # first stats after e.g. service attach -> stays M(ø) for [event_prev, stats)
    if estats_prev is None:
        return
    if isinstance(estats_prev, (xlog.Event, LogError)):
        return

    assert isinstance(estats_prev, xlog.Message)
    assert estats_prev.message == "stats"
    stats_prev = estats_prev

    # we have 2 adjacent stats. Adjust corresponding Measurement from their δ.
    # do init/fini correction if there was also third preceding stats message.
    m = logm._m.copy() # [stats_prev, stats)

    # δcc(counter) tells how specified global cumulative counter changed since last stats result.
    def δcc(counter):
        old = _stats_cc(stats_prev, counter)
        new = _stats_cc(stats,      counter)
        if new < old:
            raise LogError(stats.timestamp, "cc %s↓  (%s → %s)" % (counter, old, new))
        return new - old

    # δcell_cc(counter) tells how specified per-cell cumulative counter changed since last stats result.
    def δcell_cc(cell, counter):
        old = _stats_cell_cc(stats_prev, cell, counter)
        new = _stats_cell_cc(stats,      cell, counter)
        if new < old:
            raise LogError(stats.timestamp, "cc C%s.%s↓  (%s → %s)" % (cell, counter, old, new))
        return new - old

    # m_initfini populates m[init] and m[fini] from vinit and vfini values.
    # copy of previous ._m[fini] is correspondingly adjusted for init/fini correction.
    p = None
    if m_prev is not None:
        p = m_prev.copy()
    def m_initfini(init, vinit, fini, vfini):
        m[init] = vinit
        m[fini] = vfini
        # take as much as possible from current fini to populate prev fini.
        # this way we expose moved fini events as appearing in previous
        # period, and, with correct values coming from xlog, will have to
        # throw-away (see below for "too much" case) as minimum as possible
        # fini events. And even though we don't know exactly how many moved fini
        # was from previous period, and how much was actually from current
        # period, tossing fini values in between those periods should not change
        # overall statistics if it is computed taking both periods into account.
        if p is not None:
            if p[fini] < p[init]:
                δ = min(p[init]-p[fini], m[fini])
                p[fini] += δ
                m[fini] -= δ
        # if we still have too much fini - throw it away pretending that it
        # came from even older uncovered period
        if m[fini] > m[init]:
            m[fini] = m[init]

    # compute δ for counters.
    # any logic error in data will be reported via LogError.
    try:
        # RRC: connection establishment
        #
        # Aggregate statistics for all cells because in E-RAB Accessibility we need
        # aggregated RRC.ConnEstab* for whole eNB. It would be more logical to emit
        # per-cell RRC statistics here and aggregate the result in KPI computation
        # routine, but for now we are not delving to rework kpi.Measurement to
        # contain per-cell values. For E-RAB Accessibility the end result is the
        # same whether we do aggregation here or in kpi.Calc.erab_accessibility().
        #
        # TODO rework to emit per-cell measurements when/if we need per-cell KPIs
        cells = set(stats['cells'].keys())  # NOTE cells are taken only from stats, not from stat_prev
        δΣcell_rrc_connection_request = 0   # (if a cell disappears its counters stop to be accounted)
        δΣcell_rrc_connection_setup_complete = 0
        for cell in cells:
            δΣcell_rrc_connection_request         += δcell_cc(cell, 'rrc_connection_request')
            δΣcell_rrc_connection_setup_complete  += δcell_cc(cell, 'rrc_connection_setup_complete')
        m_initfini(
            'RRC.ConnEstabAtt.sum',         δΣcell_rrc_connection_request,
            'RRC.ConnEstabSucc.sum',        δΣcell_rrc_connection_setup_complete)

        # S1: connection establishment
        m_initfini(
            'S1SIG.ConnEstabAtt',           δcc('s1_initial_context_setup_request'),
            'S1SIG.ConnEstabSucc',          δcc('s1_initial_context_setup_response'))

        # ERAB: Initial establishment
        # FIXME not correct if multiple ERABs are present in one message
        m_initfini(
            'ERAB.EstabInitAttNbr.sum',     δcc('s1_initial_context_setup_request'),
            'ERAB.EstabInitSuccNbr.sum',    δcc('s1_initial_context_setup_response'))

        # ERAB: Additional establishment
        # FIXME not correct if multiple ERABs are present in one message
        m_initfini(
            'ERAB.EstabAddAttNbr.sum',      δcc('s1_erab_setup_request'),
            'ERAB.EstabAddSuccNbr.sum',     δcc('s1_erab_setup_response'))

    except Exception as e:
        if not isinstance(e, LogError):
            _ = e
            e = LogError(stats.timestamp, "internal failure")
            e.__cause__ = _
        logm._estats = e
        return

    # all adjustments and checks are over.
    logm._m = m             # we can now remember our Measurement adjustments for current stats,
    if m_prev is not None:  # and commit adjustments to previous measurement, if it was there.
        m_prev.put((0,), p) # copy m_prev <- p
    return


# _stats_check verifies stats message to have required structure:
# counters/messages at the top level, and counters/messages inside every cell.
#
# Any structural problem is reported as LogError carrying stats' timestamp.
def _stats_check(stats: xlog.Message):
    try:
        stats.get1("counters", dict).get1("messages", dict)
        cells = stats.get1("cells", dict)
        for cell in cells:
            cells.get1(cell, dict).get1("counters", dict).get1("messages", dict)
    except Exception as e:
        # `from None` suppresses the original traceback chain - LogError text
        # already carries the underlying reason via %s.
        raise LogError(stats.timestamp, "stats: %s" % e)  from None
    return

# _stats_cc returns specified global cumulative counter from stats result.
#
# stats is assumed to be already verified by _stats_check.
# Counters that are not present are reported as 0.
def _stats_cc(stats: xlog.Message, counter: str):
    return stats['counters']['messages'].get(counter, 0)

# _stats_cell_cc is like _stats_cc but returns specified per-cell cumulative counter from stats result.
def _stats_cell_cc(stats: xlog.Message, cell: str, counter: str):
    cell_stats = stats['cells'].get(cell)
    # a cell missing from this stats reports all its counters as zero
    return 0 if cell_stats is None else \
           cell_stats['counters']['messages'].get(counter, 0)


# _handle_drb_stats handles next x.drb_stats xlog entry upon _read request.
#
# It pairs drb_stats with the previously seen drb_stats to find out which time
# period the data covers, and merges that data into whichever of ._m / ._m_next
# overlaps the period by at least half.
@func(LogMeasure)
def _handle_drb_stats(logm, drb_stats: xlog.Message):
    # TODO precheck for correct message structure similarly to _stats_check

    drb_stats_prev = logm._drb_stats
    logm._drb_stats = drb_stats

    # first drb_stats after an event - we don't know which time period it covers
    if drb_stats_prev is None:
        return

    assert isinstance(drb_stats_prev, xlog.Message)
    assert drb_stats_prev.message == "x.drb_stats"

    # time coverage for current drb_stats
    τ_lo = drb_stats_prev.timestamp
    τ_hi = drb_stats.timestamp
    δτ = τ_hi - τ_lo

    # see with which ._m or ._m_next, if any, drb_stats overlaps with ≥ 50% of
    # time first, and update that measurement correspondingly.
    # non-positive δτ means no forward time coverage - nothing to attribute.
    if not (δτ > 0):
        return

    if logm._m is not None:
        m_lo = logm._m['X.Tstart']
        m_hi = m_lo + logm._m['X.δT']

        # d = length of overlap between [τ_lo, τ_hi) and [m_lo, m_hi)
        d = max(0, min(τ_hi, m_hi) -
                   max(τ_lo, m_lo))
        if d >= δτ/2:  # NOTE ≥ 50%, not > 50% not to skip drb_stats if fill is exactly 50%
            _drb_update(logm._m, drb_stats)
            return

    if logm._m_next is not None:
        n_lo = logm._m_next['X.Tstart']
        # n_hi - don't know as _m_next['X.δT'] is ø yet

        # overlap is computed against open-ended [n_lo, ∞) since n_hi is unknown
        d = max(0,     τ_hi        -
                   max(τ_lo, n_lo))
        if d >= δτ/2:
            _drb_update(logm._m_next, drb_stats)
            return

# _drb_update updates Measurement from dl/ul DRB statistics related to measurement's time coverage.
def _drb_update(m: kpi.Measurement, drb_stats: xlog.Message):
    # TODO Exception -> LogError("internal failure") similarly to _handle_stats
    qci_trx = drb_stats.get1("qci_dict", dict)

    for direction in ('dl', 'ul'):
        dcap = direction.capitalize()
        qvol      = m['DRB.IPVol%s.QCI'          % dcap]
        qtime     = m['DRB.IPTime%s.QCI'         % dcap]
        qtime_err = m['XXX.DRB.IPTime%s_err.QCI' % dcap]

        # qci_dict has entries only for qci's with non-zero values. Seeing
        # drb_stats at all, however, means information for every qci is known.
        # -> pre-initialize everything to zero before filling reported entries.
        for q in (qvol, qtime, qtime_err):
            if kpi.isNA(q).all():
                q[:] = 0

        for qci_str, trx in qci_trx.items():
            qci = int(qci_str)

            # DRB.IPVol and DRB.IPTime are collected to compute throughput:
            #
            #   thp = ΣB*/ΣT*  where B* is tx'ed bytes in the sample without taking last tti into account
            #                  and   T* is time of tx also without taking that sample's tail tti.
            #
            # only ΣB (whole amount of tx), ΣT and ΣT* are known - the latter
            # two with some error. thp is thus bounded by the interval
            #
            #          ΣB            ΣB
            #         ───── ≤ thp ≤ ─────           (1)
            #         ΣT_hi         ΣT*_lo
            #
            # the upper layer in xlte.kpi computes final throughput as
            #
            #               DRB.IPVol
            #         thp = ──────────              (2)
            #               DRB.IPTime
            #
            # -> store mean and δ of ΣT_hi and ΣT*_lo into DRB.IPTime and its
            # error, which turns (2) into (1).

            # FIXME we account whole PDCP instead of only IP traffic
            ΣB      = trx['%s_tx_bytes' % direction]
            ΣT      = trx['%s_tx_time'  % direction]
            ΣT_err  = trx['%s_tx_time_err'  % direction]
            ΣTT     = trx['%s_tx_time_notailtti' % direction]
            ΣTT_err = trx['%s_tx_time_notailtti_err' % direction]

            ΣT_hi  = ΣT  + ΣT_err
            ΣTT_lo = ΣTT - ΣTT_err

            qvol[qci]      = 8*ΣB                   # bytes -> bits
            qtime[qci]     = (ΣT_hi + ΣTT_lo) / 2
            qtime_err[qci] = (ΣT_hi - ΣTT_lo) / 2


# LogError(timestamp|None, *argv).
@func(LogError)
def __init__(e, τ, *argv):
    # RuntimeError carries the message parts; .timestamp is our extension.
    super(LogError, e).__init__(*argv)
    e.timestamp = τ

# __str__ returns human-readable form.
@func(LogError)
def __str__(e):
    # "t<timestamp>: <message>", with "?" standing in for unknown timestamp.
    when = "?" if e.timestamp is None else ("%s" % e.timestamp)
    return "t%s: %s" % (when, super(LogError, e).__str__())

# LogError.EOF is special LogError value to represent EOF event.
# It is compared by identity (`estats is LogError.EOF`) in LogMeasure._read,
# where reaching it yields None instead of raising.
LogError.EOF = LogError(None, "EOF")


# ----------------------------------------

_debug = False
def _trace(*argv):
    if _debug:
        print(*argv)