Commit 5bf7dc1c authored by Kirill Smelkov

amari.{drb,xlog}: Provide aggregated DRB statistics in the form of synthetic x.drb_stats message

This patch provides the next building block for the E-UTRAN IP Throughput KPI
and continues d102ffaa (drb: Start of the package). Quoting that patch:

    The scheme to compute E-UTRAN IP Throughput is thus as follows: poll eNB at
    100Hz frequency for `ue_get[stats]` and retrieve information about per-UE/QCI
    streams and the number of transport blocks dl/ul-ed to the UE in question
    during that 10ms frame. Estimate `tx_time` taking into account
    the number of transmitted transport blocks. And estimate whether eNB is congested or
    not based on `dl_use_avg`/`ul_use_avg` taken from `stats`. For the latter we
    also need to poll for `stats` at 100Hz frequency and synchronize
    `ue_get[stats]` and `stats` requests in time so that they both cover the same
    time interval of particular frame.

    Then organize the polling process to provide aggregated statistics in the form of	<-- NOTE
    new `x.drb_stats` message, and teach `xamari xlog` to save those messages to		<-- NOTE
    `enb.xlog` together with `stats`.							<-- NOTE

    Then further adjust `amari.kpi.LogMeasure` and generic
    `kpi.Measurement` and `kpi.Calc` to handle DRB-related data.

So here we implement the noted step:

- add drb._x_stats_srv server that polls eNB at 100Hz rate, uses Sampler
  to extract bursts and aggregates information about those bursts.

- teach xlog to organize servers for synthetic messages and communicate
  with them, and register drb._x_stats_srv as such a server to handle
  generation of the x.drb_stats message.
parent 78f26e3a
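
For illustration, one `x.drb_stats` entry in enb.xlog would look approximately
as follows; the field layout follows the code below, and all values are made up:

    {"message": "x.drb_stats", "time": 4056.2, "utc": 1678886401.5,
     "qci_dict": {"9": {"dl_tx_bytes": 12345, "dl_tx_time": 0.108,
                        "dl_tx_time_err": 0.004, "dl_tx_time_notailtti": 0.105,
                        "dl_tx_time_notailtti_err": 0.004, "dl_tx_nsamples": 3,
                        "ul_tx_bytes": 890, ...}},
     "δt_ueget": {"min": 0.0098, "avg": 0.0100, "max": 0.0105, "std": 0.0001},
     "δ_ueget_vs_stats": {"min": ..., "avg": ..., "max": ..., "std": ...}}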
@@ -21,11 +21,23 @@
- Sampler converts information about data flows obtained via ue_get[stats] into
  Samples that represent bursts of continuous transmissions.
- _x_stats_srv uses Sampler to process data flows at 100Hz rate and aggregates
  results into information needed to compute the E-UTRAN IP Throughput KPI. The
  information is emitted in the form of a synthetic x.drb_stats message whose
  generation is integrated into the amari.xlog package.

See the following related 3GPP standards references:
- TS 32.450 6.3.1 "E-UTRAN IP Throughput"
- TS 32.425 4.4.6 "IP Throughput measurements"
""" """
from xlte import amari

from golang import chan, select, default, nilchan, func, defer
from golang import sync, time
import math
import sys
@@ -623,6 +635,292 @@ def __repr__(s):
# ----------------------------------------
# _x_stats_srv provides server for x.drb_stats queries.
#
# To do so it polls eNB every 10ms at 100Hz frequency with `ue_get[stats]`
# and tries to further improve accuracy of retrieved DL/UL samples timing
# towards 1ms via a heuristic on how many transport blocks were tx/rx'ed
# during each observation.
#
# This heuristic can be used unless the eNB is congested. To detect congestion
# _x_stats_srv also polls eNB with `stats` at the same 100Hz frequency and
# synchronized in time with `ue_get[stats]`. The congestion is detected by
# dl_use_avg / ul_use_avg being close to 1.
#
# Since we can detect only the fact of likely congestion, but not the level
# of congestion, nor other details related to QCI priorities, in the congested
# case the heuristic is not used and throughput is reported via rough, but
# relatively true, interval estimates.
#
# NOTE we cannot poll at higher than 100Hz frequency, since the eNB
# rate-limits websocket requests to execute no faster than one per 10ms.
@func
def _x_stats_srv(ctx, reqch: chan, conn: amari.Conn):
    δt_rate = 10*tti

    # rx_ue_get_stats sends `ue_get[stats]` request and returns server response.
    rtt_ue_stats = _IncStats()  # time it takes to send ue_get and to receive response
    δt_ue_stats  = _IncStats()  # δ(ue_stats.timestamp)
    t_ue_stats   = None         # last ue_stats.timestamp
    def rx_ue_get_stats(ctx):  # -> ue_stats
        nonlocal t_ue_stats
        t_tx = time.now()
        ue_stats = conn.req(ctx, 'ue_get', {'stats': True})
        t_rx = time.now()
        rtt_ue_stats.add(t_rx-t_tx)
        t = ue_stats['utc']
        if t_ue_stats is not None:
            δt_ue_stats.add(t-t_ue_stats)
        t_ue_stats = t
        return ue_stats

    # rx_stats sends `stats` request and returns server response.
    # we need to query stats to get dl_use/ul_use.
    # Establish separate connection for that since if we use the same conn for
    # both ue_get and stats queries, due to overall 100Hz rate-limiting, ue_get
    # would be retrieved at only 50Hz rate. With separate connection for stats
    # we can retrieve both ue_get and stats each at 100Hz simultaneously.
    conn_stats = amari.connect(ctx, conn.wsuri)
    defer(conn_stats.close)
    rtt_stats = _IncStats()  # like rtt_ue_stats but for stats instead of ue_get
    δt_stats  = _IncStats()  # δ(stats.timestamp)
    t_stats   = None         # last stats.timestamp
    def rx_stats(ctx):  # -> stats
        nonlocal t_stats
        t_tx = time.now()
        stats = conn_stats.req(ctx, 'stats', {})
        t_rx = time.now()
        rtt_stats.add(t_rx-t_tx)
        t = stats['utc']
        if t_stats is not None:
            δt_stats.add(t-t_stats)
        t_stats = t
        return stats

    # issue first dummy stats. It won't report most of statistics due to
    # initial_delay=0, but it will make the next stats query avoid pausing for 0.4s.
    conn_stats.req(ctx, 'stats', {'initial_delay': 0})
    # rx_all simultaneously issues `ue_get[stats]` and `stats` requests and returns server responses.
    # the requests are issued synchronized in time.
    δ_ue_stats = _IncStats()  # ue_stats.timestamp - stats.timestamp
    def rx_all(ctx):  # -> ue_stats, stats
        uq = chan(1)
        sq = chan(1)
        _, _rx = select(
            ctx.done().recv,            # 0
            (ueget_reqch.send, uq),     # 1
        )
        if _ == 0:
            raise ctx.err()
        _, _rx = select(
            ctx.done().recv,            # 0
            (stats_reqch.send, sq),     # 1
        )
        if _ == 0:
            raise ctx.err()

        ue_stats = stats = None
        while ue_stats is None or stats is None:
            _, _rx = select(
                ctx.done().recv,    # 0
                uq.recv,            # 1
                sq.recv,            # 2
            )
            if _ == 0:
                raise ctx.err()
            if _ == 1:
                ue_stats = _rx
                uq = nilchan
            if _ == 2:
                stats = _rx
                sq = nilchan

        δ_ue_stats.add(ue_stats['utc'] - stats['utc'])
        return ue_stats, stats
    ueget_reqch = chan()
    def Trx_ue_get(ctx):
        while 1:
            _, _rx = select(
                ctx.done().recv,    # 0
                ueget_reqch.recv,   # 1
            )
            if _ == 0:
                raise ctx.err()
            retq = _rx
            ue_stats = rx_ue_get_stats(ctx)
            retq.send(ue_stats)  # cap = 1

    stats_reqch = chan()
    def Trx_stats(ctx):
        while 1:
            _, _rx = select(
                ctx.done().recv,    # 0
                stats_reqch.recv,   # 1
            )
            if _ == 0:
                raise ctx.err()
            retq = _rx
            stats = rx_stats(ctx)
            retq.send(stats)  # cap = 1
    # Tmain is the main thread that drives the process overall
    def Tmain(ctx):
        nonlocal rtt_ue_stats, δt_ue_stats
        nonlocal rtt_stats, δt_stats
        nonlocal δ_ue_stats

        t_req = time.now()
        ue_stats, stats = rx_all(ctx)
        S = Sampler(ue_stats, stats)

        qci_Σdl = {}  # qci -> _Σ for dl
        qci_Σul = {}  # ----//---- for ul
        class _Σ:
            __slots__ = (
                'tx_bytes',
                'tx_time',
                'tx_time_err',
                'tx_time_notailtti',
                'tx_time_notailtti_err',
                'tx_nsamples',
            )
            def __init__(Σ):
                for x in Σ.__slots__:
                    setattr(Σ, x, 0)

        # account accounts samples into Σtx_time/Σtx_bytes in qci_Σ.
        def account(qci_Σ, qci_samples):
            for qci, samplev in qci_samples.items():
                Σ = qci_Σ.get(qci)
                if Σ is None:
                    Σ = qci_Σ[qci] = _Σ()
                for s in samplev:
                    # do not account short transmissions
                    # ( tx with 1 tti should be ignored per standard, but it is
                    #   also that small ICMP messages span 2 transport blocks sometimes )
                    t_lo = s.tx_time - s.tx_time_err
                    t_hi = s.tx_time + s.tx_time_err
                    if t_hi <= 1*tti or (t_hi <= 2*tti and s.tx_bytes < 1000):
                        continue
                    Σ.tx_nsamples += 1
                    Σ.tx_bytes    += s.tx_bytes
                    Σ.tx_time     += s.tx_time
                    Σ.tx_time_err += s.tx_time_err
                    # also aggregate .tx_time without tail tti (IP Throughput KPI needs this)
                    tt_hi = math.ceil(t_hi/tti - 1)  # in tti
                    tt_lo = t_lo / tti               # in tti
                    if tt_lo > 1:
                        tt_lo = math.ceil(tt_lo - 1)
                    tt     = (tt_lo + tt_hi) / 2
                    tt_err = (tt_hi - tt_lo) / 2
                    Σ.tx_time_notailtti     += tt     * tti
                    Σ.tx_time_notailtti_err += tt_err * tti
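                    # Worked example (illustrative numbers, not from the patch):
                    # a sample with s.tx_time = 4*tti and s.tx_time_err = 1*tti gives
                    #   t_lo = 3*tti, t_hi = 5*tti
                    #   tt_hi = ceil(5 - 1) = 4             (in tti)
                    #   tt_lo = 3 > 1  ->  ceil(3 - 1) = 2  (in tti)
                    #   tt = (2+4)/2 = 3,  tt_err = (4-2)/2 = 1
                    # i.e. without the tail tti the transmission is accounted
                    # as having taken 3*tti ± 1*tti.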
        while 1:
            # TODO explicitly detect underrun?
            _, _rx = select(
                ctx.done().recv,    # 0
                reqch.recv,         # 1
                default,            # 2
            )
            if _ == 0:
                raise ctx.err()
            if _ == 1:
                # client requests to retrieve message for accumulated data
                opts, respch = _rx
                # TODO verify/handle opts?

                # wrap-up flows and account finalized samples
                qci_dl, qci_ul = S.finish()
                account(qci_Σdl, qci_dl)
                account(qci_Σul, qci_ul)

                _debug()
                _debug('rtt_ue:      %s ms' % rtt_ue_stats .str('%.2f', time.millisecond))
                _debug('δt_ue:       %s ms' % δt_ue_stats  .str('%.2f', time.millisecond))
                _debug('rtt_stats:   %s ms' % rtt_stats    .str('%.2f', time.millisecond))
                _debug('δt_stats:    %s ms' % δt_stats     .str('%.2f', time.millisecond))
                _debug('δ(ue,stat):  %s ms' % δ_ue_stats   .str('%.2f', time.millisecond))

                qci_dict = {}
                Σ0 = _Σ()
                for qci in set(qci_Σdl.keys()) .union(qci_Σul.keys()):
                    Σdl = qci_Σdl.get(qci, Σ0)
                    Σul = qci_Σul.get(qci, Σ0)
                    qci_dict[qci] = {
                        'dl_tx_bytes':              Σdl.tx_bytes,
                        'dl_tx_time':               Σdl.tx_time,
                        'dl_tx_time_err':           Σdl.tx_time_err,
                        'dl_tx_time_notailtti':     Σdl.tx_time_notailtti,
                        'dl_tx_time_notailtti_err': Σdl.tx_time_notailtti_err,
                        'dl_tx_nsamples':           Σdl.tx_nsamples,
                        'ul_tx_bytes':              Σul.tx_bytes,
                        'ul_tx_time':               Σul.tx_time,
                        'ul_tx_time_err':           Σul.tx_time_err,
                        'ul_tx_time_notailtti':     Σul.tx_time_notailtti,
                        'ul_tx_time_notailtti_err': Σul.tx_time_notailtti_err,
                        'ul_tx_nsamples':           Σul.tx_nsamples,
                    }
                r = {'time':     ue_stats['time'],
                     'utc':      ue_stats['utc'],
                     'qci_dict': qci_dict,
                     'δt_ueget': {
                         'min': δt_ue_stats.min,
                         'avg': δt_ue_stats.avg(),
                         'max': δt_ue_stats.max,
                         'std': δt_ue_stats.std(),
                     },
                     'δ_ueget_vs_stats': {
                         'min': δ_ue_stats.min,
                         'avg': δ_ue_stats.avg(),
                         'max': δ_ue_stats.max,
                         'std': δ_ue_stats.std(),
                     },
                    }
                respch.send(r)

                # reset
                qci_Σdl = {}
                qci_Σul = {}
                rtt_ue_stats = _IncStats()
                δt_ue_stats  = _IncStats()
                rtt_stats    = _IncStats()
                δt_stats     = _IncStats()
                δ_ue_stats   = _IncStats()

            # sync time to keep t_req' - t_req ≈ δt_rate
            # this should automatically translate to δt(ue_stats) ≈ δt_rate
            t = time.now()
            δtsleep = δt_rate - (t - t_req)
            if δtsleep > 0:
                time.sleep(δtsleep)

            # retrieve ue_get[stats] and stats data for next frame from enb
            t_req = time.now()
            ue_stats, stats = rx_all(ctx)

            # pass data to sampler and account already detected samples
            qci_dl, qci_ul = S.add(ue_stats, stats)
            account(qci_Σdl, qci_dl)
            account(qci_Σul, qci_ul)

    # run everything
    wg = sync.WorkGroup(ctx)
    wg.go(Trx_ue_get)
    wg.go(Trx_stats)
    wg.go(Tmain)
    wg.wait()
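
The following is an illustrative sketch, not part of the patch, of how
_x_stats_srv can be driven directly: spawn it under a WorkGroup, let it
aggregate for a while, then query it via reqch the same way xlog does. The
wsuri argument and the 1 second aggregation window are made-up example values:

    # illustrative sketch, not part of the patch
    from xlte import amari
    from xlte.amari import drb
    from golang import chan, context, defer, func, sync, time

    @func
    def collect_drb_stats_once(ctx, wsuri):  # -> one x.drb_stats-style dict
        conn = amari.connect(ctx, wsuri)
        defer(conn.close)
        ctxSrv, cancel = context.with_cancel(ctx)
        reqch = chan()
        wg = sync.WorkGroup(ctxSrv)
        wg.go(drb._x_stats_srv, reqch, conn)
        time.sleep(1)                   # let the server poll eNB ~ 100 times
        respch = chan(1)
        reqch.send(({}, respch))        # (opts, respch), as Tmain expects
        r = respch.recv()
        cancel()                        # stop the server
        try:
            wg.wait()
        except Exception:
            pass                        # the server exits via raise ctx.err()
        return r

From the returned message a rough per-QCI DL IP Throughput in the sense of
TS 32.450 6.3.1 can then be estimated as 8*q['dl_tx_bytes'] /
q['dl_tx_time_notailtti'] for every entry q of r['qci_dict'] with nonzero
tx time.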
# _IncStats incrementally computes statistics on provided values.
#
# Provide values via .add().
...
@@ -24,7 +24,7 @@
- use Reader to read logged information from xlog.

(*) for example result of stats, ue_get, erab_get and synthetic queries.
"""
# XLog protocol
@@ -59,6 +59,7 @@
from xlte import amari
from xlte.amari import drb
import json
import traceback
@@ -196,14 +197,32 @@ class _XLogger:
        wg = sync.WorkGroup(ctx)
        defer(wg.wait)

        # spawn servers to handle queries with synthetic messages
        xmsgsrv_dict = {}
        for l in xl.logspecv:
            if l.query in _xmsg_registry:
                xsrv = _XMsgServer(l.query, _xmsg_registry[l.query])
                xmsgsrv_dict[l.query] = xsrv
                xsrv_ready = chan()  # wait for xmsg._runCtx to be initialized
                wg.go(xsrv.run, conn, xsrv_ready)
                xsrv_ready.recv()

        # spawn main logger
        wg.go(xl._xlog1, conn, xmsgsrv_dict)

    def _xlog1(xl, ctx, conn, xmsgsrv_dict):
        # req_ queries either amari service directly, or an extra message service.
        def req_(ctx, query, opts):  # -> resp_raw
            if query in xmsgsrv_dict:
                query_xsrv = xmsgsrv_dict[query]
                _, resp_raw = query_xsrv.req_(ctx, opts)
            else:
                _, resp_raw = conn.req_(ctx, query, opts)
            return resp_raw
        # emit config_get after attach
        cfg_raw = req_(ctx, 'config_get', {})
        xl.emit(cfg_raw)

        # loop emitting requested logspecs
@@ -247,10 +266,83 @@ class _XLogger:
            if _ == 0:
                raise ctx.err()

            resp_raw = req_(ctx, logspec.query, opts)
            xl.emit(resp_raw)
# _XMsgServer represents a server for handling particular synthetic requests.
#
# For example the server for the synthetic x.drb_stats query.
class _XMsgServer:
    def __init__(xsrv, name, f):
        xsrv.name    = name    # str message name, e.g. "x.drb_stats"
        xsrv._func   = f       # func(ctx, reqch, conn) to run the service
        xsrv._reqch  = chan()  # chan<(opts, respch)> to send requests to the service
        xsrv._runCtx = None    # context not done while .run is running

    # run runs the extra server on amari service attached to via conn.
    @func
    def run(xsrv, ctx, conn: amari.Conn, ready: chan):
        xsrv._runCtx, cancel = context.with_cancel(ctx)
        defer(cancel)
        ready.close()
        # establish dedicated conn2 so that the server does not semantically
        # affect requests issued by main logger. For example if we do not and
        # main logger queries stats, and x.drb_stats server also queries stats
        # internally, then data received by main logger will cover only small
        # random period of time instead of full wanted period.
        conn2 = amari.connect(ctx, conn.wsuri)
        defer(conn2.close)
        xsrv._func(ctx, xsrv._reqch, conn2)

    # req_ queries the server and returns its response.
    @func
    def req_(xsrv, ctx, opts):  # -> resp, resp_raw
        origCtx = ctx
        ctx, cancel = context.merge(ctx, xsrv._runCtx)  # need only merge_cancel
        defer(cancel)

        respch = chan(1)
        _, _rx = select(
            ctx.done().recv,                     # 0
            (xsrv._reqch.send, (opts, respch)),  # 1
        )
        if _ == 0:
            if xsrv._runCtx.err() and not origCtx.err():
                raise RuntimeError("%s server is down" % xsrv.name)
            raise ctx.err()
        _, _rx = select(
            ctx.done().recv,    # 0
            respch.recv,        # 1
        )
        if _ == 0:
            if xsrv._runCtx.err() and not origCtx.err():
                raise RuntimeError("%s server is down" % xsrv.name)
            raise ctx.err()
        resp = _rx

        r = {'message': xsrv.name}  # place 'message' first
        r.update(resp)
        resp = r

        resp_raw = json.dumps(resp,
                              separators=(',', ':'),  # most compact, like Amari does
                              ensure_ascii=False)     # so that e.g. δt comes as is
        return resp, resp_raw


# @_xmsg registers func f to provide server for extra messages with specified name.
_xmsg_registry = {}  # name -> xsrv_func(ctx, reqch, conn)
def _xmsg(name, f, doc1):
    assert name not in _xmsg_registry
    f.xlog_doc1 = doc1
    _xmsg_registry[name] = f

_xmsg("x.drb_stats", drb._x_stats_srv, "retrieve statistics about data radio bearers")
# ----------------------------------------
# Reader wraps IO reader to read information generated by xlog.
@@ -435,11 +527,18 @@ Example for <logspec>+:
    stats[samples,rf]/30s ue_get[stats] erab_get/10s qos_flow_get
Besides queries supported by the Amarisoft LTE stack natively, support for the
following synthetic queries is also provided:

%s
Options:
  -h  --help       show this help
""" % LogSpec.DEFAULT_PERIOD, file=out) """ % (LogSpec.DEFAULT_PERIOD,
'\n'.join(" %-14s %s" % (q, f.xlog_doc1)
for q, f in sorted(_xmsg_registry.items()))),
file=out)
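
# Example (hypothetical URI and periods): with the registry above, `xamari xlog`
# can be asked to log the synthetic query alongside native ones, e.g.
#
#   xamari xlog ws://192.168.2.2:9001 x.drb_stats/10s stats/30s ue_get[stats]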
def main(ctx, argv):
...