Commit 9d9d20f3 authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: Unify start with sync

Let's use "sync(reason=start)" instead of dedicated "start" event for
uniformity. Periodic syncs are now "sync(reason=periodic)" and after
logrotation support there will be also "pre-logrotate" and
"post-logrotate" reasons. Emit "sync(reason=stop)" at xlog shutdown for
uniformity and to make it more clear from looking at just enb.xlog about
what is xlog state at the end.

Stop requiring "start" to be present in the header - we will soon rework
xlog reader to look around for nearby sync automatically so that reading
could be started from any position in the stream.
parent 67ece601
......@@ -154,7 +154,7 @@ def _read(logm):
x = LogError.EOF # represent EOF as LogError
# ignore sync events
if isinstance(x, xlog.Event) and x.event == "sync":
if isinstance(x, xlog.SyncEvent):
continue
# handle messages that update current Measurement
......
......@@ -40,12 +40,12 @@
# Queries are specific to monitored LTE service.
# Events are specific to xlog itself and can be as follows:
#
# - "start" when xlog starts
# - "service attach" when xlog successfully connects to monitored LTE service
# - "service detach" when xlog disconnects from monitored LTE service
# - "service connect failure" when xlog tries to connect to monitored LTE service
# with unsuccessful result.
# - "sync" emitted periodically with current state of
# - "sync" emitted periodically and when xlogs starts,
# stops (TODO and rotate logs). Comes with current state of
# connection to LTE service and xlog setup
# - "xlog failure" on internal xlog error
......@@ -128,6 +128,7 @@ class LogSpec:
# xlog queries service @wsuri periodically according to queries specified by
# logspecv and logs the result.
@func
def xlog(ctx, wsuri, logspecv):
# make sure we always have meta.sync - either the caller specifies it
# explicitly, or we add it automatically to come first with default
......@@ -155,8 +156,11 @@ def xlog(ctx, wsuri, logspecv):
xl = _XLogger(wsuri, logspecv, lsync.period)
slogspecv = ' '.join(['%s' % _ for _ in logspecv])
xl.jemit("start", {"generator": "xlog %s %s" % (wsuri, slogspecv)})
# emit sync at start/stop
xl.jemit_sync("detached", "start", {})
def _():
xl.jemit_sync("detached", "stop", {})
defer(_)
while 1:
try:
......@@ -185,8 +189,8 @@ class _XLogger:
def __init__(xl, wsuri, logspecv, δt_sync):
xl.wsuri = wsuri
xl.logspecv = logspecv
xl.δt_sync = δt_sync # = logspecv.get("meta.sync").period
xl.tsync = time.now() # first `start` serves as sync
xl.δt_sync = δt_sync # = logspecv.get("meta.sync").period
xl.tsync = float('-inf') # never yet
# emit saves line to the log.
def emit(xl, line):
......@@ -203,9 +207,10 @@ class _XLogger:
# jemit_sync emits line with sync event to the log.
# TODO logrotate at this point
def jemit_sync(xl, state, args_dict):
def jemit_sync(xl, state, reason, args_dict):
tnow = time.now()
d = {"state": state,
"reason": reason,
"generator": "xlog %s %s" % (xl.wsuri, ' '.join(['%s' % _ for _ in xl.logspecv]))}
d.update(args_dict)
xl.jemit("sync", d)
......@@ -217,7 +222,7 @@ class _XLogger:
# emit sync periodically even in detached state
# this is useful to still know e.g. intended logspec if the service is stopped for a long time
if time.now() - xl.tsync >= xl.δt_sync:
xl.jemit_sync("detached", {})
xl.jemit_sync("detached", "periodic", {})
# connect to the service
try:
......@@ -314,7 +319,7 @@ class _XLogger:
raise ctx.err()
if logspec.query == 'meta.sync':
xl.jemit_sync("attached", srv_info)
xl.jemit_sync("attached", "periodic", srv_info)
else:
resp_raw = req_(ctx, logspec.query, opts)
xl.emit(resp_raw)
......@@ -429,6 +434,13 @@ class Message(xdict):
# .timestamp seconds since epoch
pass
# SyncEvent specializes Event and represents "sync" event in xlog.
class SyncEvent(Event):
# .state
# .reason
# .generator
pass
# Reader(r) creates new reader that will read xlog data from r.
@func(Reader)
......@@ -436,23 +448,6 @@ def __init__(xr, r):
xr._r = r
xr._lineno = 0
# parse header
try:
head = xr._jread1()
if head is None:
raise xr._err("header: unexpected EOF")
meta = head.get1("meta", dict)
ev0, t0 = xr._parse_metahead(meta)
if ev0 != "start":
raise xr._err("header: starts with meta.event=%s ; expected `start`" % ev0)
gen = meta.get1("generator", str)
# TODO parse generator -> ._xlogspecv
except:
xr._r.close()
raise
# close release resources associated with the Reader.
@func(Reader)
def close(xr):
......@@ -469,6 +464,16 @@ def read(xr): # -> Event|Message|None
x.__class__ = Event
meta = x.get1("meta", dict)
x.event, x.timestamp = xr._parse_metahead(meta)
if x.event in {"sync", "start"}: # for backward compatibility with old logs meta:start
x.__class__ = SyncEvent # is reported to users as sync(start) event
x.generator = meta.get1("generator", str)
if x.event == "start":
x.state = "detached"
x.reason = "start"
else:
x.state = meta.get1("state", str)
x.reason = meta.get1("reason", str)
# TODO parse generator -> .logspecv
return x
if "message" in x:
......
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
# Copyright (C) 2022-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
......@@ -38,8 +38,14 @@ zzzqqqrrrr
xr = xlog.Reader(io.BytesIO(data))
defer(xr.close)
# TODO check xr.tstart == 0.01
# TODO check xr.xlogspecv == ue_get[]/3.0s erab_get[]/3.0s
# :1
_ = xr.read()
assert type(_) is xlog.SyncEvent
assert _.event == "start"
assert _.timestamp == 0.01
assert _ == {"meta": {"event": "start",
"time": 0.01,
"generator": "xlog ws://localhost:9001 ue_get[]/3.0s erab_get[]/3.0s"}}
# :2
_ = xr.read()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment