Commit 3a35162b authored by Kirill Smelkov's avatar Kirill Smelkov

kpi: Establish data model for DRB.IPLatDl and DRB.UEActive

3GPP says that this values are averages over collected samples and over
observed time period. But if we are to follow 3GPP as-is there is a problem how
to aggregate two Measurements corresponding to two different periods into one
bigger Mesurement that corresponds to combined time period.

-> Solve that by introducing "statistical profile" types and use them for DRB.IPLatDl and DRB.UEActive;
-> Teach Calc.aggregate to aggregate such statistical profiles via corresponding math.

See individual patches for details.

/reviewed-by @paul.graydon
/reviewed-on kirr/xlte!9
parents 7164e99c 6d2694d8
......@@ -355,7 +355,7 @@
"source": [
"Let's now look at <u>raw counters</u>.\n",
"\n",
"Each Measurement comes with counters measured during particular interval. To get total values we need to aggregate them throughout all observation time via `Calc.sum`. Let's use already-loaded MeasurementLog data to showcase this:"
"Each Measurement comes with counters measured during particular interval. To get total values we need to aggregate them throughout all observation time via `Calc.aggregate`. Let's use already-loaded MeasurementLog data to showcase this:"
]
},
{
......@@ -371,7 +371,7 @@
"mhead = mlog.data()[0]\n",
"mtail = mlog.data()[-1]\n",
"calc_total = kpi.Calc(mlog, mhead['X.Tstart'], mtail['X.Tstart']+mtail['X.δT'])\n",
"Σ = calc_total.sum()"
"Σ = calc_total.aggregate()"
]
},
{
......
......@@ -136,7 +136,7 @@ def main():
mhead = mlog.data()[0]
mtail = mlog.data()[-1]
calc_total = kpi.Calc(mlog, mhead['X.Tstart'], mtail['X.Tstart']+mtail['X.δT'])
Σ = calc_total.sum()
Σ = calc_total.aggregate()
print_ΣMeasurement(Σ)
......
......@@ -21,7 +21,7 @@
- Calc is KPI calculator. It can be instantiated on MeasurementLog and time
interval over which to perform computations. Use Calc methods such as
.erab_accessibility() and .eutran_ip_throughput() to compute KPIs, and .sum()
.erab_accessibility() and .eutran_ip_throughput() to compute KPIs, and .aggregate()
to compute aggregated measurements.
- MeasurementLog maintains journal with result of measurements. Use .append()
......@@ -54,6 +54,8 @@ from __future__ import print_function, division, absolute_import
import numpy as np
from golang import func
import warnings
# Calc provides way to compute KPIs over given measurement data and time interval.
#
......@@ -71,7 +73,7 @@ from golang import func
# ──────|─────|────[────|────)──────|──────|────────>
# ←─ τ_lo τ_hi ──→ time
#
# It is also possible to merely aggregate measured values via .sum() .
# It is also possible to merely aggregate measured values via .aggregate() .
#
# See also: MeasurementLog, Measurement, ΣMeasurement.
class Calc:
......@@ -119,6 +121,41 @@ class MeasurementLog:
pass
# Stat[dtype] represents result of statistical profiling with arbitrary sampling
# for a value with specified dtype.
#
# It is organized as NumPy structured scalar with avg, min, max and n fields.
#
# It is used inside Measurement for e.g. DRB.IPLatDl.QCI .
class Stat(np.void):
# _dtype_for returns dtype that Stat[dtype] will use.
@classmethod
def _dtype_for(cls, dtype):
return np.dtype((cls, [
('avg', np.float64), # NOTE even int becomes float on averaging
('min', dtype),
('max', dtype),
('n', np.int64)]))
# StatT[dtype] represents result of statistical profiling with time-based sampling
# for a value with specified dtype.
#
# It is organized as NumPy structured scalar with avg, min and max fields.
#
# NOTE contrary to Stat there is no n field and containing Measurement.X.δT
# should be taken to know during which time period the profile was collected.
#
# It is used inside Measurement for e.g. DRB.UEActive .
class StatT(np.void):
# _dtype_for returns dtype that StatT[dtype] will use.
@classmethod
def _dtype_for(cls, dtype):
return np.dtype((cls, [
('avg', np.float64), # see avg note in Stat
('min', dtype),
('max', dtype)]))
# Measurement represents set of measured values and events observed and counted
# during one particular period of time.
#
......@@ -155,16 +192,22 @@ class MeasurementLog:
class Measurement(np.void):
Tcc = np.int32 # cumulative counter
Ttime = np.float64 # time is represented in seconds since epoch
S = Stat ._dtype_for # statistical profile with arbitrary sampling
St = StatT._dtype_for # statistical profile with time-based sampling
# _dtype defines measured values and events.
_dtype = np.dtype([
('X.Tstart', Ttime), # when the measurement started
('X.δT', Ttime), # time interval during which the measurement was made
# below come values/events as specified by TS 32.425 and TS 32.450
# NOTE all .QCI and .CAUSE are expanded from outside.
# below comes definition of values/events as specified by TS 32.425 and TS 32.450
#
# - .QCI suffix means a value comes as array of per-QCI values.
# - .CAUSE suffix means a value comes as array of per-CAUSE values.
#
# NOTE both .QCI and .CAUSE are expanded from outside.
#
# NAME TYPE UNIT TS 32.425 reference + ...
# NAME TYPE/DTYPE UNIT TS 32.425 reference + ...
('RRC.ConnEstabAtt.CAUSE', Tcc), # 1 4.1.1.1
('RRC.ConnEstabSucc.CAUSE', Tcc), # 1 4.1.1.2
......@@ -179,9 +222,10 @@ class Measurement(np.void):
('DRB.PdcpSduBitrateUl.QCI', np.float64),# bit/s 4.4.1.1 NOTE not kbit/s
('DRB.PdcpSduBitrateDl.QCI', np.float64),# bit/s 4.4.1.2 NOTE not kbit/s
# XXX mean is not good for our model
# TODO mean -> total + npkt?
#('DRB.IPLatDl.QCI', Ttime), # s 4.4.5.1 32.450:6.3.2 NOTE not ms
('DRB.UEActive', St(np.int32)), # 1 4.4.2.4 36.314:4.1.3.3
('DRB.IPLatDl.QCI', S(Ttime)), # s 4.4.5.1 32.450:6.3.2 NOTE not ms
# DRB.IPThpX.QCI = DRB.IPVolX.QCI / DRB.IPTimeX.QCI 4.4.6.1-2 32.450:6.3.1
('DRB.IPVolDl.QCI', np.int64), # bit 4.4.6.3 32.450:6.3.1 NOTE not kbit
......@@ -206,6 +250,8 @@ class Measurement(np.void):
('PEE.Energy', np.float64),# J 4.12.2 NOTE not kWh
])
del S, St
# Interval is NumPy structured scalar that represents [lo,hi) interval.
#
......@@ -224,18 +270,18 @@ class Interval(np.void):
# It is similar to Measurement, but each value comes accompanied with
# information about how much time there was no data for that field:
#
# Σ[f].value = Σ Mi[f] if Mi[f] ≠ NA
# i
# Σ[f].value = Aggregate Mi[f] if Mi[f] ≠ NA
# i
#
# Σ[f].τ_na = Σ Mi[X.δT] if Mi[f] = NA
# i
# Σ[f].τ_na = Σ Mi[X.δT] if Mi[f] = NA
# i
class ΣMeasurement(np.void):
_ = []
for name in Measurement._dtype.names:
typ = Measurement._dtype.fields[name][0].type
dtyp = Measurement._dtype.fields[name][0]
if not name.startswith('X.'): # X.Tstart, X.δT
typ = np.dtype([('value', typ), ('τ_na', Measurement.Ttime)])
_.append((name, typ))
dtyp = np.dtype([('value', dtyp), ('τ_na', Measurement.Ttime)])
_.append((name, dtyp))
_dtype = np.dtype(_)
del _
......@@ -273,6 +319,25 @@ def __new__(cls):
Σ[field]['τ_na'] = 0
return Σ
# Stat() creates new Stat instance with specified values and dtype.
@func(Stat)
def __new__(cls, min, avg, max, n, dtype=np.float64):
s = _newscalar(cls, cls._dtype_for(dtype))
s['min'] = min
s['avg'] = avg
s['max'] = max
s['n'] = n
return s
# StatT() creates new StatT instance with specified values and dtype.
@func(StatT)
def __new__(cls, min, avg, max, dtype=np.float64):
s = _newscalar(cls, cls._dtype_for(dtype))
s['min'] = min
s['avg'] = avg
s['max'] = max
return s
# _all_qci expands <name>.QCI into <name>.sum and [] of <name>.<qci> for all possible qci values.
# TODO remove and use direct array access (after causes are expanded into array too)
......@@ -366,6 +431,34 @@ def __str__(m):
vv.append(_vstr(m[field]))
return "(%s)" % ', '.join(vv)
# __repr__ returns Stat(min, avg, max, n, dtype=...)
# NA values are represented as "ø".
@func(Stat)
def __repr__(s):
return "Stat(%s, %s, %s, %s, dtype=%s)" % (_vstr(s['min']), _vstr(s['avg']),
_vstr(s['max']), _vstr(s['n']), s['min'].dtype)
# __repr__ returns StatT(min, avg, max, dtype=...)
# NA values are represented as "ø".
@func(StatT)
def __repr__(s):
return "StatT(%s, %s, %s, dtype=%s)" % (_vstr(s['min']), _vstr(s['avg']),
_vstr(s['max']), s['min'].dtype)
# __str__ returns "<min avg max>·n"
# NA values are represented as "ø".
@func(Stat)
def __str__(s):
return "<%s %s %s>·%s" % (_vstr(s['min']), _vstr(s['avg']), _vstr(s['max']), _vstr(s['n']))
# __str__ returns "<min avg max>"
# NA values are represented as "ø".
@func(StatT)
def __str__(s):
return "<%s %s %s>" % (_vstr(s['min']), _vstr(s['avg']), _vstr(s['max']))
# _vstr returns string representation of scalar or subarray v.
def _vstr(v): # -> str
if v.shape == (): # scalar
......@@ -377,9 +470,17 @@ def _vstr(v): # -> str
va = [] # subarray with some non-ø data
for k in range(v.shape[0]):
if v[k] == 0:
continue
va.append('%d:%s' % (k, 'ø' if isNA(v[k]) else str(v[k])))
vk = v[k]
if isinstance(vk, np.void):
for name in vk.dtype.names:
if vk[name] != 0:
break
else:
continue
else:
if vk == 0:
continue
va.append('%d:%s' % (k, 'ø' if isNA(vk) else str(vk)))
return "{%s}" % ' '.join(va)
......@@ -422,8 +523,14 @@ def _check_valid(m):
continue
# * ≥ 0
if v < 0:
bad(".%s < 0 (%s)" % (field, v))
if not isinstance(v, np.void):
if v < 0:
bad(".%s < 0 (%s)" % (field, v))
else:
for vfield in v.dtype.names:
vf = v[vfield]
if not isNA(vf) and vf < 0:
bad(".%s.%s < 0 (%s)" % (field, vfield, vf))
# fini ≤ init
if "Succ" in field:
......@@ -696,14 +803,32 @@ def eutran_ip_throughput(calc): # -> IPThp[QCI][dl,ul]
return thp
# sum aggregates values of all Measurements in covered time interval.
# TODO tests
# aggregate aggregates values of all Measurements in covered time interval.
@func(Calc)
def sum(calc): # -> ΣMeasurement
def aggregate(calc): # -> ΣMeasurement
Σ = ΣMeasurement()
Σ['X.Tstart'] = calc.τ_lo
Σ['X.δT'] = calc.τ_hi - calc.τ_lo
def xmin(a, b):
if isNA(a): return b
if isNA(b): return a
return min(a, b)
def xmax(a, b):
if isNA(a): return b
if isNA(b): return a
return max(a, b)
def xavg(a, na, b, nb): # -> <ab>, na+nb
if isNA(a) or isNA(na):
return b, nb
if isNA(b) or isNA(nb):
return a, na
nab = na+nb
ab = (a*na + b*nb)/nab
return ab, nab
for m in calc._miter():
for field in m.dtype.names:
if field.startswith('X.'): # X.Tstart, X.δT
......@@ -713,15 +838,45 @@ def sum(calc): # -> ΣMeasurement
if v.shape != (): # skip subarrays - rely on aliases
continue
Σf = Σ[field] # view to Σ[field]
Σv = Σf['value'] # view to Σ[field]['value']
if isNA(v):
Σ[field]['τ_na'] += m['X.δT']
Σf['τ_na'] += m['X.δT']
continue
if isNA(Σv):
Σf['value'] = v
continue
if isinstance(v, np.number):
Σf['value'] += v
elif isinstance(v, StatT):
Σv['min'] = xmin(Σv['min'], v['min'])
Σv['max'] = xmax(Σv['max'], v['max'])
# TODO better sum everything and then divide as a whole to avoid loss of precision
Σv['avg'], _ = xavg(Σv['avg'], m['X.Tstart'] - Σ['X.Tstart'] - Σf['τ_na'],
v['avg'], m['X.δT'])
elif isinstance(v, Stat):
Σv['min'] = xmin(Σv['min'], v['min'])
Σv['max'] = xmax(Σv['max'], v['max'])
# TODO better sum everything and then divide as a whole to avoid loss of precision
Σv['avg'], Σv['n'] = xavg(Σv['avg'], Σv['n'],
v['avg'], v['n'])
else:
if isNA(Σ[field]['value']):
Σ[field]['value'] = 0
Σ[field]['value'] += v
raise AssertionError("Calc.aggregate: unexpected type %r" % type(v))
return Σ
# sum is deprecated alias to aggregate.
@func(Calc)
def sum(calc):
warnings.warn("Calc.sum is deprecated -> use Calc.aggregate instead", DeprecationWarning, stacklevel=4)
return calc.aggregate()
# _miter iterates through [.τ_lo, .τ_hi) yielding Measurements.
#
......@@ -819,10 +974,13 @@ def _i2pc(x: Interval): # -> Interval
# _newscalar creates new NumPy scalar instance with specified type and dtype.
def _newscalar(typ, dtype):
_ = np.zeros(shape=(), dtype=(typ, dtype))
dtyp = np.dtype((typ, dtype)) # dtype with .type adjusted to be typ
assert dtyp == dtype
assert dtyp.type is typ
_ = np.zeros(shape=(), dtype=dtyp)
s = _[()]
assert type(s) is typ
assert s.dtype == dtype
assert s.dtype is dtyp
return s
......@@ -833,15 +991,20 @@ def NA(dtype):
typ = dtype.type
# float
if issubclass(typ, np.floating):
na = np.nan
na = typ(np.nan) # return the same type as dtype has, e.g. np.int32, not int
# int: NA is min value
elif issubclass(typ, np.signedinteger):
na = np.iinfo(typ).min
na = typ(np.iinfo(typ).min)
# structure: NA is combination of NAs for fields
elif issubclass(typ, np.void):
na = _newscalar(typ, dtype)
for field in dtype.names:
na[field] = NA(dtype.fields[field][0])
else:
raise AssertionError("NA not defined for dtype %s" % (dtype,))
return typ(na) # return the same type as dtype has, e.g. np.int32, not int
assert type(na) is typ
return na
# isNA returns whether value represent NA.
......@@ -850,6 +1013,26 @@ def NA(dtype):
# returns array(True/False) if value is array.
def isNA(value):
na = NA(value.dtype)
if np.isnan(na):
return np.isnan(value) # `nan == nan` gives False
# `nan == nan` gives False
# work it around by checking for nan explicitly
if isinstance(na, np.void): # items are structured scalars
vna = None
for field in value.dtype.names:
nf = na[field]
vf = value[field]
if np.isnan(nf):
x = np.isnan(vf)
else:
x = (vf == nf)
if vna is None:
vna = x
else:
vna &= x
return vna
else:
if np.isnan(na):
return np.isnan(value)
return value == na
# -*- coding: utf-8 -*-
# Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Copyright (C) 2022-2024 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -20,10 +20,13 @@
from __future__ import print_function, division, absolute_import
from xlte.kpi import Calc, MeasurementLog, Measurement, Interval, NA, isNA, Σqci, Σcause, nqci
from xlte.kpi import Calc, MeasurementLog, Measurement, ΣMeasurement, Interval, \
Stat, StatT, NA, isNA, Σqci, Σcause, nqci
import numpy as np
from pytest import raises
ms = 1e-3
def test_Measurement():
m = Measurement()
......@@ -43,6 +46,8 @@ def test_Measurement():
_('DRB.IPVolDl.sum') # int64
_('DRB.IPTimeDl.7') # .QCI alias
_('DRB.IPTimeDl.QCI') # .QCI array
_('DRB.IPLatDl.7') # .QCI alias to Stat
_('DRB.IPLatDl.QCI') # .QCI array of Stat
# everything automatically
for name in m.dtype.names:
_(name)
......@@ -53,6 +58,12 @@ def test_Measurement():
assert m['S1SIG.ConnEstabAtt'] == 123
m['RRC.ConnEstabAtt.sum'] = 17
assert m['RRC.ConnEstabAtt.sum'] == 17
m['DRB.UEActive']['min'] = 1
m['DRB.UEActive']['avg'] = 6
m['DRB.UEActive']['max'] = 8
assert m['DRB.UEActive']['min'] == 1
assert m['DRB.UEActive']['avg'] == 6
assert m['DRB.UEActive']['max'] == 8
m['DRB.IPVolDl.QCI'][:] = 0
m['DRB.IPVolDl.5'] = 55
m['DRB.IPVolDl.7'] = NA(m['DRB.IPVolDl.7'].dtype)
......@@ -65,17 +76,32 @@ def test_Measurement():
continue
assert m['DRB.IPVolDl.%d' % k] == 0
assert m['DRB.IPVolDl.QCI'][k] == 0
m['DRB.IPLatDl.QCI'][:]['avg'] = 0
m['DRB.IPLatDl.QCI'][:]['min'] = 0
m['DRB.IPLatDl.QCI'][:]['max'] = 0
m['DRB.IPLatDl.QCI'][:]['n'] = 0
m['DRB.IPLatDl.QCI'][3]['avg'] = 33
m['DRB.IPLatDl.QCI'][3]['n'] = 123
m['DRB.IPLatDl.4']['avg'] = 44
m['DRB.IPLatDl.4']['n'] = 432
m['DRB.IPLatDl.8']['avg'] = NA(m['DRB.IPLatDl.8']['avg'].dtype)
m['DRB.IPLatDl.8']['n'] = NA(m['DRB.IPLatDl.8']['n'] .dtype)
# str/repr
assert repr(m) == "Measurement(RRC.ConnEstabAtt.sum=17, DRB.IPVolDl.QCI={5:55 7:ø 9:99}, S1SIG.ConnEstabAtt=123)"
assert repr(m) == "Measurement(RRC.ConnEstabAtt.sum=17, DRB.UEActive=<1 6.0 8>, DRB.IPLatDl.QCI={3:<0.0 33.0 0.0>·123 4:<0.0 44.0 0.0>·432 8:<0.0 ø 0.0>·ø}, DRB.IPVolDl.QCI={5:55 7:ø 9:99}, S1SIG.ConnEstabAtt=123)"
assert repr(m['DRB.UEActive']) == "StatT(1, 6.0, 8, dtype=int32)"
assert repr(m['DRB.IPLatDl.3']) == "Stat(0.0, 33.0, 0.0, 123, dtype=float64)"
s = str(m)
assert s[0] == '('
assert s[-1] == ')'
v = s[1:-1].split(', ')
vok = ['ø'] * len(m._dtype0.names)
vok[m.dtype.names.index("RRC.ConnEstabAtt.sum")] = "17"
vok[m.dtype.names.index("DRB.UEActive")] = "<1 6.0 8>"
vok[m.dtype.names.index("S1SIG.ConnEstabAtt")] = "123"
vok[m.dtype.names.index("DRB.IPVolDl.QCI")] = "{5:55 7:ø 9:99}"
vok[m.dtype.names.index("DRB.IPLatDl.QCI")] = "{3:<0.0 33.0 0.0>·123 4:<0.0 44.0 0.0>·432 8:<0.0 ø 0.0>·ø}"
assert v == vok
# verify that time fields has enough precision
......@@ -496,6 +522,65 @@ def test_Calc_eutran_ip_throughput():
assert thp[qci]['ul'] == I(0)
# verify Calc.aggregate .
def test_Calc_aggregate():
mlog = MeasurementLog()
m1 = Measurement()
m1['X.Tstart'] = 1
m1['X.δT'] = 2
m1['S1SIG.ConnEstabAtt'] = 12 # Tcc
m1['ERAB.SessionTimeUE'] = 1.2 # Ttime
m1['DRB.UEActive'] = StatT(1, 3.7, 5) # StatT
m1['DRB.IPLatDl.7'] = Stat(4*ms, 7.32*ms, 25*ms, 17) # Stat
m2 = Measurement()
m2['X.Tstart'] = 5 # NOTE [3,5) is NA hole
m2['X.δT'] = 3
m2['S1SIG.ConnEstabAtt'] = 11
m2['ERAB.SessionTimeUE'] = 0.7
m2['DRB.UEActive'] = StatT(2, 3.2, 7)
m2['DRB.IPLatDl.7'] = Stat(3*ms, 5.23*ms, 11*ms, 11)
mlog.append(m1)
mlog.append(m2)
calc = Calc(mlog, 0, 10)
assert calc.τ_lo == 0
assert calc.τ_hi == 10
M = calc.aggregate()
assert isinstance(M, ΣMeasurement)
assert M['X.Tstart'] == 0
assert M['X.δT'] == 10
assert M['S1SIG.ConnEstabAtt']['value'] == 12 + 11
assert M['S1SIG.ConnEstabAtt']['τ_na'] == 5 # [0,1) [3,5) [8,10)
assert M['ERAB.SessionTimeUE']['value'] == 1.2 + 0.7
assert M['ERAB.SessionTimeUE']['τ_na'] == 5
assert M['DRB.UEActive']['value'] == StatT(1, (3.7*2 + 3.2*3)/(2+3), 7)
assert M['DRB.UEActive']['τ_na'] == 5
assert M['DRB.IPLatDl.7']['value'] == Stat(3*ms, (7.32*17 + 5.23*11)/(17+11)*ms, 25*ms, 17+11)
assert M['DRB.IPLatDl.7']['τ_na'] == 5
# assert that everything else is NA with τ_na == 10
def _(name):
f = M[name]
if f.shape != ():
return # don't check X.QCI - rely on aliases
assert isNA(f['value'])
assert f['τ_na'] == 10
for name in M.dtype.names:
if name not in ('X.Tstart', 'X.δT', 'S1SIG.ConnEstabAtt',
'ERAB.SessionTimeUE', 'DRB.UEActive', 'DRB.IPLatDl.7'):
_(name)
# verify Σqci.
def test_Σqci():
m = Measurement()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment