Commit 79d10eb9 authored by Kirill Smelkov's avatar Kirill Smelkov

amari.xlog: Move main logger to a thread

We will soon need to run 2 threads:

- one with the main logger, and
- another one to serve requests for synthetic x.drb_stats queries

Both main and the second thread will be run via sync.WorkGroup to cancel
each other in case of failure somewhere. So since WorkGroup.wait(),
similarly to all pygolang operations, is not interrupted by signals(*),
we need to wire ctx to be passed through all operations and manage to
cancel that context on SIGINT/SIGTERM.

This patch:

1. adjusts xlog to wire ctx through all call chains and moves ._xlog1()
   to be run in the thread.
2. adjusts amari.Conn to take ctx as argument on all operations and
   react reasonably on that ctx cancel. We need to do it here because
   xlog uses Conn internally.
3. adjusts xamari main driver to setup root context that is canceled on
   SIGINT/SIGTERM similarly e.g. to how nxdtest does it in
   nexedi/nxdtest@b0cf277d .

(*) see nexedi/pygolang@e18adbab for details.
parent c967c8b5
...@@ -45,11 +45,12 @@ class ConnClosedError(ConnError): ...@@ -45,11 +45,12 @@ class ConnClosedError(ConnError):
# connect connects to a service via WebSocket. # connect connects to a service via WebSocket.
def connect(wsuri): # -> Conn def connect(ctx, wsuri): # -> Conn
#websocket.enableTrace(True) # TODO on $XLTE_AMARI_WS_DEBUG=y ? #websocket.enableTrace(True) # TODO on $XLTE_AMARI_WS_DEBUG=y ?
ws = websocket.WebSocket() ws = websocket.WebSocket()
ws.settimeout(5) # reasonable default ws.settimeout(5) # reasonable default
try: try:
# FIXME handle ctx cancel (but it won't stuck forever due to ._ws own timeout)
ws.connect(wsuri) ws.connect(wsuri)
except Exception as ex: except Exception as ex:
raise ConnError("connect") from ex raise ConnError("connect") from ex
...@@ -169,13 +170,13 @@ class Conn: ...@@ -169,13 +170,13 @@ class Conn:
# req sends request and waits for response. # req sends request and waits for response.
def req(conn, msg, args_dict): # -> response def req(conn, ctx, msg, args_dict): # -> response
rx, _ = conn.req_(msg, args_dict) rx, _ = conn.req_(ctx, msg, args_dict)
return rx return rx
@func @func
def req_(conn, msg, args_dict): # -> response, raw_response def req_(conn, ctx, msg, args_dict): # -> response, raw_response
rxq = conn._send_msg(msg, args_dict) rxq = conn._send_msg(ctx, msg, args_dict)
# handle rx timeout ourselves. We cannot rely on global rx timeout # handle rx timeout ourselves. We cannot rely on global rx timeout
# since e.g. other replies might be coming in again and again. # since e.g. other replies might be coming in again and again.
...@@ -187,10 +188,13 @@ class Conn: ...@@ -187,10 +188,13 @@ class Conn:
rxt = _.c rxt = _.c
_, _rx = select( _, _rx = select(
rxt.recv, # 0 ctx.done().recv, # 0
rxq.recv_, # 1 rxt.recv, # 1
rxq.recv_, # 2
) )
if _ == 0: if _ == 0:
raise ctx.err()
if _ == 1:
raise websocket.WebSocketTimeoutException("timed out waiting for response") raise websocket.WebSocketTimeoutException("timed out waiting for response")
_, ok = _rx _, ok = _rx
...@@ -203,7 +207,7 @@ class Conn: ...@@ -203,7 +207,7 @@ class Conn:
# _send_msg sends message to the service. # _send_msg sends message to the service.
def _send_msg(conn, msg, args_dict): # -> rxq def _send_msg(conn, ctx, msg, args_dict): # -> rxq
assert isinstance(args_dict, dict) assert isinstance(args_dict, dict)
assert 'message' not in args_dict assert 'message' not in args_dict
assert 'message_id' not in args_dict assert 'message_id' not in args_dict
...@@ -219,6 +223,7 @@ class Conn: ...@@ -219,6 +223,7 @@ class Conn:
d.update(args_dict) d.update(args_dict)
jmsg = json.dumps(d) jmsg = json.dumps(d)
try: try:
# FIXME handle ctx cancel (but it won't stuck forever due to ._ws own timeout)
conn._ws.send(jmsg) conn._ws.send(jmsg)
except Exception as ex: except Exception as ex:
raise ConnError("send") from ex raise ConnError("send") from ex
......
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Based on https://lab.nexedi.com/nexedi/zodbtools/blob/master/zodbtools/zodb.py # Based on https://lab.nexedi.com/nexedi/zodbtools/blob/master/zodbtools/zodb.py
# Copyright (C) 2017-2022 Nexedi SA and Contributors. # Copyright (C) 2017-2023 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# Jérome Perrin <jerome@nexedi.com> # Jérome Perrin <jerome@nexedi.com>
# #
...@@ -30,6 +30,10 @@ import getopt ...@@ -30,6 +30,10 @@ import getopt
import importlib import importlib
import sys import sys
from golang import func, defer, chan, go
from golang import context, os as gos, syscall
from golang.os import signal
# command_name -> command_module # command_name -> command_module
command_dict = {} command_dict = {}
...@@ -97,6 +101,7 @@ def help(argv): ...@@ -97,6 +101,7 @@ def help(argv):
sys.exit(2) sys.exit(2)
@func
def main(): def main():
try: try:
optv, argv = getopt.getopt(sys.argv[1:], "h", ["help"]) optv, argv = getopt.getopt(sys.argv[1:], "h", ["help"])
...@@ -127,7 +132,24 @@ def main(): ...@@ -127,7 +132,24 @@ def main():
print("Run 'xamari help' for usage.", file=sys.stderr) print("Run 'xamari help' for usage.", file=sys.stderr)
sys.exit(2) sys.exit(2)
return command_module.main(argv) # SIGINT/SIGTERM -> ctx cancel
ctx, cancel = context.with_cancel(context.background())
sigq = chan(1, dtype=gos.Signal)
signal.Notify(sigq, syscall.SIGINT, syscall.SIGTERM)
def _():
signal.Stop(sigq)
sigq.close()
defer(_)
def _(cancel):
sig, ok = sigq.recv_()
if not ok:
return
print("# %s" % sig, file=sys.stderr)
cancel()
go(_, cancel)
defer(cancel)
return command_module.main(ctx, argv)
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -62,8 +62,8 @@ from xlte import amari ...@@ -62,8 +62,8 @@ from xlte import amari
import json import json
import traceback import traceback
from golang import func, defer from golang import func, defer, chan, select
from golang import time from golang import context, sync, time
from golang.gcompat import qq from golang.gcompat import qq
import logging; log = logging.getLogger('xlte.amari.xlog') import logging; log = logging.getLogger('xlte.amari.xlog')
...@@ -124,7 +124,7 @@ class LogSpec: ...@@ -124,7 +124,7 @@ class LogSpec:
# xlog queries service @wsuri periodically according to queries specified by # xlog queries service @wsuri periodically according to queries specified by
# logspecv and logs the result. # logspecv and logs the result.
def xlog(wsuri, logspecv): def xlog(ctx, wsuri, logspecv):
xl = _XLogger(wsuri, logspecv) xl = _XLogger(wsuri, logspecv)
slogspecv = ' '.join(['%s' % _ for _ in logspecv]) slogspecv = ' '.join(['%s' % _ for _ in logspecv])
...@@ -132,8 +132,10 @@ def xlog(wsuri, logspecv): ...@@ -132,8 +132,10 @@ def xlog(wsuri, logspecv):
while 1: while 1:
try: try:
xl.xlog1() xl.xlog1(ctx)
except Exception as ex: except Exception as ex:
if ctx.err() is not None:
raise
if not isinstance(ex, amari.ConnError): if not isinstance(ex, amari.ConnError):
log.exception('xlog failure:') log.exception('xlog failure:')
try: try:
...@@ -164,10 +166,10 @@ class _XLogger: ...@@ -164,10 +166,10 @@ class _XLogger:
# xlog1 performs one cycle of attach/log,log,log.../detach. # xlog1 performs one cycle of attach/log,log,log.../detach.
@func @func
def xlog1(xl): def xlog1(xl, ctx):
# connect to the service # connect to the service
try: try:
conn = amari.connect(xl.wsuri) conn = amari.connect(ctx, xl.wsuri)
except Exception as ex: except Exception as ex:
xl.jemit("service connect failure", {"reason": str(ex)}) xl.jemit("service connect failure", {"reason": str(ex)})
if not isinstance(ex, amari.ConnError): if not isinstance(ex, amari.ConnError):
...@@ -191,15 +193,19 @@ class _XLogger: ...@@ -191,15 +193,19 @@ class _XLogger:
raise raise
defer(_) defer(_)
xl._xlog1(conn) wg = sync.WorkGroup(ctx)
defer(wg.wait)
# spawn main logger
wg.go(xl._xlog1, conn)
def _xlog1(xl, conn): def _xlog1(xl, ctx, conn):
# emit config_get after attach # emit config_get after attach
_, cfg_raw = conn.req_('config_get', {}) _, cfg_raw = conn.req_('config_get', {})
xl.emit(cfg_raw) xl.emit(cfg_raw)
# loop emitting requested logspecs # loop emitting requested logspecs
t0 = time.now() t0 = time.now()
tnextv = [0]*len(xl.logspecv) # [i] - next time to arm for logspecv[i] relative to t0 tnextv = [0]*len(xl.logspecv) # [i] - next time to arm for logspecv[i] relative to t0
...@@ -234,7 +240,12 @@ class _XLogger: ...@@ -234,7 +240,12 @@ class _XLogger:
tarm = t0 + tmin tarm = t0 + tmin
δtsleep = tarm - tnow δtsleep = tarm - tnow
if δtsleep > 0: if δtsleep > 0:
time.sleep(δtsleep) _, _rx = select(
ctx.done().recv, # 0
time.after(δtsleep).recv, # 1
)
if _ == 0:
raise ctx.err()
_, resp_raw = conn.req_(logspec.query, opts) _, resp_raw = conn.req_(logspec.query, opts)
xl.emit(resp_raw) xl.emit(resp_raw)
...@@ -431,7 +442,7 @@ Options: ...@@ -431,7 +442,7 @@ Options:
""" % LogSpec.DEFAULT_PERIOD, file=out) """ % LogSpec.DEFAULT_PERIOD, file=out)
def main(argv): def main(ctx, argv):
try: try:
optv, argv = getopt.getopt(argv[1:], "h", ["help"]) optv, argv = getopt.getopt(argv[1:], "h", ["help"])
except getopt.GetoptError as e: except getopt.GetoptError as e:
...@@ -454,4 +465,4 @@ def main(argv): ...@@ -454,4 +465,4 @@ def main(argv):
for arg in argv[1:]: for arg in argv[1:]:
logspecv.append( LogSpec.parse(arg) ) logspecv.append( LogSpec.parse(arg) )
xlog(wsuri, logspecv) xlog(ctx, wsuri, logspecv)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment