Commit bcfd82dd authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: Add support for arbitrary query options

Before this patch we were supporting only boolean option flags - with, for
example, stats[rf] meaning stats query with {"rf": True} arguments. Now we add
support for arbitrary types, so that it is possible to specify e.g. integer or
string query options, as well as some boolean flag set to false.

This should be good for generality.

For backward compatibility the old way to implicitly specify "on" flags is
continued to be supported.
parent 70b4b71c
...@@ -85,18 +85,24 @@ log = logging.getLogger('xlte.amari.xlog') ...@@ -85,18 +85,24 @@ log = logging.getLogger('xlte.amari.xlog')
# For example stats[rf]/10s. # For example stats[rf]/10s.
class LogSpec: class LogSpec:
# .query e.g. 'stats' # .query e.g. 'stats'
# .optv [] with flags to send with query # .opts {} opt -> value to send with query
# .period how often to issue the query (seconds) # .period how often to issue the query (seconds)
DEFAULT_PERIOD = 60 DEFAULT_PERIOD = 60
def __init__(spec, query, optv, period): def __init__(spec, query, opts, period):
spec.query = query spec.query = query
spec.optv = optv spec.opts = opts
spec.period = period spec.period = period
def __str__(spec): def __str__(spec):
return "%s[%s]/%gs" % (spec.query, ','.join(spec.optv), spec.period) optv = []
for opt, val in spec.opts.items():
if val is True:
optv.append(opt)
else:
optv.append('%s=%s' % (opt, json.dumps(val)))
return "%s[%s]/%gs" % (spec.query, ','.join(optv), spec.period)
# LogSpec.parse parses text into LogSpec. # LogSpec.parse parses text into LogSpec.
@staticmethod @staticmethod
...@@ -104,7 +110,7 @@ class LogSpec: ...@@ -104,7 +110,7 @@ class LogSpec:
def bad(reason): def bad(reason):
raise ValueError("invalid logspec %s: %s" % (qq(text), reason)) raise ValueError("invalid logspec %s: %s" % (qq(text), reason))
optv = [] opts = {}
period = LogSpec.DEFAULT_PERIOD period = LogSpec.DEFAULT_PERIOD
query = text query = text
_ = query.rfind('/') _ = query.rfind('/')
...@@ -126,13 +132,19 @@ class LogSpec: ...@@ -126,13 +132,19 @@ class LogSpec:
if _ == -1: if _ == -1:
bad("missing closing ]") bad("missing closing ]")
optv = tail[1:_].split(',') optv = tail[1:_].split(',')
for opt in optv:
val = True
if '=' in opt:
opt, val = opt.split('=', 1)
val = json.loads(val)
opts[opt] = val
tail = tail[_+1:] tail = tail[_+1:]
for c in '[]/ ': for c in '[]/ ':
if c in query: if c in query:
bad("invalid query") bad("invalid query")
return LogSpec(query, optv, period) return LogSpec(query, opts, period)
# IWriter represents output to where xlog writes its data. # IWriter represents output to where xlog writes its data.
...@@ -166,10 +178,10 @@ def xlog(ctx, wsuri, w: IWriter, logspecv): ...@@ -166,10 +178,10 @@ def xlog(ctx, wsuri, w: IWriter, logspecv):
logspecv = logspecv[:] # keep caller's intact logspecv = logspecv[:] # keep caller's intact
if lsync is None: if lsync is None:
isync = 0 isync = 0
lsync = LogSpec("meta.sync", [], pmax*10) lsync = LogSpec("meta.sync", {}, pmax*10)
logspecv.insert(0, lsync) logspecv.insert(0, lsync)
if lconfig_get is None: if lconfig_get is None:
logspecv.insert(isync+1, LogSpec("config_get", [], lsync.period)) logspecv.insert(isync+1, LogSpec("config_get", {}, lsync.period))
# verify that sync will come at least every LOS_window records # verify that sync will come at least every LOS_window records
ns = 0 ns = 0
...@@ -354,10 +366,6 @@ class _XLogger: ...@@ -354,10 +366,6 @@ class _XLogger:
logspec = xl.logspecv[imin] logspec = xl.logspecv[imin]
tnextv[imin] += logspec.period tnextv[imin] += logspec.period
opts = {}
for opt in logspec.optv:
opts[opt] = True
# issue queries with planned schedule # issue queries with planned schedule
# TODO detect time overruns and correct schedule correspondingly # TODO detect time overruns and correct schedule correspondingly
tnow = time.now() tnow = time.now()
...@@ -382,7 +390,7 @@ class _XLogger: ...@@ -382,7 +390,7 @@ class _XLogger:
xl.jemit_sync("attached", "periodic", isync) xl.jemit_sync("attached", "periodic", isync)
else: else:
t_rx, resp, resp_raw = req_(ctx, logspec.query, opts) t_rx, resp, resp_raw = req_(ctx, logspec.query, logspec.opts)
srv_time = resp["time"] srv_time = resp["time"]
srv_utc = resp.get("utc") srv_utc = resp.get("utc")
xl.emit(resp_raw) xl.emit(resp_raw)
......
...@@ -300,11 +300,11 @@ def test_Reader_timestamp_from_sync_wo_utc(): ...@@ -300,11 +300,11 @@ def test_Reader_timestamp_from_sync_wo_utc():
def test_LogSpec(): def test_LogSpec():
logspec = "stats[samples,rf]/60s" logspec = 'stats[samples,rf,abc=123,def="hello world"]/60s'
spec = xlog.LogSpec.parse(logspec) spec = xlog.LogSpec.parse(logspec)
assert spec.query == "stats" assert spec.query == "stats"
assert spec.optv == ["samples", "rf"] assert spec.opts == {"samples": True, "rf": True, "abc": 123, "def": "hello world"}
assert spec.period == 60.0 assert spec.period == 60.0
assert str(spec) == logspec assert str(spec) == logspec
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment