Commit 0633d26f authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog += Reader

xlog.Reader could be used to parse and read back data previously saved
by xlog. In the next patch we will use it in Amarisoft driver for KPI
measurements.
parent dc1d5481
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
- use xlog and LogSpec to organize logging of information available via WebSocket access(*). - use xlog and LogSpec to organize logging of information available via WebSocket access(*).
The information is logged in JSON Lines format. See 'xamari help xlog' for details. The information is logged in JSON Lines format. See 'xamari help xlog' for details.
- use Reader to read logged information from xlog.
(*) for example result of stats, ue_get and erab_get queries. (*) for example result of stats, ue_get and erab_get queries.
...@@ -56,8 +57,6 @@ ...@@ -56,8 +57,6 @@
# reason for rejection: on every rotation we want to emit "end of file" # reason for rejection: on every rotation we want to emit "end of file"
# entries to old file + header to new file. # entries to old file + header to new file.
# TODO log loading -> DataFrame
from xlte import amari from xlte import amari
...@@ -237,6 +236,159 @@ class _XLogger: ...@@ -237,6 +236,159 @@ class _XLogger:
xl.emit(resp_raw) xl.emit(resp_raw)
# ----------------------------------------
# Reader wraps IO reader to read information generated by xlog.
#
# Use .read() to retrieve xlog entries.
# Use .close() when done.
#
# The reader must provide .readline() method.
# The ownership of wrapped reader is transferred to the Reader.
class ParseError(RuntimeError): pass
class Reader:
# ._r underlying IO reader
# ._lineno current line number
pass
# xdict represents dict loaded from xlog entry.
#
# Besides usual dict properties it also has information about file position of
# the entry, and the path to the dict - e.g. /message/stats/counters.
class xdict(dict):
# .pos (ioname, lineno)
# .path ()
pass
# Event represents one event in xlog.
class Event(xdict):
# .event
# .timestamp seconds since epoch
pass
# Message represents result of one query in xlog.
class Message(xdict):
# .message
# .timestamp seconds since epoch
pass
# Reader(r) creates new reader that will read xlog data from r.
@func(Reader)
def __init__(xr, r):
xr._r = r
xr._lineno = 0
# parse header
try:
head = xr._jread1()
if head is None:
raise xr._err("header: unexpected EOF")
meta = head.get1("meta", dict)
ev0, t0 = xr._parse_metahead(meta)
if ev0 != "start":
raise xr._err("header: starts with meta.event=%s ; expected `start`" % ev0)
gen = meta.get1("generator", str)
# TODO parse generator -> ._xlogspecv
except:
xr._r.close()
raise
# close release resources associated with the Reader.
@func(Reader)
def close(xr):
xr._r.close()
# read returns next xlog entry or None at EOF.
@func(Reader)
def read(xr): # -> Event|Message|None
x = xr._jread1()
if x is None:
return None
if "meta" in x:
x.__class__ = Event
meta = x.get1("meta", dict)
x.event, x.timestamp = xr._parse_metahead(meta)
return x
if "message" in x:
x.__class__ = Message
x.message = x.get1("message", str)
# NOTE .time is internal eNB time using clock originating at eNB startup.
# .utc is seconds since epoch counted using OS clock.
# .utc field was added in 2022-12-01 - see https://support.amarisoft.com/issues/21934
x.timestamp = x.get1("utc", (float,int))
return x
raise xr._err("invalid xlog entry")
# _err returns ParseError with lineno prefix.
@func(Reader)
def _err(xr, text):
return ParseError("%s:%d : %s" % (_ioname(xr._r), xr._lineno, text))
# _jread1 reads next line and JSON-decodes it.
# None is returned at EOF.
@func(Reader)
def _jread1(xr): # -> xdict|None
xr._lineno += 1
try:
l = xr._r.readline()
except Exception as e:
raise xr._err("read") from e
if len(l) == 0:
return None # EOF
try:
d = json.loads(l)
except Exception as e:
raise xr._err("invalid json: %s" % e) from None
if not isinstance(d, dict):
raise xr._err("got %s instead of dict" % type(d))
xd = xdict(d)
xd.pos = (_ioname(xr._r), xr._lineno)
xd.path = ()
return xd
# _parse_metahead extracts and validates event/time from "meta" entry.
@func(Reader)
def _parse_metahead(xr, meta): # -> event, t
event = meta.get1("event", str)
t = meta.get1("time", (float,int))
return event, t
# get1 retrieves d[key] and verifies it is instance of typeok.
@func(xdict)
def get1(xd, key, typeok):
if key not in xd:
raise ParseError("%s:%d/%s no `%s`" %
(xd.pos[0], xd.pos[1], '/'.join(xd.path), key))
val = xd[key]
if not isinstance(val, typeok):
raise ParseError("%s:%d/%s : got %s ; expected `%s`" %
(xd.pos[0], xd.pos[1], '/'.join(xd.path + (key,)), type(val), typeok))
if type(val) is dict:
val = xdict(val)
val.pos = xd.pos
val.path = xd.path + (key,)
return val
# _ioname returns name of a file-like f.
def _ioname(f):
if hasattr(f, 'name'):
return f.name
else:
return ''
# ---------------------------------------- # ----------------------------------------
import sys, getopt import sys, getopt
......
# -*- coding: utf-8 -*-
# Copyright (C) 2022 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from xlte.amari import xlog
from golang import func, defer
import io
from pytest import raises
@func
def test_Reader():
data = b"""\
{"meta": {"event": "start", "time": 0.01, "generator": "xlog ws://localhost:9001 ue_get[]/3.0s erab_get[]/3.0s"}}
{"meta": {"event": "service attach", "time": 0.02, "srv_name": "ENB", "srv_type": "ENB", "srv_version": "2022-12-01"}}
{"message":"ue_get","ue_list":[],"message_id":2,"time":123.4,"utc":9613.347}
zzzqqqrrrr
{"message":"hello","message_id":3,"utc":10000}
"""
xr = xlog.Reader(io.BytesIO(data))
defer(xr.close)
# TODO check xr.tstart == 0.01
# TODO check xr.xlogspecv == ue_get[]/3.0s erab_get[]/3.0s
# :2
_ = xr.read()
assert type(_) is xlog.Event
assert _.event == "service attach"
assert _.timestamp == 0.02
assert _ == {"meta": {"event": "service attach",
"time": 0.02,
"srv_name": "ENB",
"srv_type": "ENB",
"srv_version": "2022-12-01"}}
# :3
_ = xr.read()
assert type(_) is xlog.Message
assert _.message == "ue_get"
assert _.timestamp == 9613.347
assert _ == {"message": "ue_get",
"ue_list": [],
"message_id": 2,
"time": 123.4,
"utc": 9613.347}
# :4 (bad input)
with raises(xlog.ParseError, match=":4 : invalid json"):
_ = xr.read()
# :5 (restore after bad input)
_ = xr.read()
assert type(_) is xlog.Message
assert _.message == "hello"
assert _.timestamp == 10000
assert _ == {"message": "hello",
"message_id": 3,
"utc": 10000}
# EOF
_ = xr.read()
assert _ is None
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment