Commit 81b1907a authored by Kirill Smelkov's avatar Kirill Smelkov

Propagate cancellation to spawned test jobs

So that if a test run is canceled in ERP5 UI, nxdtest stops its run
soon, instead of after several hours in case of
SlapOS.SoftwareReleases.IntegrationTest-* tests.

See the main (938b5455) and other patches for details.

/helped-and-reviewed-by @jerome
/reviewed-on !14

* y/cancel:
  Add test for cancel propagation
  tests: Run nxdtest.main for each test in a separate thread, so that pytest.timeout generally works
  Propagate cancellation to spawned test jobs
  Raise ctx.err() if test run was cancelled
  Stop spawned process softly on ctx cancel
parents 5acd1359 5d656ccf
Pipeline #18940 failed with stage
in 0 seconds
...@@ -57,13 +57,13 @@ from __future__ import print_function, absolute_import ...@@ -57,13 +57,13 @@ from __future__ import print_function, absolute_import
from erp5.util.taskdistribution import TaskDistributor from erp5.util.taskdistribution import TaskDistributor
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from time import time, sleep, strftime, gmtime, localtime from time import strftime, gmtime, localtime
import os, sys, argparse, logging, traceback, re, pwd, socket import os, sys, argparse, logging, traceback, re, pwd, socket
from errno import ESRCH, EPERM from errno import ESRCH, EPERM
from os.path import dirname from os.path import dirname
import six import six
from golang import b, defer, func, select, default from golang import b, defer, func, select, default
from golang import context, sync from golang import errors, context, sync, time
import psutil import psutil
# trun.py is a helper via which we run tests. # trun.py is a helper via which we run tests.
...@@ -115,6 +115,10 @@ def emit(*message): ...@@ -115,6 +115,10 @@ def emit(*message):
print(*message) print(*message)
sys.stdout.flush() sys.stdout.flush()
# interval we will poll master periodically for test_alive.isAlive.
# NOTE it is not e.g. one second not to overload master.
_tmasterpoll = 5*time.minute
@func @func
def main(): def main():
# testnode executes us giving URL to master results collecting instance and other details # testnode executes us giving URL to master results collecting instance and other details
...@@ -215,9 +219,35 @@ def main(): ...@@ -215,9 +219,35 @@ def main():
bstdout = sys.stdout.buffer bstdout = sys.stdout.buffer
bstderr = sys.stderr.buffer bstderr = sys.stderr.buffer
# setup context that is canceled when/if test_result is canceled on master
# we will use this context as the base for all spawned jobs
ctx, cancel = context.with_cancel(context.background())
cancelWG = sync.WorkGroup(ctx)
@func
def _(ctx):
defer(cancel)
while 1:
_, _rx = select(
ctx.done().recv, # 0
time.after(_tmasterpoll).recv, # 1
)
if _ == 0:
break
if not test_result.isAlive():
emit("# master asks to cancel test run")
break
cancelWG.go(_)
defer(cancelWG.wait)
defer(cancel)
# run the tests # run the tests
devnull = open(os.devnull) devnull = open(os.devnull)
while 1: while 1:
if ctx.err() is not None:
emit("# test run canceled")
break
# ask master for next test to run; stop if no more. # ask master for next test to run; stop if no more.
test_result_line = test_result.start() test_result_line = test_result.start()
if test_result_line is None: if test_result_line is None:
...@@ -225,7 +255,7 @@ def main(): ...@@ -225,7 +255,7 @@ def main():
# run tenv[name] # run tenv[name]
t = tenv.byname[test_result_line.name] t = tenv.byname[test_result_line.name]
tstart = time() tstart = time.now()
emit('\n>>> %s' % t.name) emit('\n>>> %s' % t.name)
emit('$ %s' % t.command_str()) emit('$ %s' % t.command_str())
...@@ -263,41 +293,56 @@ def main(): ...@@ -263,41 +293,56 @@ def main():
# (explicit teeing instead of p.communicate() to be able to see incremental progress) # (explicit teeing instead of p.communicate() to be able to see incremental progress)
buf_out = [] buf_out = []
buf_err = [] buf_err = []
wg = sync.WorkGroup(context.background()) wg = sync.WorkGroup(ctx)
wg.go(tee, p.stdout, bstdout, buf_out) wg.go(tee, p.stdout, bstdout, buf_out)
wg.go(tee, p.stderr, bstderr, buf_err) wg.go(tee, p.stderr, bstderr, buf_err)
# wait for p to exit # wait for p to exit
def _(ctx): def _(ctx):
err = None
while 1: while 1:
done = p.poll() done = p.poll()
if done is not None: if done is not None:
break break
# cancel -> kill p # cancel -> terminate p
_, _rx = select( _, _rx = select(
default, # 0 default, # 0
ctx.done().recv, # 1 ctx.done().recv, # 1
) )
if _ == 1: if _ == 1:
p.kill() emit("# stopping due to cancel")
p.terminate()
err = ctx.err()
break break
sleep(0.1) time.sleep(0.1)
# p is done - check if it leaked processes and kill them # p should be done - check if it leaked processes and terminate/kill them
# kill p in the end if it does not stop from just SIGTERM.
while 1: while 1:
procv = session_proclist(sid=p.pid) procv = session_proclist(sid=p.pid)
if len(procv) == 0: if len(procv) == 0:
break break
for proc in procv: for proc in procv:
emit('# leaked pid=%d %r %s' % (proc.pid, proc.name(), proc.cmdline())) if proc.pid != p.pid:
proc.terminate() emit('# leaked pid=%d %r %s' % (proc.pid, proc.name(), proc.cmdline()))
proc.terminate()
gone, alive = psutil.wait_procs(procv, timeout=5) gone, alive = psutil.wait_procs(procv, timeout=5)
for proc in alive: for proc in alive:
p.kill() p.kill()
if err is not None:
raise err
wg.go(_) wg.go(_)
wg.wait() try:
wg.wait()
except Exception as e:
if errors.Is(e, context.canceled):
pass # ok, finish current test_result_line
else:
raise
stdout = b''.join(buf_out) stdout = b''.join(buf_out)
stderr = b''.join(buf_err) stderr = b''.join(buf_err)
...@@ -318,7 +363,7 @@ def main(): ...@@ -318,7 +363,7 @@ def main():
status.update(summary) status.update(summary)
tend = time() tend = time.now()
# print summary and report result of test run back to master # print summary and report result of test run back to master
tres = { tres = {
...@@ -450,6 +495,9 @@ class LocalTestResult: ...@@ -450,6 +495,9 @@ class LocalTestResult:
test_result_line.name = t.name test_result_line.name = t.name
return test_result_line return test_result_line
def isAlive(self): # -> bool (whether still running)
return True # don't need to handle SIGINT - CTRL+C interrupts whole process
class LocalTestResultLine: class LocalTestResultLine:
def stop(self, **kw): def stop(self, **kw):
# XXX + dump .json ? # XXX + dump .json ?
......
...@@ -30,7 +30,8 @@ import shutil ...@@ -30,7 +30,8 @@ import shutil
import subprocess import subprocess
from os.path import dirname, exists, devnull from os.path import dirname, exists, devnull
from golang import chan, select, default, func, defer from golang import chan, select, default, func, defer
from golang import context, sync from golang import context, sync, time
import psutil
import pytest import pytest
...@@ -44,16 +45,38 @@ def run_nxdtest(tmpdir): ...@@ -44,16 +45,38 @@ def run_nxdtest(tmpdir):
passed as `argv`. passed as `argv`.
""" """
@func
def _run_nxdtest(nxdtest_file_content, argv=("nxdtest",)): def _run_nxdtest(nxdtest_file_content, argv=("nxdtest",)):
with tmpdir.as_cwd(): with tmpdir.as_cwd():
with open(".nxdtest", "w") as f: with open(".nxdtest", "w") as f:
f.write(nxdtest_file_content) f.write(nxdtest_file_content)
sys_argv = sys.argv sys_argv = sys.argv
sys.argv = argv sys.argv = argv
try: def _():
main()
finally:
sys.argv = sys_argv sys.argv = sys_argv
defer(_)
# run nxdtest in thread so that timeout handling works
# ( if nxdtest is run on main thread, then non-py wait in WorkGroup.wait, if
# stuck, prevents signals from being handled at python-level )
wg = sync.WorkGroup(context.background())
done = chan()
@func
def _(ctx):
defer(done.close)
main()
wg.go(_)
while 1:
_, _rx = select(
default, # 0
done.recv, # 1
)
if _ == 0:
time.sleep(0.1)
continue
wg.wait()
break
return _run_nxdtest return _run_nxdtest
...@@ -178,32 +201,11 @@ TestCase('TEST10', ['echo', 'TEST10']) ...@@ -178,32 +201,11 @@ TestCase('TEST10', ['echo', 'TEST10'])
def test_run_procleak(run_nxdtest, capsys): def test_run_procleak(run_nxdtest, capsys):
procleak = "%s/testprog/procleak" % (dirname(__file__),) procleak = "%s/testprog/procleak" % (dirname(__file__),)
# run nxdtest in thread so that timeout handling works run_nxdtest(
# ( if nxdtest is run on main thread, then non-py wait in WorkGroup.wait, if """\
# stuck, prevents signals from being handled at python-level )
wg = sync.WorkGroup(context.background())
done = chan()
@func
def _(ctx):
defer(done.close)
run_nxdtest(
"""\
TestCase('TEST_WITH_PROCLEAK', ['%s', 'AAA', 'BBB', 'CCC']) TestCase('TEST_WITH_PROCLEAK', ['%s', 'AAA', 'BBB', 'CCC'])
""" % procleak """ % procleak
) )
wg.go(_)
while 1:
_, _rx = select(
default, # 0
done.recv, # 1
)
if _ == 0:
time.sleep(0.1)
continue
wg.wait()
break
captured = capsys.readouterr() captured = capsys.readouterr()
assert "AAA: terminating" in captured.out assert "AAA: terminating" in captured.out
...@@ -292,3 +294,67 @@ TestCase('TESTNAME', ['%s']) ...@@ -292,3 +294,67 @@ TestCase('TESTNAME', ['%s'])
captured = capsys.readouterr() captured = capsys.readouterr()
output_lines = captured.out.splitlines() output_lines = captured.out.splitlines()
assert re.match(u"# ran 1 test case: 1·ok", output_lines[-1]) assert re.match(u"# ran 1 test case: 1·ok", output_lines[-1])
@pytest.fixture
def distributor_with_cancelled_test(mocker):
"""A distributor for a test result with one test result line named TEST1.
test_result.isAlive() will return False after 2 invocations, to simulate
a test_result that was cancelled by distributor.
"""
def _retryRPC(func_id, args=()):
if func_id == 'getProtocolRevision':
return 1
assert False, ('unexpected RPC call', (func_id, args))
mocker.patch(
'erp5.util.taskdistribution.RPCRetry._retryRPC',
side_effect=_retryRPC)
test_result_line_proxy = mocker.patch(
'erp5.util.taskdistribution.TestResultLineProxy',
autospec=True)
type(test_result_line_proxy).name = mocker.PropertyMock(return_value='TEST1')
test_result_proxy = mocker.patch(
'erp5.util.taskdistribution.TestResultProxy',
autospec=True)
test_result_proxy.start.side_effect = [test_result_line_proxy, None]
test_result_proxy.isAlive.side_effect = [True, True, False]
mocked_createTestResult = mocker.patch(
'erp5.util.taskdistribution.TaskDistributor.createTestResult',
return_value=test_result_proxy)
yield
mocked_createTestResult.assert_called_once()
test_result_proxy.start.assert_called()
test_result_proxy.isAlive.assert_called()
test_result_line_proxy.stop.assert_called()
# verify that nxdtest cancels test run when master reports that test_result is no longer alive.
@pytest.mark.timeout(timeout=10)
def test_cancel_from_master(run_nxdtest, capsys, tmp_path, distributor_with_cancelled_test, mocker):
# nxdtest polls every 5 minutes, but in test we don't want to wait so long.
# set master poll interval to small, but enough time for spawned hang to
# setup its signal handler.
mocker.patch('nxdtest._tmasterpoll', 0.1*time.second)
hang = "%s/testprog/hang" % (dirname(__file__),)
run_nxdtest(
"""\
TestCase('TEST1', ['%s'])
""" % (hang),
argv=[
"nxdtest",
"--master_url", "http://localhost",
],
)
captured = capsys.readouterr()
assert "TEST1" in captured.out
assert "# master asks to cancel test run" in captured.out
assert "# test run canceled" in captured.out
assert "hang: terminating" in captured.out
assert captured.err == ''
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Program hang helps to verify that nxdtest terminates processes when interrupted."""
from __future__ import print_function, absolute_import
import time
from signal import signal, SIGTERM
from setproctitle import setproctitle
def main():
job = 'hang'
setproctitle(job)
def _(sig, frame):
print('%s: terminating' % job)
raise SystemExit
signal(SIGTERM, _)
while 1:
print('%s: hanging ...' % job)
time.sleep(1)
if __name__ == '__main__':
main()
...@@ -15,7 +15,7 @@ setup( ...@@ -15,7 +15,7 @@ setup(
packages = find_packages(), packages = find_packages(),
install_requires = ['erp5.util', 'six', 'pygolang', 'psutil', 'python-prctl'], install_requires = ['erp5.util', 'six', 'pygolang', 'psutil', 'python-prctl'],
extras_require = { extras_require = {
'test': ['pytest', 'pytest-timeout', 'setproctitle'], 'test': ['pytest', 'pytest-mock', 'pytest-timeout', 'setproctitle'],
}, },
entry_points= {'console_scripts': ['nxdtest = nxdtest:main']}, entry_points= {'console_scripts': ['nxdtest = nxdtest:main']},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment