Commit 964f954a authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: Emit sync events periodically

So that xlog stream becomes self-synchronized and could be used even if
we start reading it from some intermediate point instead of only from
the beginning.

We will need this in general - to be able to start reading long log not
only from its beginning, and also in particular for Wendelin systems
where logs are uploaded by Fluentd in chunks and some chunks could be
potentially lost.

Sync events are emitted always unconditionally with default sync
interval being 10x the longest specified period. We also provide users a
way to control sync periodicity via explicitly specifying
"meta.sync/period" query in the logspec.

See kirr/xlte!3 (comment 175796) and
further for related discussion.

This is change of xlog protocol. But it is early days and the only
direct consumer of xlog is amari.kpi which we adjust accordingly. So it
should be ok.
parent 271fad82
...@@ -47,7 +47,7 @@ class LogMeasure: ...@@ -47,7 +47,7 @@ class LogMeasure:
# ._rlog IO reader for enb.log # ._rlog IO reader for enb.log
# #
# ._estats \/ last xlog.Message with read stats result # ._estats \/ last xlog.Message with read stats result
# \/ last xlog.Event | LogError # \/ last xlog.Event\sync | LogError
# \/ None # \/ None
# ._m kpi.Measurement being prepared covering [_estats_prev, _estats) | None # ._m kpi.Measurement being prepared covering [_estats_prev, _estats) | None
# ._m_next kpi.Measurement being prepared covering [_estats, _estats_next) | None # ._m_next kpi.Measurement being prepared covering [_estats, _estats_next) | None
...@@ -153,6 +153,10 @@ def _read(logm): ...@@ -153,6 +153,10 @@ def _read(logm):
if x is None: if x is None:
x = LogError.EOF # represent EOF as LogError x = LogError.EOF # represent EOF as LogError
# ignore sync events
if isinstance(x, xlog.Event) and x.event == "sync":
continue
# handle messages that update current Measurement # handle messages that update current Measurement
if isinstance(x, xlog.Message): if isinstance(x, xlog.Message):
if x.message == "x.drb_stats": if x.message == "x.drb_stats":
...@@ -162,7 +166,7 @@ def _read(logm): ...@@ -162,7 +166,7 @@ def _read(logm):
continue # ignore other messages continue # ignore other messages
# it is an error, event or stats. # it is an error, event\sync or stats.
# if it is an event or stats -> finalize timestamp for _m_next. # if it is an event or stats -> finalize timestamp for _m_next.
# start building next _m_next covering [x, x_next). # start building next _m_next covering [x, x_next).
# shift m <- ._m <- ._m_next <- (new Measurement | None for LogError) # shift m <- ._m <- ._m_next <- (new Measurement | None for LogError)
......
...@@ -536,6 +536,35 @@ def test_LogMeasure_cc_wraparound(): ...@@ -536,6 +536,35 @@ def test_LogMeasure_cc_wraparound():
readok(4, 10) # 4-5 readok(4, 10) # 4-5
# verify that LogMeasure ignores syncs in xlog stream.
@func
def test_LogMeasure_sync():
t = tLogMeasure()
defer(t.close)
_ = t.expect1
cc = 'rrc_connection_request'
CC = 'RRC.ConnEstabAtt.sum'
t.xlog( jstats(1, {}) )
t.xlog( jstats(2, {cc: 4}) )
t.xlog( '{"meta": {"event": "sync", "time": 2.5, "state": "attached", "reason": "periodic", "generator": "xlog ws://localhost:9001 stats[]/30.0s"}}' )
t.xlog( jstats(3, {cc: 7}) )
def readok(τ, CC_value):
_('X.Tstart', τ)
_('X.δT', int(τ+1)-τ)
if CC_value is not None:
_(CC, CC_value)
else:
t.expect_nodata()
t.read()
readok(0.02, None) # attach-1
readok(1, 4) # 1-2
readok(2, 3) # 2-3 jumping over sync
# jstats returns json-encoded stats message corresponding to counters dict. # jstats returns json-encoded stats message corresponding to counters dict.
# τ goes directly to stats['utc'] as is. # τ goes directly to stats['utc'] as is.
def jstats(τ, counters): # -> str def jstats(τ, counters): # -> str
......
...@@ -45,6 +45,8 @@ ...@@ -45,6 +45,8 @@
# - "service detach" when xlog disconnects from monitored LTE service # - "service detach" when xlog disconnects from monitored LTE service
# - "service connect failure" when xlog tries to connect to monitored LTE service # - "service connect failure" when xlog tries to connect to monitored LTE service
# with unsuccessful result. # with unsuccessful result.
# - "sync" emitted periodically with current state of
# connection to LTE service and xlog setup
# - "xlog failure" on internal xlog error # - "xlog failure" on internal xlog error
...@@ -127,7 +129,22 @@ class LogSpec: ...@@ -127,7 +129,22 @@ class LogSpec:
# xlog queries service @wsuri periodically according to queries specified by # xlog queries service @wsuri periodically according to queries specified by
# logspecv and logs the result. # logspecv and logs the result.
def xlog(ctx, wsuri, logspecv): def xlog(ctx, wsuri, logspecv):
xl = _XLogger(wsuri, logspecv) # make sure we always have meta.sync - either the caller specifies it
# explicitly, or we add it automatically to come first with default
# 10x·longest periodicity.
lsync = None
pmax = 1
for (i,l) in enumerate(logspecv):
pmax = max(pmax, l.period)
if l.query == "meta.sync":
lsync = l
logspecv = logspecv[:] # keep caller's intact
if lsync is None:
lsync = LogSpec("meta.sync", [], pmax*10)
logspecv.insert(0, lsync)
xl = _XLogger(wsuri, logspecv, lsync.period)
slogspecv = ' '.join(['%s' % _ for _ in logspecv]) slogspecv = ' '.join(['%s' % _ for _ in logspecv])
xl.jemit("start", {"generator": "xlog %s %s" % (wsuri, slogspecv)}) xl.jemit("start", {"generator": "xlog %s %s" % (wsuri, slogspecv)})
...@@ -146,18 +163,21 @@ def xlog(ctx, wsuri, logspecv): ...@@ -146,18 +163,21 @@ def xlog(ctx, wsuri, logspecv):
# e.g. disk full in xl.jemit itself # e.g. disk full in xl.jemit itself
log.exception('xlog failure (second level):') log.exception('xlog failure (second level):')
δt_reconnect = min(3, lsync.period)
_, _rx = select( _, _rx = select(
ctx.done().recv, # 0 ctx.done().recv, # 0
time.after(3).recv, # 1 time.after(δt_reconnect).recv, # 1
) )
if _ == 0: if _ == 0:
raise ctx.err() raise ctx.err()
# _XLogger serves xlog implementation. # _XLogger serves xlog implementation.
class _XLogger: class _XLogger:
def __init__(xl, wsuri, logspecv): def __init__(xl, wsuri, logspecv, δt_sync):
xl.wsuri = wsuri xl.wsuri = wsuri
xl.logspecv = logspecv xl.logspecv = logspecv
xl.δt_sync = δt_sync # = logspecv.get("meta.sync").period
xl.tsync = time.now() # first `start` serves as sync
# emit saves line to the log. # emit saves line to the log.
def emit(xl, line): def emit(xl, line):
...@@ -172,9 +192,24 @@ class _XLogger: ...@@ -172,9 +192,24 @@ class _XLogger:
d = {"meta": d} d = {"meta": d}
xl.emit(json.dumps(d)) xl.emit(json.dumps(d))
# jemit_sync emits line with sync event to the log.
# TODO logrotate at this point
def jemit_sync(xl, state, args_dict):
tnow = time.now()
d = {"state": state,
"generator": "xlog %s %s" % (xl.wsuri, ' '.join(['%s' % _ for _ in xl.logspecv]))}
d.update(args_dict)
xl.jemit("sync", d)
xl.tsync = tnow
# xlog1 performs one cycle of attach/log,log,log.../detach. # xlog1 performs one cycle of attach/log,log,log.../detach.
@func @func
def xlog1(xl, ctx): def xlog1(xl, ctx):
# emit sync periodically even in detached state
# this is useful to still know e.g. intended logspec if the service is stopped for a long time
if time.now() - xl.tsync >= xl.δt_sync:
xl.jemit_sync("detached", {})
# connect to the service # connect to the service
try: try:
conn = amari.connect(ctx, xl.wsuri) conn = amari.connect(ctx, xl.wsuri)
...@@ -215,10 +250,10 @@ class _XLogger: ...@@ -215,10 +250,10 @@ class _XLogger:
xsrv_ready.recv() xsrv_ready.recv()
# spawn main logger # spawn main logger
wg.go(xl._xlog1, conn, xmsgsrv_dict) wg.go(xl._xlog1, conn, xmsgsrv_dict, srv_info)
def _xlog1(xl, ctx, conn, xmsgsrv_dict): def _xlog1(xl, ctx, conn, xmsgsrv_dict, srv_info):
# req_ queries either amari service directly, or an extra message service. # req_ queries either amari service directly, or an extra message service.
def req_(ctx, query, opts): # -> resp_raw def req_(ctx, query, opts): # -> resp_raw
if query in xmsgsrv_dict: if query in xmsgsrv_dict:
...@@ -273,6 +308,9 @@ class _XLogger: ...@@ -273,6 +308,9 @@ class _XLogger:
if _ == 0: if _ == 0:
raise ctx.err() raise ctx.err()
if logspec.query == 'meta.sync':
xl.jemit_sync("attached", srv_info)
else:
resp_raw = req_(ctx, logspec.query, opts) resp_raw = req_(ctx, logspec.query, opts)
xl.emit(resp_raw) xl.emit(resp_raw)
...@@ -539,6 +577,11 @@ following synthetic queries is also provided: ...@@ -539,6 +577,11 @@ following synthetic queries is also provided:
%s %s
Additionally the following queries are used to control xlog itself:
meta.sync specify how often synchronization events are emitted
default is 10x the longest period
Options: Options:
-h --help show this help -h --help show this help
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment