Commit fd7870f4 authored by Kirill Smelkov

amari.kpi: Rework LogMeasure to prepare Measurement incrementally

We added LogMeasure in 71087f67 (amari.kpi: New package with driver for
Amarisoft LTE stack to retrieve KPI-related measurements from logs) and
its original logic is to read `stats` messages and to create Measurement
that covers [Sx, Sx+1) only after seeing Sx+1.

However, in the next patch we will also need to take into account other
smaller messages besides stats, and for those messages we need the
being-prepared Measurement to already exist to be able to amend it with
the partial data we see. So we need to rework the process to create the
Measurement that will cover [Sx, Sx+1) right after seeing Sx, without
waiting for Sx+1 to come in.

This patch does that.

Along the way it unifies how events and stats are handled. Previously
events and stats were handled via different objects and the code had many
scattered places that tried to handle cases like event-event,
event-stats, stats-event and stats-stats. And for all those cases the
intent was that we still want to emit a corresponding Measurement for all
of them, even if it carries only NA data besides timestamps. Thus it does
not make sense to split events and stats into different flows - as we can
handle all combinations by considering just one flow of "stats or
events". This simplifies logic and removes several sporadic branches
of code to emit M(ø) around events. It also discovers several places
where we were not emitting such M(ø) even though the intent was to do
so. All this is fixed now with updated tests.
parent 5bf7dc1c
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors. # Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your # it under the terms of the GNU General Public License version 3, or (at your
...@@ -46,9 +46,11 @@ class LogMeasure: ...@@ -46,9 +46,11 @@ class LogMeasure:
# ._rxlog IO reader for enb.xlog # ._rxlog IO reader for enb.xlog
# ._rlog IO reader for enb.log # ._rlog IO reader for enb.log
# #
# ._event currently handled xlog.Event | LogError | None # ._estats \/ last xlog.Message with read stats result
# ._stats currently handled xlog.Message with last read stats result | None # \/ last xlog.Event | LogError
# ._m kpi.Measurement being prepared covering [_stats_prev, _stats) | None # \/ None
# ._m kpi.Measurement being prepared covering [_estats_prev, _estats) | None
# ._m_next kpi.Measurement being prepared covering [_estats, _estats_next) | None
pass pass
...@@ -61,9 +63,9 @@ class LogMeasure: ...@@ -61,9 +63,9 @@ class LogMeasure:
def __init__(logm, rxlog, rlog): def __init__(logm, rxlog, rlog):
logm._rxlog = xlog.Reader(rxlog) logm._rxlog = xlog.Reader(rxlog)
logm._rlog = rlog logm._rlog = rlog
logm._event = None logm._estats = None
logm._stats = None
logm._m = None logm._m = None
logm._m_next = None
# close releases resources associated with LogMeasure and closes underlying readers. # close releases resources associated with LogMeasure and closes underlying readers.
@func(LogMeasure) @func(LogMeasure)
...@@ -77,6 +79,7 @@ def close(logm): ...@@ -77,6 +79,7 @@ def close(logm):
# It reads data from enb.xlog (TODO and enb.log) as needed. # It reads data from enb.xlog (TODO and enb.log) as needed.
@func(LogMeasure) @func(LogMeasure)
def read(logm): # -> kpi.Measurement | None def read(logm): # -> kpi.Measurement | None
_trace('\n\n LogMeasure.read')
m = logm._read() m = logm._read()
_trace(' <-', m) _trace(' <-', m)
return m return m
...@@ -99,41 +102,41 @@ def _read(logm): ...@@ -99,41 +102,41 @@ def _read(logm):
# #
# #
# (*) see kpi.Measurement documentation for more details about init/fini correction. # (*) see kpi.Measurement documentation for more details about init/fini correction.
m = None # kpi.Measurement to return
while 1: while 1:
_trace() _trace()
_trace('._event:\t', logm._event) _trace('m: \t', m)
_trace('._stats:\t', logm._stats) _trace('._m: \t', logm._m)
_trace('._m: \t', logm._m) _trace('._estats:\t', logm._estats)
_trace('._m_next:\t', logm._m_next)
# flush the queue fully at an error or an event, e.g. at "service detach".
event = logm._event if m is not None:
if event is not None: return m
# <- M for [stats_prev, stats)
# flush the queue at an error or an event, e.g. at "service detach".
estats = logm._estats
if isinstance(estats, (xlog.Event, LogError)):
# <- M for [estats_prev, estats)
m = logm._m m = logm._m
if m is not None: if m is not None:
logm._m = None logm._m = None
return m return m
# <- M(ø) for [stats, event) # note ._m_next is not flushed:
stats = logm._stats # if ._m_next != None - it remains initialized with X.Tstart = estats.timestamp
if stats is not None:
logm._stats = None
if event.timestamp is not None:
m = kpi.Measurement()
m['X.Tstart'] = stats.timestamp
m['X.δT'] = event.timestamp - stats.timestamp
return m
# <- error|EOF # <- error|EOF
if isinstance(event, LogError): if isinstance(estats, LogError):
logm._event = None logm._estats = None
if event is LogError.EOF: if estats is LogError.EOF:
return None return None
raise event raise estats
# queue should be fully flushed now # queue should be flushed now till including estats with
assert logm._stats is None # event remaining non-none, e.g. "service detach", but not an error
assert logm._m is None assert logm._m is None
# event might remain non-none, e.g. "service detach", but not an error assert isinstance(logm._estats, xlog.Event)
assert isinstance(event, xlog.Event) assert isinstance(logm._m_next, kpi.Measurement)
assert logm._m_next['X.Tstart'] == logm._estats.timestamp
# fetch next entry from xlog # fetch next entry from xlog
...@@ -145,35 +148,49 @@ def _read(logm): ...@@ -145,35 +148,49 @@ def _read(logm):
if x is None: if x is None:
x = LogError.EOF # represent EOF as LogError x = LogError.EOF # represent EOF as LogError
if isinstance(x, LogError):
logm._event = x # it is ok to forget previous event after e.g. bad line with ParseError
continue # flush the queue
elif isinstance(x, xlog.Event):
event_prev = logm._event
logm._event = x
if event_prev is None:
continue # flush
# <- M(ø) for [event_prev, event)
assert event_prev.timestamp is not None # LogErrors are raised after queue flush
m = kpi.Measurement()
m['X.Tstart'] = event_prev.timestamp
m['X.δT'] = x.timestamp - event_prev.timestamp
return m
assert isinstance(x, xlog.Message) # handle messages that update current Measurement
if x.message != "stats": if isinstance(x, xlog.Message):
continue if x.message != "stats":
continue # ignore other messages
# it is an error, event or stats.
# if it is an event or stats -> finalize timestamp for _m_next.
# start building next _m_next covering [x, x_next).
# shift m <- ._m <- ._m_next <- (new Measurement | None for LogError)
# a LogError throws away preceding Measurement and does not start a new one after it
if logm._m_next is not None:
if not isinstance(x, LogError):
logm._m_next['X.δT'] = x.timestamp - logm._m_next['X.Tstart']
else:
logm._m_next = None # throw it away on seeing e.g. "stats, error"
m = logm._m
logm._m = logm._m_next
if not isinstance(x, LogError):
logm._m_next = kpi.Measurement()
logm._m_next['X.Tstart'] = x.timestamp # note X.δT remains NA until next stats|event
else:
logm._m_next = None
if isinstance(x, (xlog.Event, LogError)):
logm._estats = x # it is ok to forget previous event after e.g. bad line with ParseError
continue # flush the queue
m = logm._read_stats(x) assert isinstance(x, xlog.Message)
if m is not None: assert x.message == "stats"
return m logm._handle_stats(x, m)
# NOTE _handle_stats indicates logic error in x by setting ._estats to
# LogError instead of stats. However those LogErrors come with
# timestamp and are thus treated similarly to events: we do not throw
# away either ._m or ._m_next like we do with LogErrors that
# represent errors at the log parsing level.
continue continue
# _read_stats handles next stats xlog entry upon _read request.
# _handle_stats handles next stats xlog entry upon _read request.
@func(LogMeasure) @func(LogMeasure)
def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry) def _handle_stats(logm, stats: xlog.Message, m_prev: kpi.Measurement):
# build Measurement from stats' counters. # build Measurement from stats' counters.
# #
# we take δ(stats_prev, stat) and process it mapping Amarisoft counters to # we take δ(stats_prev, stat) and process it mapping Amarisoft counters to
...@@ -207,37 +224,26 @@ def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry) ...@@ -207,37 +224,26 @@ def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry)
try: try:
_stats_check(stats) _stats_check(stats)
except LogError as e: except LogError as e:
event_prev = logm._event logm._estats = e # stays M(ø) for [estats_prev, bad_stats)
logm._event = e return
if event_prev is not None:
# <- M(ø) for [event, bad_stats)
m = kpi.Measurement()
m['X.Tstart'] = event_prev.timestamp
m['X.δT'] = stats.timestamp - event_prev.timestamp
return m
return None # flush
# stats is pre-checked to be good. push it to the queue. # stats is pre-checked to be good. push it to the queue.
stats_prev = logm._stats estats_prev = logm._estats
logm._stats = stats logm._estats = stats
# first stats after service attach -> M(ø) # first stats after e.g. service attach -> stays M(ø) for [event_prev, stats)
if stats_prev is None: if estats_prev is None:
event_prev = logm._event return
if event_prev is not None: if isinstance(estats_prev, (xlog.Event, LogError)):
# <- M(ø) for [event, stats) return
logm._event = None
m = kpi.Measurement()
m['X.Tstart'] = event_prev.timestamp
m['X.δT'] = stats.timestamp - event_prev.timestamp
return m
return None
# we have 2 adjacent stats. Start building new Measurement from their δ. assert isinstance(estats_prev, xlog.Message)
assert estats_prev.message == "stats"
stats_prev = estats_prev
# we have 2 adjacent stats. Adjust corresponding Measurement from their δ.
# do init/fini correction if there was also third preceding stats message. # do init/fini correction if there was also third preceding stats message.
m = kpi.Measurement() # [stats_prev, stats) m = logm._m.copy() # [stats_prev, stats)
m['X.Tstart'] = stats_prev.timestamp
m['X.δT'] = stats.timestamp - stats_prev.timestamp
# δcc(counter) tells how specified cumulative counter changed since last stats result. # δcc(counter) tells how specified cumulative counter changed since last stats result.
def δcc(counter): def δcc(counter):
...@@ -250,8 +256,8 @@ def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry) ...@@ -250,8 +256,8 @@ def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry)
# m_initfini populates m[init] and m[fini] from vinit and vfini values. # m_initfini populates m[init] and m[fini] from vinit and vfini values.
# copy of previous ._m[fini] is correspondingly adjusted for init/fini correction. # copy of previous ._m[fini] is correspondingly adjusted for init/fini correction.
p = None p = None
if logm._m is not None: if m_prev is not None:
p = logm._m.copy() p = m_prev.copy()
def m_initfini(init, vinit, fini, vfini): def m_initfini(init, vinit, fini, vfini):
m[init] = vinit m[init] = vinit
m[fini] = vfini m[fini] = vfini
...@@ -303,13 +309,14 @@ def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry) ...@@ -303,13 +309,14 @@ def _read_stats(logm, stats: xlog.Message): # -> kpi.Measurement|None(to retry)
_ = e _ = e
e = LogError(stats.timestamp, "internal failure") e = LogError(stats.timestamp, "internal failure")
e.__cause__ = _ e.__cause__ = _
logm._stats = None logm._estats = e
logm._event = e return
return None
# all adjustments and checks are over. # all adjustments and checks are over.
logm._m = m # we can now remember pre-built Measurement for current stats, logm._m = m # we can now remember our Measurement adjustments for current stats,
return p # and return adjusted previous measurement, if it was there. if m_prev is not None: # and commit adjustments to previous measurement, if it was there.
m_prev.put((0,), p) # copy m_prev <- p
return
# _stats_check verifies stats message to have required structure. # _stats_check verifies stats message to have required structure.
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors. # Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your # it under the terms of the GNU General Public License version 3, or (at your
...@@ -61,6 +61,7 @@ class tLogMeasure: ...@@ -61,6 +61,7 @@ class tLogMeasure:
# xlog appends one line to enb.xlog. # xlog appends one line to enb.xlog.
def xlog(t, line): def xlog(t, line):
trace('xlog += %s' % line)
line = b(line) line = b(line)
assert b'\n' not in line assert b'\n' not in line
pos = t._fxlog.tell() pos = t._fxlog.tell()
...@@ -121,24 +122,12 @@ def test_LogMeasure(): ...@@ -121,24 +122,12 @@ def test_LogMeasure():
_ = t.expect1 _ = t.expect1
# empty stats after first attach # empty stats after first attach
t.xlog( jstats(0.7, {}) ) t.xlog( jstats(1, {}) )
_('X.Tstart', 0.02) _('X.Tstart', 0.02)
_('X.δT', 0.7-0.02) _('X.δT', 1-0.02)
t.expect_nodata() t.expect_nodata()
t.read() # note: no t.read() - see tstats
# further empty stats
t.xlog( jstats(1.0, {}) )
_('X.Tstart', 0.7)
_('X.δT', 1-0.7)
_('RRC.ConnEstabAtt.sum', 0)
_('RRC.ConnEstabSucc.sum', 0)
_('S1SIG.ConnEstabAtt', 0)
_('S1SIG.ConnEstabSucc', 0)
_('ERAB.EstabInitAttNbr.sum', 0)
_('ERAB.EstabInitSuccNbr.sum', 0)
_('ERAB.EstabAddAttNbr.sum', 0)
_('ERAB.EstabAddSuccNbr.sum', 0)
# tstats is the verb to check handling of stats message. # tstats is the verb to check handling of stats message.
# #
...@@ -195,6 +184,21 @@ def test_LogMeasure(): ...@@ -195,6 +184,21 @@ def test_LogMeasure():
counters_prev = {} # reset counters_prev = {} # reset
# further empty stats
tstats({})
_('X.Tstart', 1)
_('X.δT', 1)
_('RRC.ConnEstabAtt.sum', 0)
_('RRC.ConnEstabSucc.sum', 0)
_('S1SIG.ConnEstabAtt', 0)
_('S1SIG.ConnEstabSucc', 0)
_('ERAB.EstabInitAttNbr.sum', 0)
_('ERAB.EstabInitSuccNbr.sum', 0)
_('ERAB.EstabAddAttNbr.sum', 0)
_('ERAB.EstabAddSuccNbr.sum', 0)
# RRC.ConnEstab # RRC.ConnEstab
# #
# For init/fini correction LogMeasure accounts termination events in the # For init/fini correction LogMeasure accounts termination events in the
...@@ -373,17 +377,25 @@ def test_LogMeasure_badinput(): ...@@ -373,17 +377,25 @@ def test_LogMeasure_badinput():
" but only single-cell configurations are supported"): " but only single-cell configurations are supported"):
t.read() t.read()
tbadcell(11, 0) tbadcell(11, 0)
read_nodata(11, 1)
tbadcell(12, 0) tbadcell(12, 0)
read_nodata(12, 1)
tbadcell(13, 2) tbadcell(13, 2)
read_nodata(13, 1)
tbadcell(14, 3) tbadcell(14, 3)
def tbadstats(τ, error): def tbadstats(τ, error):
with raises(LogError, match="t%s: stats: %s" % (τ, error)): with raises(LogError, match="t%s: stats: %s" % (τ, error)):
t.read() t.read()
read_nodata(14, 7)
tbadstats(21, ":10/cells/1 no `counters`") tbadstats(21, ":10/cells/1 no `counters`")
read_nodata(21, 1)
tbadstats(22, ":11/cells/1/counters no `messages`") tbadstats(22, ":11/cells/1/counters no `messages`")
read_nodata(22, 1)
tbadstats(23, ":12/ no `counters`") tbadstats(23, ":12/ no `counters`")
read_nodata(23, 1)
tbadstats(24, ":13/counters no `messages`") tbadstats(24, ":13/counters no `messages`")
read_nodata(24, 7)
readok(31, 5) # 31-32 readok(31, 5) # 31-32
def tbadline(): def tbadline():
...@@ -414,18 +426,19 @@ def test_LogMeasure_cc_wraparound(): ...@@ -414,18 +426,19 @@ def test_LogMeasure_cc_wraparound():
def readok(τ, CC_value): def readok(τ, CC_value):
_('X.Tstart', τ) _('X.Tstart', τ)
_('X.δT', 1) _('X.δT', int(τ+1)-τ)
_(CC, CC_value) if CC_value is not None:
_(CC, CC_value)
else:
t.expect_nodata()
t.read() t.read()
_('X.Tstart', 0.02) # attach-1 readok(0.02, None) # attach-1
_('X.δT', 0.98)
t.expect_nodata()
t.read()
readok(1, 13) # 1-2 readok(1, 13) # 1-2
readok(2, None) # 2-3 M(ø)
with raises(LogError, match=r"t3: cc %s↓ \(13 → 12\)" % cc): with raises(LogError, match=r"t3: cc %s↓ \(13 → 12\)" % cc):
t.read() # 2-3 t.read() # 2-3 raise
readok(3, None) # 3-4 M(ø)
readok(4, 10) # 4-5 readok(4, 10) # 4-5
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment