kpi: Add support for QCI to Measurements

Previously for Measurement fields with .QCI or .CAUSE suffix we had only
the .sum value and no per-QCI nor per-CAUSE values. In other words
support for QCI and CAUSE was stub. In this patch we add support for
QCI: every field X.QCI is now automatically expanded into X[256] array
and X.sum . For convenience we also provide X.<qci> aliases that alias
X[qci]. For example field DRB.IPVolDl.9 aliases 9'th element of
DRB.IPVolDl array.

We will need QCI support for E-UTRAN IP Throughput KPI which is required
to provide resulting values for every QCI individually.

CAUSE support remains stub for now.
......@@ -220,18 +220,24 @@ class Interval(np.void):
def __new__(cls):
m = _newscalar(cls, cls._dtype)
for field in m.dtype.names:
for field in m._dtype0.names:
fdtype = m.dtype.fields[field][0]
m[field] = NA(fdtype)
if fdtype.shape == ():
m[field] = NA(fdtype) # scalar
m[field][:] = NA(fdtype.base) # subarray
return m
# _all_qci expands <name>.QCI into <name>.sum and [] of <name>.<qci> for all possible qci values.
# TODO remove and use direct array access (after causes are expanded into array too)
nqci = 256 # all possible QCIs ∈ [0,255], standard ones are described in 23.203 Table 6.1.7
def _all_qci(name_qci: str): # -> name_sum, ()name_qciv
if not name_qci.endswith(".QCI"):
raise AssertionError("invalid name_qci %r: no .QCI suffix" % name_qci)
name = name_qci[:-len(".QCI")]
return name+".sum", () # TODO add all possible QCIs - TS 36.413 (S1AP)
name_qciv = tuple("%s.%d" % (name,q) for q in range(nqci))
return name+".sum", name_qciv
# _all_cause expands <name>.CAUSE into <name>.sum and [] of <name>.<cause> for all possible cause values.
def _all_cause(name_cause: str): # -> name_sum, ()name_causev
......@@ -242,13 +248,16 @@ def _all_cause(name_cause: str): # -> name_sum, ()name_causev
# expand all .QCI and .CAUSE in Measurement._dtype .
def _():
expv = [] # of (name, typ)
# expand X.QCI -> X.sum + X.QCI[nqci]
qnamev = [] # X from X.QCI
expv = [] # of (name, typ[, shape])
for name in Measurement._dtype .names:
typ = Measurement._dtype .fields[name][0].type
if name.endswith('.QCI'):
Σ, qciv = _all_qci(name)
for _ in (Σ,)+qciv:
expv.append((_, typ))
_ = name[:-len('.QCI')]
expv.append(('%s.sum' % _, typ)) # X.sum
expv.append((name, typ, nqci)) # X.QCI[nqci]
elif name.endswith('.CAUSE'):
Σ, causev = _all_cause(name)
......@@ -258,7 +267,33 @@ def _():
expv.append((name, typ))
Measurement._dtype = np.dtype(expv)
_dtype = np.dtype(expv)
# also provide .QCI aliases, e.g. X.5 -> X.QCI[5]
namev = []
formatv = []
offsetv = []
for name in _dtype.names:
fd, off = _dtype.fields[name]
namev .append(name)
for qname in qnamev:
qarr, off0 = _dtype.fields[qname+'.QCI']
assert len(qarr.shape) == 1
for qci in range(qarr.shape[0]):
namev .append('%s.%d' % (qname, qci))
offsetv.append(off0 + qci*qarr.base.itemsize)
Measurement._dtype0 = _dtype # ._dtype without aliases
Measurement._dtype = np.dtype({
'names': namev,
'formats': formatv,
'offsets': offsetv,
assert Measurement._dtype.itemsize == Measurement._dtype0.itemsize
del _
......@@ -268,22 +303,40 @@ del _
def __repr__(m):
initv = []
for field in m.dtype.names:
v = m[field]
if not isNA(v):
initv.append("%s=%r" % (field, v))
for field in m._dtype0.names:
vs = _vstr(m[field])
if vs != 'ø':
initv.append("%s=%s" % (field, vs))
return "Measurement(%s)" % ', '.join(initv)
# __str__ returns "(v1, v2, ...)".
# NA values are represented as "ø".
# .QCI arrays are represented as {qci₁:v₁ qci₂:v₂ ...} with zero values omitted.
# if all values are NA - then the whole array is represented as ø.
def __str__(m):
vv = []
for field in m.dtype.names:
v = m[field]
vv.append('ø' if isNA(v) else str(v))
for field in m._dtype0.names:
return "(%s)" % ', '.join(vv)
# _vstr returns string representation of scalar or subarray v.
def _vstr(v): # -> str
if v.shape == (): # scalar
return 'ø' if isNA(v) else str(v)
assert len(v.shape) == 1
if isNA(v).all(): # subarray full of ø
return 'ø'
va = [] # subarray with some non-ø data
for k in range(v.shape[0]):
if v[k] == 0:
va.append('%d:%s' % (k, 'ø' if isNA(v[k]) else str(v[k])))
return "{%s}" % ' '.join(va)
# ==, != for Measurement.
def __eq__(a, b):
......@@ -291,7 +344,10 @@ def __eq__(a, b):
# return np.array_equal(a, b, equal_nan=True) # for NA==NA
if not isinstance(b, Measurement):
return False
return ==
# cast to dtype without aliases to avoid
# "dtypes with overlapping or out-of-order fields are not representable as buffers"
return a.view(a._dtype0).data.tobytes() == \
def __ne__(a, b):
......@@ -314,6 +370,8 @@ def _check_valid(m):
for field in m.dtype.names:
v = m[field]
if v.shape != (): # skip subarrays - rely on aliases
if isNA(v):
......@@ -359,8 +417,9 @@ def append(mlog, m: Measurement):
if not (τ_ + δτ_ <= τ):
raise AssertionError(".Tstart overlaps with previous measurement: %s ∈ [%s, %s)" %
(τ, τ_, τ_ + δτ_))
_ = np.append(mlog._data, m)
_ = np.append(
mlog._data.view(Measurement._dtype0), # dtype0 because np.append does not handle aliased
m.view(Measurement._dtype0)) # fields as such and increases out itemsize
mlog._data = _.view((Measurement, Measurement._dtype)) # np.append looses Measurement from dtype
# forget_past deletes measurements with .Tstart ≤ Tcut
......@@ -627,6 +686,7 @@ def _newscalar(typ, dtype):
_ = np.zeros(shape=(), dtype=(typ, dtype))
s = _[()]
assert type(s) is typ
assert s.dtype == dtype
return s
......@@ -649,7 +709,9 @@ def NA(dtype):
# isNA returns whether value represent NA.
# value must be numpy scalar.
# returns True/False if value is scalar.
# returns array(True/False) if value is array.
def isNA(value):
na = NA(value.dtype)
if np.isnan(na):
......@@ -18,7 +18,7 @@
# See COPYING file for full licensing terms.
# See for rationale and options.
from xlte.kpi import Calc, MeasurementLog, Measurement, Interval, NA, isNA, Σqci, Σcause
from xlte.kpi import Calc, MeasurementLog, Measurement, Interval, NA, isNA, Σqci, Σcause, nqci
import numpy as np
from pytest import raises
......@@ -29,12 +29,18 @@ def test_Measurement():
# verify that all fields are initialized to NA
def _(name):
assert isNA(m[name])
v = m[name]
if v.shape == ():
assert isNA(v) # scalar
assert isNA(v).all() # array
# several fields explicitly
_('X.Tstart') # time
_('RRC.ConnEstabAtt.sum') # Tcc
_('DRB.PdcpSduBitrateDl.sum') # float32
_('DRB.IPVolDl.sum') # int64
_('DRB.IPTimeDl.7') # .QCI alias
_('DRB.IPTimeDl.QCI') # .QCI array
# everything automatically
for name in m.dtype.names:
......@@ -45,16 +51,29 @@ def test_Measurement():
assert m['S1SIG.ConnEstabAtt'] == 123
m['RRC.ConnEstabAtt.sum'] = 17
assert m['RRC.ConnEstabAtt.sum'] == 17
m['DRB.IPVolDl.QCI'][:] = 0
m['DRB.IPVolDl.5'] = 55
m['DRB.IPVolDl.7'] = NA(m['DRB.IPVolDl.7'].dtype)
m['DRB.IPVolDl.QCI'][9] = 99
assert m['DRB.IPVolDl.5'] == 55; assert m['DRB.IPVolDl.QCI'][5] == 55
assert isNA(m['DRB.IPVolDl.7']); assert isNA(m['DRB.IPVolDl.QCI'][7])
assert m['DRB.IPVolDl.9'] == 99; assert m['DRB.IPVolDl.QCI'][9] == 99
for k in range(len(m['DRB.IPVolDl.QCI'])):
if k in {5,7,9}:
assert m['DRB.IPVolDl.%d' % k] == 0
assert m['DRB.IPVolDl.QCI'][k] == 0
# str/repr
assert repr(m) == "Measurement(RRC.ConnEstabAtt.sum=17, S1SIG.ConnEstabAtt=123)"
assert repr(m) == "Measurement(RRC.ConnEstabAtt.sum=17, DRB.IPVolDl.QCI={5:55 7:ø 9:99}, S1SIG.ConnEstabAtt=123)"
s = str(m)
assert s[0] == '('
assert s[-1] == ')'
v = s[1:-1].split(', ')
vok = ['ø'] * len(m.dtype.names)
vok = ['ø'] * len(m._dtype0.names)
vok[m.dtype.names.index("RRC.ConnEstabAtt.sum")] = "17"
vok[m.dtype.names.index("S1SIG.ConnEstabAtt")] = "123"
vok[m.dtype.names.index("DRB.IPVolDl.QCI")] = "{5:55 7:ø 9:99}"
assert v == vok
# verify that time fields has enough precision
......@@ -431,7 +450,19 @@ def test_Σqci():
m[x+'.sum'] = 123
assert Σ() == 123
# TODO sum over individual causes (when implemented)
m[x+'.17'] = 17
m[x+'.23'] = 23
m[x+'.255'] = 255
assert Σ() == 123 # from .sum
m[x+'.sum'] = NA(m[x+'.sum'].dtype)
assert isNA(Σ()) # from array, but NA values lead to sum being NA
v = m[x+'.QCI']
l = len(v)
for i in range(l):
v[i] = 1 + i
assert Σ() == 1*l + (l-1)*l/2
# verify Σcause.
