Commit 71087f67 authored by Kirill Smelkov

amari.kpi: New package with driver for Amarisoft LTE stack to retrieve KPI-related measurements from logs

amari.kpi provides LogMeasure that takes enb.xlog (TODO and enb.log) as
input, and produces kpi.Measurements on output.

    enb.xlog     ─────────
    ─────────>  │   Log   │
                │         │ ────> []kpi.Measurement
    ─────────>  │ Measure │
    enb.log      ─────────

We read log data organizing periods around stats queries, and for now we
build Measurement from stats' counters. To do so we take δ(stats_prev, stat)
and process it mapping Amarisoft counters to 3GPP ones specified by
kpi.Measurement.
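The δ(stats_prev, stat) step can be illustrated with a minimal sketch (hypothetical helper, not the driver's actual code): each period's value for a cumulative counter is the difference between two adjacent snapshots, and a decrease signals corrupt input, similar to how δcc reports it in the code below.

```python
# Sketch: per-period deltas from two cumulative-counter snapshots.
def delta_counters(prev, cur):
    d = {}
    for name, new in cur.items():
        old = prev.get(name, 0)
        if new < old:
            # a cumulative counter must never decrease
            raise ValueError("counter %s decreased (%s -> %s)" % (name, old, new))
        d[name] = new - old
    return d
```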

We emit measurement X after reading stats X+2 - i.e. we emit measurement
for a period after reading data covering _next_ period. It is organized
this way to account for init/fini correction:

             fini adjust
            -------------
           '             '
     Sx    v     Sx+1    '   Sx+2
  ────|───────────|───────────|────
       Measurement Measurement
            X          X+1
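The buffering implied by this figure can be modeled as a toy generator (an illustration only, not the actual implementation): an element is released only after two further elements have been read, so its period can still be adjusted with data from the following one.

```python
# Toy model of "emit X after reading X+2": hold each item back until
# `lag` further items have been observed.
def delayed_emit(items, lag=2):
    buf = []
    for x in items:
        buf.append(x)
        if len(buf) > lag:
            yield buf.pop(0)
    for x in buf:   # at EOF the tail is flushed without further adjustment
        yield x
```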

This approach has the following limitations:

- for most of the counters there is no direct mapping between
  Amarisoft and 3GPP. For example we currently use s1_erab_setup_request for
  ERAB.EstabAddAtt.sum, but this mapping is not strictly correct and will
  break if the corresponding S1 E-RAB SETUP REQUEST message contains multiple
  ERABs. The code has corresponding FIXME marks where such approximations
  are used.

- it is not possible to implement init/fini correction precisely. From
  aggregated statistics we only get the total fini value for a
  period - without knowing which part of it corresponds to init events
  from the previous period, and which part to init events from the current one.
  With that it is only possible to make a reasonable guess and try to
  preserve statistical properties, but not more. See m_initfini in the
  code for details.

- it is possible to handle eNB with only a single cell. This limitation
  comes from the fact that in the Amarisoft LTE stack S1-related counters
  come as "global" ones, while e.g. RRC-related counters are "per-cell".
  It is thus not possible to see how many S1 connection establishments
  are associated with one particular cell if there are several of them.

TODO also parse enb.log to fix those issues.
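The fini redistribution guess described above can be sketched as follows (illustrative helper with hypothetical names, operating on plain dicts rather than kpi.Measurement): fini events are moved back into the period of their init as far as possible, and any remaining excess is discarded as belonging to an older uncovered period.

```python
# Sketch of the init/fini adjustment for two adjacent periods.
def initfini_adjust(prev, cur):
    p, m = dict(prev), dict(cur)
    # move fini events from the current period back to the previous one,
    # up to the number of previous inits still lacking a fini
    if p['fini'] < p['init']:
        d = min(p['init'] - p['fini'], m['fini'])
        p['fini'] += d
        m['fini'] -= d
    # excess fini with no matching init is attributed to an older
    # uncovered period and thrown away
    if m['fini'] > m['init']:
        m['fini'] = m['init']
    return p, m
```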
parent 0633d26f
@@ -5,5 +5,6 @@
XLTE repository provides assorted tools and packages with functionality related to LTE:
- `kpi` - process measurements and compute KPIs from them.
- `amari.kpi` - driver for Amarisoft LTE stack to retrieve KPI-related measurements from logs.
- `amari.xlog` - extra logging facilities for Amarisoft LTE stack.
- `xamari` - supplementary tool for managing Amarisoft LTE services.
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Package amari.kpi provides driver for KPI-related measurements for Amarisoft LTE stack.
Use LogMeasure to convert enb.xlog (TODO and enb.log) to Measurements.
The KPIs themselves can be computed from Measurements via package xlte.kpi .
"""
from xlte import kpi
from xlte.amari import xlog
from golang import func


# LogMeasure takes enb.xlog (TODO and enb.log) as input, and produces kpi.Measurements on output.
#
#     enb.xlog     ─────────
#     ─────────>  │   Log   │
#                 │         │ ────> []kpi.Measurement
#     ─────────>  │ Measure │
#     enb.log      ─────────
#
# Use LogMeasure(rxlog, rlog) to create it.
# Use .read() to retrieve Measurements.
# Use .close() when done.
class LogError(RuntimeError):
    # .timestamp    s | None for invalid input
    pass

class LogMeasure:
    # ._rxlog   IO reader for enb.xlog
    # ._rlog    IO reader for enb.log
    #
    # ._event   currently handled xlog.Event | LogError | None
    # ._stats   currently handled xlog.Message with last read stats result | None
    # ._m       kpi.Measurement being prepared covering [_stats_prev, _stats) | None
    pass
# LogMeasure(rxlog, rlog) creates new LogMeasure object that will read
# enb.xlog and enb.log data from IO readers rxlog and rlog.
#
# The readers must provide .readline() and .read() methods.
# The ownership of rxlog and rlog is transferred to LogMeasure.
@func(LogMeasure)
def __init__(logm, rxlog, rlog):
    logm._rxlog = xlog.Reader(rxlog)
    logm._rlog  = rlog
    logm._event = None
    logm._stats = None
    logm._m     = None

# close releases resources associated with LogMeasure and closes underlying readers.
@func(LogMeasure)
def close(logm):
    logm._rxlog.close()
    logm._rlog .close()

# read retrieves and returns next Measurement or None at EOF.
#
# It reads data from enb.xlog (TODO and enb.log) as needed.
@func(LogMeasure)
def read(logm):  # -> kpi.Measurement | None
    m = logm._read()
    _trace(' <-', m)
    return m
@func(LogMeasure)
def _read(logm):
    # read log data organizing periods around stats queries.
    #
    # we emit measurement X after reading stats X+2 - i.e. we emit measurement
    # for a period after reading data covering _next_ period. It is organized
    # this way to account for init/fini correction(*):
    #
    #              fini adjust
    #             -------------
    #            '             '
    #      Sx    v     Sx+1    '   Sx+2
    #   ────|───────────|───────────|────
    #        Measurement Measurement
    #             X          X+1
    #
    # (*) see kpi.Measurement documentation for more details about init/fini correction.
    while 1:
        _trace()
        _trace('._event:\t', logm._event)
        _trace('._stats:\t', logm._stats)
        _trace('._m:    \t', logm._m)

        # flush the queue fully at an error or an event, e.g. at "service detach".
        event = logm._event
        if event is not None:
            # <- M for [stats_prev, stats)
            m = logm._m
            if m is not None:
                logm._m = None
                return m

            # <- M(ø) for [stats, event)
            stats = logm._stats
            if stats is not None:
                logm._stats = None
                if event.timestamp is not None:
                    m = kpi.Measurement()
                    m['X.Tstart'] = stats.timestamp
                    m['X.δT']     = event.timestamp - stats.timestamp
                    return m

            # <- error|EOF
            if isinstance(event, LogError):
                logm._event = None
                if event is LogError.EOF:
                    return None
                raise event

            # queue should be fully flushed now
            assert logm._stats is None
            assert logm._m     is None
            # event might remain non-none, e.g. "service detach", but not an error
            assert isinstance(event, xlog.Event)

        # fetch next entry from xlog
        try:
            x = logm._rxlog.read()
        except Exception as e:
            x = LogError(None, str(e))  # e.g. it was xlog.ParseError(...)
        _trace(' xlog:', x)

        if x is None:
            x = LogError.EOF  # represent EOF as LogError

        if isinstance(x, LogError):
            logm._event = x  # it is ok to forget previous event after e.g. bad line with ParseError
            continue         # flush the queue

        elif isinstance(x, xlog.Event):
            event_prev  = logm._event
            logm._event = x
            if event_prev is None:
                continue  # flush
            # <- M(ø) for [event_prev, event)
            assert event_prev.timestamp is not None  # LogErrors are raised after queue flush
            m = kpi.Measurement()
            m['X.Tstart'] = event_prev.timestamp
            m['X.δT']     = x.timestamp - event_prev.timestamp
            return m

        assert isinstance(x, xlog.Message)
        if x.message != "stats":
            continue

        m = logm._read_stats(x)
        if m is not None:
            return m
        continue
# _read_stats handles next stats xlog entry upon _read request.
@func(LogMeasure)
def _read_stats(logm, stats: xlog.Message):  # -> kpi.Measurement | None (to retry)
    # build Measurement from stats' counters.
    #
    # we take δ(stats_prev, stat) and process it mapping Amarisoft counters to
    # 3GPP ones specified by kpi.Measurement. This approach has the following limitations:
    #
    # - for most of the counters there is no direct mapping between
    #   Amarisoft and 3GPP. For example we currently use s1_erab_setup_request for
    #   ERAB.EstabAddAtt.sum, but this mapping is not strictly correct and will
    #   break if the corresponding S1 E-RAB SETUP REQUEST message contains multiple
    #   ERABs. The code has corresponding FIXME marks where such approximations
    #   are used.
    #
    # - it is not possible to implement init/fini correction precisely. From
    #   aggregated statistics we only get the total fini value for a
    #   period - without knowing which part of it corresponds to init events
    #   from the previous period, and which part to init events from the current one.
    #   With that it is only possible to make a reasonable guess and try to
    #   preserve statistical properties, but not more. See m_initfini below for
    #   details.
    #
    # - it is possible to handle eNB with only a single cell. This limitation
    #   comes from the fact that in the Amarisoft LTE stack S1-related counters
    #   come as "global" ones, while e.g. RRC-related counters are "per-cell".
    #   It is thus not possible to see how many S1 connection establishments
    #   are associated with one particular cell if there are several of them.
    #
    # TODO also parse enb.log to fix those issues.

    # check if new stats follows required structure.
    # handle it as an error event if it is not.
    try:
        _stats_check(stats)
    except LogError as e:
        event_prev = logm._event
        logm._event = e
        if event_prev is not None:
            # <- M(ø) for [event, bad_stats)
            m = kpi.Measurement()
            m['X.Tstart'] = event_prev.timestamp
            m['X.δT']     = stats.timestamp - event_prev.timestamp
            return m
        return None  # flush

    # stats is pre-checked to be good. push it to the queue.
    stats_prev  = logm._stats
    logm._stats = stats

    # first stats after service attach -> M(ø)
    if stats_prev is None:
        event_prev = logm._event
        if event_prev is not None:
            # <- M(ø) for [event, stats)
            logm._event = None
            m = kpi.Measurement()
            m['X.Tstart'] = event_prev.timestamp
            m['X.δT']     = stats.timestamp - event_prev.timestamp
            return m
        return None

    # we have 2 adjacent stats. Start building new Measurement from their δ.
    # do init/fini correction if there was also third preceding stats message.
    m = kpi.Measurement()  # [stats_prev, stats)
    m['X.Tstart'] = stats_prev.timestamp
    m['X.δT']     = stats.timestamp - stats_prev.timestamp

    # δcc(counter) tells how specified cumulative counter changed since last stats result.
    def δcc(counter):
        old = _stats_cc(stats_prev, counter)
        new = _stats_cc(stats,      counter)
        if new < old:
            raise LogError(stats.timestamp, "cc %s↓ (%s → %s)" % (counter, old, new))
        return new - old

    # m_initfini populates m[init] and m[fini] from vinit and vfini values.
    # copy of previous ._m[fini] is correspondingly adjusted for init/fini correction.
    p = None
    if logm._m is not None:
        p = logm._m.copy()
    def m_initfini(init, vinit, fini, vfini):
        m[init] = vinit
        m[fini] = vfini
        # take as much as possible from current fini to populate prev fini.
        # this way we expose moved fini events as appearing in previous
        # period, and, with correct values coming from xlog, will have to
        # throw away (see below for "too much" case) as few as possible
        # fini events. And even though we don't know exactly how many moved fini
        # were from the previous period, and how many were actually from the current
        # period, tossing fini values in between those periods should not change
        # overall statistics if it is computed taking both periods into account.
        if p is not None:
            if p[fini] < p[init]:
                δ = min(p[init]-p[fini], m[fini])
                p[fini] += δ
                m[fini] -= δ
        # if we still have too much fini - throw it away pretending that it
        # came from even older uncovered period
        if m[fini] > m[init]:
            m[fini] = m[init]

    # compute δ for counters.
    # any logic error in data will be reported via LogError.
    try:
        # RRC: connection establishment
        m_initfini(
            'RRC.ConnEstabAtt.sum',      δcc('rrc_connection_request'),
            'RRC.ConnEstabSucc.sum',     δcc('rrc_connection_setup_complete'))
        # S1: connection establishment
        m_initfini(
            'S1SIG.ConnEstabAtt',        δcc('s1_initial_context_setup_request'),
            'S1SIG.ConnEstabSucc',       δcc('s1_initial_context_setup_response'))
        # ERAB: Initial establishment
        # FIXME not correct if multiple ERABs are present in one message
        m_initfini(
            'ERAB.EstabInitAttNbr.sum',  δcc('s1_initial_context_setup_request'),
            'ERAB.EstabInitSuccNbr.sum', δcc('s1_initial_context_setup_response'))
        # ERAB: Additional establishment
        # FIXME not correct if multiple ERABs are present in one message
        m_initfini(
            'ERAB.EstabAddAttNbr.sum',   δcc('s1_erab_setup_request'),
            'ERAB.EstabAddSuccNbr.sum',  δcc('s1_erab_setup_response'))
    except Exception as e:
        if not isinstance(e, LogError):
            _ = e
            e = LogError(stats.timestamp, "internal failure")
            e.__cause__ = _
        logm._stats = None
        logm._event = e
        return None

    # all adjustments and checks are over.
    logm._m = m  # we can now remember pre-built Measurement for current stats,
    return p     # and return adjusted previous measurement, if it was there.
# _stats_check verifies stats message to have required structure.
#
# only configurations with one single cell are supported.
# ( because else it would not be clear to which cell to associate e.g. global
#   counters for S1 messages )
def _stats_check(stats: xlog.Message):
    cells = stats['cells']
    if len(cells) != 1:
        raise LogError(stats.timestamp, "stats describes %d cells; but only single-cell configurations are supported" % len(cells))
    cellname = list(cells.keys())[0]

    try:
        stats.get1("counters", dict).get1("messages", dict)
        stats.get1("cells", dict).get1(cellname, dict).get1("counters", dict).get1("messages", dict)
    except Exception as e:
        raise LogError(stats.timestamp, "stats: %s" % e) from None
    return

# _stats_cc returns specified cumulative counter from stats result.
#
# counter may be either "global" or "per-cell".
# stats is assumed to be already verified by _stats_check.
def _stats_cc(stats: xlog.Message, counter: str):
    cells = stats['cells']
    cell = list(cells.values())[0]

    if counter.startswith("rrc_"):
        cc_dict = cell ['counters']
    else:
        cc_dict = stats['counters']

    return cc_dict['messages'].get(counter, 0)

# LogError(timestamp|None, *argv).
@func(LogError)
def __init__(e, τ, *argv):
    e.timestamp = τ
    super(LogError, e).__init__(*argv)

# __str__ returns human-readable form.
@func(LogError)
def __str__(e):
    t = "?"
    if e.timestamp is not None:
        t = "%s" % e.timestamp
    return "t%s: %s" % (t, super(LogError, e).__str__())

# LogError.EOF is special LogError value to represent EOF event.
LogError.EOF = LogError(None, "EOF")

# ----------------------------------------

_debug = False
def _trace(*argv):
    if _debug:
        print(*argv)
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from xlte.amari.kpi import LogMeasure, LogError, _trace as trace
from xlte.kpi import Measurement
from golang import func, defer, b
import io, json
from pytest import raises
# tLogMeasure provides LogMeasure testing environment.
#
# It organizes IO streams for enb.xlog and (TODO) enb.log, and connects those
# streams to LogMeasure. It then allows to verify Measurements read from LogMeasure.
#
# Use .xlog() to append a line to enb.xlog.
# Use .expect*() to assert that the next read Measurement should have particular values.
# Use .read() to read next measurement and to verify it to match previous expect* calls.
#
# tLogMeasure must be explicitly closed once no longer used.
class tLogMeasure:
    # ._fxlog   IO stream with enb.xlog data
    # ._logm    LogMeasure(._fxlog)
    # ._mok     None|Measurement built via expect* calls

    def __init__(t):
        t._fxlog = io.BytesIO(b"""\
{"meta": {"event": "start", "time": 0.01, "generator": "xlog ws://localhost:9001 stats[]/30.0s"}}
{"meta": {"event": "service attach", "time": 0.02, "srv_name": "ENB", "srv_type": "ENB", "srv_version": "2022-12-01"}}
""")
        t._logm = LogMeasure(t._fxlog, ionone())
        t._mok = None

    # close performs last scheduled checks and makes sure further reading from
    # LogMeasure results in EOF. It then closes it.
    @func
    def close(t):
        defer(t._logm.close)
        t.read()            # verify last expects, if any
        for i in range(10): # now should get EOF
            assert t._mok == None
            t.read()

    # xlog appends one line to enb.xlog.
    def xlog(t, line):
        line = b(line)
        assert b'\n' not in line
        pos = t._fxlog.tell()
        t._fxlog.seek(0, io.SEEK_END)
        t._fxlog.write(b'%s\n' % line)
        t._fxlog.seek(pos, io.SEEK_SET)

    # _mok_init reinitializes ._mok with Measurement defaults.
    def _mok_init(t):
        t._mok = Measurement()
        # init fields handled by amari.kpi to 0
        # this will be default values to verify against
        for field in (
            'RRC.ConnEstabAtt.sum',
            'RRC.ConnEstabSucc.sum',
            'S1SIG.ConnEstabAtt',
            'S1SIG.ConnEstabSucc',
            'ERAB.EstabInitAttNbr.sum',
            'ERAB.EstabInitSuccNbr.sum',
            'ERAB.EstabAddAttNbr.sum',
            'ERAB.EstabAddSuccNbr.sum',
        ):
            t._mok[field] = 0

    # expect1 requests to verify one field to have expected value.
    # the verification itself will happen on next read() call.
    def expect1(t, field, vok):
        if t._mok is None:
            t._mok_init()
        t._mok[field] = vok

    # expect_nodata requests to verify all fields besides timestamp-related to be NA.
    def expect_nodata(t):
        if t._mok is None:
            t._mok = Measurement()
        tstart = t._mok['X.Tstart']
        δt     = t._mok['X.δT']
        t._mok = Measurement()  # reinit with all NA
        t._mok['X.Tstart'] = tstart
        t._mok['X.δT']     = δt

    # read retrieves next measurement from LogMeasure and verifies it to be as expected.
    def read(t):  # -> Measurement
        try:
            m = t._logm.read()
            assert type(m) is type(t._mok)
            assert m == t._mok
            return m
        finally:
            t._mok = None
# verify LogMeasure works ok on normal input.
@func
def test_LogMeasure():
    t = tLogMeasure()
    defer(t.close)
    _ = t.expect1

    # empty stats after first attach
    t.xlog( jstats(0.7, {}) )
    _('X.Tstart', 0.02)
    _('X.δT', 0.7-0.02)
    t.expect_nodata()
    t.read()

    # further empty stats
    t.xlog( jstats(1.0, {}) )
    _('X.Tstart', 0.7)
    _('X.δT', 1-0.7)
    _('RRC.ConnEstabAtt.sum', 0)
    _('RRC.ConnEstabSucc.sum', 0)
    _('S1SIG.ConnEstabAtt', 0)
    _('S1SIG.ConnEstabSucc', 0)
    _('ERAB.EstabInitAttNbr.sum', 0)
    _('ERAB.EstabInitSuccNbr.sum', 0)
    _('ERAB.EstabAddAttNbr.sum', 0)
    _('ERAB.EstabAddSuccNbr.sum', 0)
    # tstats is the verb to check handling of stats message.
    #
    # it xlogs next stats(counters) and reads back new measurement via t.read().
    #
    # NOTE t.read goes 2 steps behind corresponding t.xlog call. This is on
    # purpose to sync emitting xlog entries with corresponding checks in test
    # code, because, as illustrated on the following figure, Measurement₁ is
    # emitted only after xlog₃ entry becomes available:
    #
    #   xlog₁   xlog₂   xlog₃
    # ──|───────|───────|─────
    #   |Measure|Measure|
    #   | ment₁ | ment₂ |
    #
    # As the result it allows to write testing code as:
    #
    #   tstats(counters)
    #   _(...)    # verify effect on Measurements returned with period
    #   _(...)    # ending by timestamp of the above stats call.
    #   _(...)    # i.e. Measurement₁ if tstats call corresponds to xlog₂.
    τ_xlog = 1           # timestamp of last emitted xlog entry
    τ_logm = τ_xlog-2+1  # timestamp of next measurement to be read from logm
    counters_prev = {}
    def tstats(counters):
        nonlocal τ_xlog, τ_logm, counters_prev
        trace('\n>>> tstats τ_xlog: %s τ_logm: %s' % (τ_xlog, τ_logm))
        t.xlog( jstats(τ_xlog+1, counters) )  # xlog τ+1
        t.read()                              # read+assert M for τ-1
        _('X.Tstart', τ_logm+1)               # start preparing next expected M at τ
        _('X.δT', 1)
        τ_xlog += 1
        τ_logm += 1
        counters_prev = counters

    # tδstats is like tstats but takes δ for counters.
    def tδstats(δcounters):
        counters = counters_prev.copy()
        for k, δv in δcounters.items():
            counters[k] = counters.get(k, 0) + δv
        tstats(counters)

    # tevent is the verb to verify handling of events.
    # its logic is similar to tstats.
    def tevent(event):
        nonlocal τ_xlog, τ_logm, counters_prev
        trace('\n>>> tevent τ_xlog: %s τ_logm: %s' % (τ_xlog, τ_logm))
        t.xlog(json.dumps({"meta": {"event": event, "time": τ_xlog+1}}))
        t.read()
        _('X.Tstart', τ_logm+1)
        _('X.δT', 1)
        τ_xlog += 1
        τ_logm += 1
        counters_prev = {}  # reset
    # RRC.ConnEstab
    #
    # For init/fini correction LogMeasure accounts termination events in the
    # period of corresponding initiation event. We check this in detail for
    # RRC.ConnEstab, but only lightly for other measurements. This should be ok
    # since LogMeasure internally uses the same m_initfini function for
    # init/fini correcting all values.
    #
    #    ₁  p1  ₂  p2  ₃  p3  ₄  p4  ₅  p5  ₆
    #  ──|──────|─────|─────|─────|─────|─────
    #  init     0     3     2     5     0
    #  fini  ø ←─── 2 1←─── 2←─── 4←─── 1
    #  fini'    0     3 ²   2 ²   3 ¹   0
    tstats({'rrc_connection_request':        0,
            'rrc_connection_setup_complete': 2})  # completions for previous uncovered period
    _('RRC.ConnEstabAtt.sum',  0)
    _('RRC.ConnEstabSucc.sum', 0)  # not 2

    # p2
    tstats({'rrc_connection_request':        0 +3,   # 3 new initiations
            'rrc_connection_setup_complete': 2 +1})  # 1 new completion
    _('RRC.ConnEstabAtt.sum',  3)
    _('RRC.ConnEstabSucc.sum', 3)  # not 1

    # p3
    tstats({'rrc_connection_request':        0+3 +2,   # 2 new initiations
            'rrc_connection_setup_complete': 2+1 +2})  # 2 completions for p2
    _('RRC.ConnEstabAtt.sum',  2)
    _('RRC.ConnEstabSucc.sum', 2)  # 2, but it is 2 - 2(for_p2) + 2(from_p4)

    # p4
    tstats({'rrc_connection_request':        0+3+2 +5,   # 5 new initiations
            'rrc_connection_setup_complete': 2+1+2 +4})  # 2 completions for p3 + 2 new
    _('RRC.ConnEstabAtt.sum',  5)
    _('RRC.ConnEstabSucc.sum', 3)

    # p5
    tstats({'rrc_connection_request':        0+3+2+5 +0,   # no new initiations
            'rrc_connection_setup_complete': 2+1+2+4 +1})  # 1 completion for p4
    _('RRC.ConnEstabAtt.sum',  0)
    _('RRC.ConnEstabSucc.sum', 0)

    # S1SIG.ConnEstab, ERAB.InitEstab
    tδstats({'s1_initial_context_setup_request':  +3,
             's1_initial_context_setup_response': +2})
    _('S1SIG.ConnEstabAtt',        3)
    _('S1SIG.ConnEstabSucc',       3)  # 2 + 1(from_next)
    _('ERAB.EstabInitAttNbr.sum',  3)  # currently same as S1SIG.ConnEstab
    _('ERAB.EstabInitSuccNbr.sum', 3)  # ----//----

    tδstats({'s1_initial_context_setup_request':  +4,
             's1_initial_context_setup_response': +3})
    _('S1SIG.ConnEstabAtt',        4)
    _('S1SIG.ConnEstabSucc',       2)  # 3 - 1(to_prev)
    _('ERAB.EstabInitAttNbr.sum',  4)  # currently same as S1SIG.ConnEstab
    _('ERAB.EstabInitSuccNbr.sum', 2)  # ----//----

    # ERAB.EstabAdd
    tδstats({'s1_erab_setup_request':  +1,
             's1_erab_setup_response': +1})
    _('ERAB.EstabAddAttNbr.sum',  1)
    _('ERAB.EstabAddSuccNbr.sum', 1)

    tδstats({'s1_erab_setup_request':  +3,
             's1_erab_setup_response': +2})
    _('ERAB.EstabAddAttNbr.sum',  3)
    _('ERAB.EstabAddSuccNbr.sum', 2)

    # service detach/attach, connect failure, xlog failure
    tδstats({})  # untie from previous history
    i, f = 'rrc_connection_request', 'rrc_connection_setup_complete'
    I, F = 'RRC.ConnEstabAtt.sum',   'RRC.ConnEstabSucc.sum'

    tδstats({i:2, f:1})
    _(I, 2)
    _(F, 2)  # +1(from_next)

    tδstats({i:2, f:2})
    _(I, 2)
    _(F, 1)  # -1(to_prev)

    tevent("service detach")
    t.expect_nodata()

    t.read()                 # LogMeasure flushes its queue on "service detach".
    _('X.Tstart', τ_logm+1)  # After the flush t.read will need to go only 1 step behind
    _('X.δT', 1)             # corresponding t.xlog call instead of previously going 2 steps behind.
    t.expect_nodata()        # Do one t.read step manually to catch up.
    τ_logm += 1

    tevent("service connect failure")
    t.expect_nodata()
    tevent("service connect failure")
    t.expect_nodata()

    tevent("xlog failure")
    t.expect_nodata()
    tevent("xlog failure")
    t.expect_nodata()

    tevent("service attach")
    t.expect_nodata()
    t.xlog( jstats(τ_xlog+1, {i:1000, f:1000}) )  # LogMeasure restarts the queue after data starts to
    τ_xlog += 1                                   # come in again. Do one t.xlog step manually to
                                                  # increase t.read - t.xlog distance back to 2.
    tstats({i:1000+2, f:1000+2})
    _(I, 2)  # no "extra" events even if counters start with jumped values after reattach
    _(F, 2)  # and no fini correction going back through detach

    tevent("service detach")  # detach right after attach
    t.expect_nodata()
    tevent("service attach")
    t.expect_nodata()
    tevent("service detach")
    t.expect_nodata()
# verify that only stats with single cell and expected structure are accepted.
@func
def test_LogMeasure_badinput():
    t = tLogMeasure()
    defer(t.close)
    _ = t.expect1

    cc = 'rrc_connection_request'
    CC = 'RRC.ConnEstabAtt.sum'

    # initial ok entries
    t.xlog( jstats(1, {}) )
    t.xlog( jstats(2, {cc: 2}) )
    t.xlog( jstats(3, {cc: 2+3}) )
    # bad: not single cell
    t.xlog('{"message":"stats", "utc":11, "cells": {}}')
    t.xlog('{"message":"stats", "utc":12, "cells": {}}')
    t.xlog('{"message":"stats", "utc":13, "cells": {"a": {}, "b": {}}}')
    t.xlog('{"message":"stats", "utc":14, "cells": {"a": {}, "b": {}, "c": {}}}')
    # bad: no counters
    t.xlog('{"message":"stats", "utc":21, "counters": {"messages": {}}, "cells": {"1": {}}}')
    t.xlog('{"message":"stats", "utc":22, "counters": {"messages": {}}, "cells": {"1": {"counters": {}}}}')
    t.xlog('{"message":"stats", "utc":23, "cells": {"1": {"counters": {"messages": {}}}}}')
    t.xlog('{"message":"stats", "utc":24, "counters": {}, "cells": {"1": {"counters": {"messages": {}}}}}')
    # follow-up ok entries
    t.xlog( jstats(31, {cc: 30+4}) )
    t.xlog( jstats(32, {cc: 30+4+5}) )
    # badline 1
    t.xlog( "zzzqqqrrr" )
    # more ok entries
    t.xlog( jstats(41, {cc: 40+6}) )
    t.xlog( jstats(42, {cc: 40+6+7}) )
    # badline 2 + followup event
    t.xlog( "hello world" )
    t.xlog( '{"meta": {"event": "service attach", "time": 50}}' )
    # more ok entries
    t.xlog( jstats(51, {cc: 50+8}) )
    t.xlog( jstats(52, {cc: 50+8+9}) )

    def readok(τ, CC_value):
        _('X.Tstart', τ)
        _('X.δT', 1)
        _(CC, CC_value)
        t.read()

    def read_nodata(τ, δτ=1):
        _('X.Tstart', τ)
        _('X.δT', δτ)
        t.expect_nodata()
        t.read()

    read_nodata(0.02, 0.98)  # attach-1
    readok(1, 2)             # 1-2
    readok(2, 3)             # 2-3
    read_nodata(3, 8)        # 3-11

    def tbadcell(τ, ncell):
        with raises(LogError, match="t%s: stats describes %d cells;" % (τ, ncell) +
                                    " but only single-cell configurations are supported"):
            t.read()
    tbadcell(11, 0)
    tbadcell(12, 0)
    tbadcell(13, 2)
    tbadcell(14, 3)

    def tbadstats(τ, error):
        with raises(LogError, match="t%s: stats: %s" % (τ, error)):
            t.read()
    tbadstats(21, ":10/cells/1 no `counters`")
    tbadstats(22, ":11/cells/1/counters no `messages`")
    tbadstats(23, ":12/ no `counters`")
    tbadstats(24, ":13/counters no `messages`")

    readok(31, 5)  # 31-32

    def tbadline():
        with raises(LogError, match="t?: invalid json"):
            t.read()
    tbadline()       # badline 1
    readok(41, 7)    # 41-42
    tbadline()       # badline 2
    read_nodata(50)  # 50-51
    readok(51, 9)    # 51-52
# verify that counter wrap-arounds are reported as errors.
@func
def test_LogMeasure_cc_wraparound():
    t = tLogMeasure()
    defer(t.close)
    _ = t.expect1

    cc = 'rrc_connection_request'
    CC = 'RRC.ConnEstabAtt.sum'

    t.xlog( jstats(1, {}) )
    t.xlog( jstats(2, {cc: 13}) )
    t.xlog( jstats(3, {cc: 12}) )   # cc↓  - should be reported
    t.xlog( jstats(4, {cc: 140}) )  # cc↑↑ - should start afresh
    t.xlog( jstats(5, {cc: 150}) )

    def readok(τ, CC_value):
        _('X.Tstart', τ)
        _('X.δT', 1)
        _(CC, CC_value)
        t.read()

    _('X.Tstart', 0.02)  # attach-1
    _('X.δT', 0.98)
    t.expect_nodata()
    t.read()

    readok(1, 13)  # 1-2
    with raises(LogError, match=r"t3: cc %s↓ \(13 → 12\)" % cc):
        t.read()   # 2-3
    readok(4, 10)  # 4-5
# jstats returns json-encoded stats message corresponding to counters dict.
#
# τ goes directly to stats['utc'] as is.
def jstats(τ, counters):  # -> str
    g_cc    = {}  # global
    cell_cc = {}  # per-cell
    for cc, value in counters.items():
        if cc.startswith("rrc_"):
            cell_cc[cc] = value
        else:
            g_cc[cc] = value

    s = {
        "message":  "stats",
        "utc":      τ,
        "cells":    {"1": {"counters": {"messages": cell_cc}}},
        "counters": {"messages": g_cc},
    }
    return json.dumps(s)

def test_jstats():
    assert jstats(0, {}) == '{"message": "stats", "utc": 0, "cells": {"1": {"counters": {"messages": {}}}}, "counters": {"messages": {}}}'
    assert jstats(123.4, {"rrc_x": 1, "s1_y": 2, "rrc_z": 3, "x2_zz": 4}) == \
        '{"message": "stats", "utc": 123.4, "cells": {"1": {"counters": {"messages": {"rrc_x": 1, "rrc_z": 3}}}}, "counters": {"messages": {"s1_y": 2, "x2_zz": 4}}}'

# ionone returns empty data source.
def ionone():
    return io.BytesIO(b'')