Commit 5acd1359 authored by Kirill Smelkov's avatar Kirill Smelkov

Run each testcase with its own /tmp and /dev/shm

to detect after each test run leaked temporary files, leaked mount
entries, to isolate different test runs from each other, and to provide
tmpfs on /tmp for every test.

The main change and description is in patch1 (a191468f); the other
patches fix that up step-by-step to work for real for all our tests.

/helped-by @tomo
/helped-and-reviewed-by @jerome
/reviewed-on !13

* y/unshare:
  trun: Require FUSE to be working inside user-namespaces to activate them
  Factor checking whether user-namespaces are available into trun.userns_available()
  trun: Add test for how /etc/{passwd,group} is setup for spawned job
  trun: Spawn user test with sole regular uid/gid in /etc/{passwd,group} database
  trun: Deactivate most capabilities before spawning user test
  Run each testcase with its own /tmp and /dev/shm
parents 4fe9ee16 f5f52434
Pipeline #18938 passed with stage
in 0 seconds
......@@ -60,11 +60,15 @@ from subprocess import Popen, PIPE
from time import time, sleep, strftime, gmtime, localtime
import os, sys, argparse, logging, traceback, re, pwd, socket
from errno import ESRCH, EPERM
from os.path import dirname
import six
from golang import b, defer, func, select, default
from golang import context, sync
import psutil
# trun.py is a helper via which we run tests.
trun_py = "%s/trun.py" % dirname(__file__)
# loadNXDTestFile loads .nxdtest file located @path.
def loadNXDTestFile(path): # -> TestEnv
t = TestEnv()
......@@ -249,7 +253,7 @@ def main():
# TODO session -> cgroup, because a child process could create another new session.
def newsession():
os.setsid()
p = Popen(t.argv, env=env, stdin=devnull, stdout=PIPE, stderr=PIPE, bufsize=0, preexec_fn=newsession, **kw)
p = Popen([sys.executable, trun_py] + t.argv, env=env, stdin=devnull, stdout=PIPE, stderr=PIPE, bufsize=0, preexec_fn=newsession, **kw)
except:
stdout, stderr = b'', b(traceback.format_exc())
bstderr.write(stderr)
......
......@@ -19,16 +19,22 @@
# verify general functionality
import grp
import os
import pwd
import sys
import re
import time
from os.path import dirname
import tempfile
import shutil
import subprocess
from os.path import dirname, exists, devnull
from golang import chan, select, default, func, defer
from golang import context, sync
import pytest
from nxdtest import main
from nxdtest import main, trun
@pytest.fixture
......@@ -52,6 +58,30 @@ def run_nxdtest(tmpdir):
return _run_nxdtest
# run all tests twice:
# 1) with user namespaces disabled,
# 2) with user namespaces potentially enabled.
@pytest.fixture(autouse=True, params=('userns_disabled', 'userns_default'))
def with_and_without_userns(tmp_path, monkeypatch, request):
if request.param == 'userns_disabled':
if request.node.get_closest_marker("userns_only"):
pytest.skip("test is @userns_only")
with open(str(tmp_path / 'unshare'), 'w') as f:
f.write('#!/bin/sh\nexit 1')
os.chmod(f.name, 0o755)
monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
else:
assert request.param == 'userns_default'
request.node.add_marker(
pytest.mark.xfail(not userns_works,
reason="this functionality needs user-namespaces to work"))
# @userns_only marks test as requiring user-namespaces to succeed.
userns_works, _ = trun.userns_available()
userns_only = pytest.mark.userns_only
def test_main(run_nxdtest, capsys):
run_nxdtest(
"""\
......@@ -68,7 +98,7 @@ TestCase('TESTNAME', ['echo', 'TEST OUPUT'])
assert re.match(u"# ran 1 test case: 1·ok", output_lines[-1])
def test_error_invoking_command(run_nxdtest, capsys):
def test_command_does_not_exist(run_nxdtest, capsys):
run_nxdtest(
"""\
TestCase('TESTNAME', ['not exist command'])
......@@ -76,7 +106,21 @@ TestCase('TESTNAME', ['not exist command'])
)
captured = capsys.readouterr()
assert "No such file or directory" in captured.err
assert 'Traceback' not in captured.out
assert 'Traceback' not in captured.err
assert captured.err == "not exist command: No such file or directory\n"
def test_command_exit_with_non_zero(run_nxdtest, capsys):
run_nxdtest(
"""\
TestCase('TESTNAME', ['false'])
"""
)
captured = capsys.readouterr()
assert 'Traceback' not in captured.out
assert 'Traceback' not in captured.err
def test_error_invoking_summary(run_nxdtest, capsys):
......@@ -165,3 +209,86 @@ TestCase('TEST_WITH_PROCLEAK', ['%s', 'AAA', 'BBB', 'CCC'])
assert "AAA: terminating" in captured.out
assert "BBB: terminating" in captured.out
assert "CCC: terminating" in captured.out
# verify that files leaked on /tmp are detected.
@userns_only
@func
def test_run_tmpleak(run_nxdtest, capsys):
xtouch = "%s/testprog/xtouch" % (dirname(__file__),)
tmpd = tempfile.mkdtemp("", "nxdtest-leak.", "/tmp")
def _():
shutil.rmtree(tmpd)
defer(_)
tmpleakv = list('%s/%d' % (tmpd, i) for i in range(10))
for f in tmpleakv:
assert not exists(f)
run_nxdtest(
"""
TestCase('TESTCASE', ['%s'] + %r)
""" % (xtouch, tmpleakv,)
)
captured = capsys.readouterr()
for f in tmpleakv:
assert ("# leaked %s" % f) in captured.out
assert not exists(f)
# verify that leaked mounts are detected.
@userns_only
def test_run_mountleak(run_nxdtest, capsys):
run_nxdtest(
"""
TestCase('TESTCASE', ['mount', '-t', 'tmpfs', 'none', '/etc'])
""")
captured = capsys.readouterr()
assert "# leaked mount: none /etc tmpfs" in captured.out
# verify that inside environment, that nxdtest creates, user/group database is
# minimal.
@userns_only
def test_run_usermap(run_nxdtest, capsys):
tdumpusergroups = "%s/testprog/tdumpusergroups" % (dirname(__file__),)
run_nxdtest(
"""
TestCase('TESTCASE', %r)
""" % [tdumpusergroups])
captured = capsys.readouterr()
assert captured.err == ''
# we expect only current user, root and nobody/nogroup to be present
uok = [repr(u) for u in [
pwd.getpwuid(os.getuid()),
pwd.getpwnam('root'),
pwd.getpwnam('nobody')]]
gok = [repr(g) for g in [
grp.getgrgid(os.getgid()),
grp.getgrnam('root'),
grp.getgrnam('nogroup')]]
want = '---- 8< ----\n' # XXX won't need this scissors, if we would test trun directly
for _ in sorted(uok) + sorted(gok):
want += _+'\n'
want += '---- 8< ----'
assert want in captured.out
# verify that inside environment, that nxdtest creates, file permissions are
# still respected.
def test_run_writero(run_nxdtest, capsys):
twritero = "%s/testprog/twritero" % (dirname(__file__),)
run_nxdtest(
"""\
TestCase('TESTNAME', ['%s'])
""" % twritero)
captured = capsys.readouterr()
output_lines = captured.out.splitlines()
assert re.match(u"# ran 1 test case: 1·ok", output_lines[-1])
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Program tdumpusergroups helps to verify that nxdtest maps users and groups
in namespace.
It dumps content of user and group databases in raw form.
Database entries are printed in sorted order.
"""
from __future__ import absolute_import, print_function
import grp
import pwd
import sys
def main():
print('---- 8< ----')
uall = [repr(u) for u in pwd.getpwall()]
gall = [repr(g) for g in grp.getgrall()]
for u in sorted(uall):
print(u)
for g in sorted(gall):
print(g)
print('---- 8< ----')
if __name__ == '__main__':
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Program twritero helps to verify that nxdtest runs tests in sane environment.
It verifies that write to a file with readonly permission is rejected.
"""
from __future__ import print_function, absolute_import
import os, tempfile
from errno import EACCES
from golang import func, defer
@func
def main():
fd, path = tempfile.mkstemp(prefix='twritero.')
def _():
os.remove(path)
defer(_)
os.close(fd)
os.chmod(path, 0o444)
try:
with open(path, "w+") as f:
f.write("zzz")
except IOError as e:
if e.errno != EACCES:
raise
print("write to r/o file rejected ok")
else:
raise AssertionError("write to r/o file not rejected")
if __name__ == '__main__':
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""Program xtouch helps to verify that nxdtest detects files leaked on /tmp.
It is similar to touch(1), but creates leading directories automatically.
It also always exits with non-zero status to simulate failure.
"""
from __future__ import print_function, absolute_import
import os, sys
from os.path import dirname
from errno import EEXIST
def main():
for f in sys.argv[1:]:
mkdir_p(dirname(f))
with open(f, "a"):
pass
sys.exit(1)
# mkdir_p mimics `mkdir -p`
def mkdir_p(path):
try:
os.makedirs(path)
except OSError as e:
if e.errno != EEXIST:
raise
if __name__ == '__main__':
main()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Nexedi SA and Contributors.
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
""" `trun ...` - run test specified by `...`
The test is run in dedicated environment, which, after test completes, is
checked for leaked files, leaked mount entries, etc.
The environment is activated only if user namespaces are available(*).
If user namespaces are not available, the test is still run but without the checks.
(*) see https://man7.org/linux/man-pages/man7/user_namespaces.7.html
"""
from __future__ import print_function, absolute_import
import errno, os, sys, stat, difflib, pwd, grp, shutil
from subprocess import check_call as xrun, CalledProcessError
from os.path import join, devnull
from golang import func, defer
import prctl
# userns_available detects if user-namespaces and necessary features are provided by OS kernel.
def userns_available(): # -> (yes|no, {details})
have = {"userns": False, "userns/fuse": False}
try:
# check if user namespaces are available
with open(devnull, "w") as null:
xrun(["unshare"] + _unshare_argv + ["true"], stdout=null, stderr=null)
except (OSError, CalledProcessError):
pass
else:
have["userns"] = True
if have["userns"]:
# check if FUSE works inside user namespaces.
# Using FUSE inside user namespaces requires Linux >= 4.18 (see
# https://git.kernel.org/linus/da315f6e0398 and
# https://git.kernel.org/linus/8cb08329b080). For simplicity we check
# for that kernel version instead of actually trying to mount a test
# FUSE filesystem.
sysname, _, release, _, _ = os.uname()
if sysname == "Linux":
major, minor, _ = release.split('.', 2) # 5.10.0-9-amd64 -> 5 10 0-9-amd64
version = (int(major), int(minor))
if version >= (4, 18):
have["userns/fuse"] = True
ok = True
for _, haveit in have.items():
if not haveit:
ok = False
return (ok, have)
_unshare_argv = ["-Umc", "--keep-caps"]
def main():
# Try to respawn ourselves in user-namespace where we can mount things, e.g. new /tmp.
# Keep current uid/gid the same for better traceability. In other words current user
# stays the same. Activate ambient capabilities(*) so that mounting filesystems,
# including FUSE-based ones for wendelin.core, still works under regular non-zero uid.
#
# (*) see https://man7.org/linux/man-pages/man7/capabilities.7.html
# and git.kernel.org/linus/58319057b784.
in_userns = True
mypid = str(os.getpid())
_ = os.environ.get("_NXDTEST_TRUN_RESPAWNED", "")
if mypid != _:
userns_works, details = userns_available()
if not userns_works:
in_userns = False
details_str = [] # of +feat, -feat
for feat, haveit in details.items():
details_str.append('%s%s' % ('+' if haveit else '-', feat))
print("# user namespaces not available (%s)." % " ".join(details_str))
print("# isolation and many checks will be deactivated.")
else:
os.environ["_NXDTEST_TRUN_RESPAWNED"] = mypid
os.execvp("unshare", ["unshare"] + _unshare_argv + [sys.executable] + sys.argv)
raise AssertionError("unreachable")
# either respawned in new namespace, or entered here without respawn with in_userns=n.
# run the test via corresponding driver.
run = run_in_userns if in_userns else run_no_userns
def _():
try:
xrun(sys.argv[1:])
except OSError as e:
if e.errno != errno.ENOENT:
raise
#print(e.strerror, file=sys.stderr) # e.strerror does not include filename on py2
print("%s: %s" % (sys.argv[1], os.strerror(e.errno)), # e.filename is also ø on py2
file=sys.stderr)
sys.exit(127)
except CalledProcessError as e:
sys.exit(e.returncode)
run(_)
# run_in_userns runs f with checks assuming that we are in a user namespace.
@func
def run_in_userns(f):
# leave only capabilities that are needed for mount/fusermount.
# in particular drop cap_dac_override so that file permissions are still
# respected (e.g. write to read/only file is rejected).
prctl.cap_inheritable.limit('sys_admin')
# mount new /tmp and /dev/shm to isolate this run from other programs and to detect
# leaked temporary files at the end.
tmpreg = {
"/tmp": [], # mountpoint -> extra options
"/dev/shm": []
}
for tmp, optv in tmpreg.items():
xrun(["mount", "-t", "tmpfs", "none", tmp] + optv)
# in the end: check file leakage on /tmp and friends.
def _():
for root in tmpreg:
for d, dirs, files in os.walk(root):
if d != root:
st = os.stat(d)
if st.st_mode & stat.S_ISVTX:
# sticky wcfs/ alike directories are used as top of registry for
# multiple users. It is kind of normal not to delete such
# directories by default.
print("# found sticky %s/" % d)
else:
print("# leaked %s/" % d)
for f in files:
print("# leaked %s" % join(d, f))
defer(_)
# in the end: check fstab changes.
fstab_before = mounts()
def _():
fstab_after = mounts()
for d in difflib.ndiff(fstab_before, fstab_after):
if d.startswith("- "):
print("# gone mount: %s" % d[2:])
if d.startswith("+ "):
print("# leaked mount: %s" % d[2:])
defer(_)
# pretend we don't have tty or any other special group to avoid issues with e.g. pseudo-terminals.
#
# POSIX requires /dev/pts/* slaves to be chown'ed to tty group (gid=5) on
# grantpt. However we do not have that gid mapped and chown fails. Glibc
# stopped insisting on such chown and delegates proper setup to the kernel
# expecting /dev/pts to be mounted with gid=5,mode=0620 options:
# https://sourceware.org/git/?p=glibc.git;a=commitdiff;h=77356912e836
#
# However e.g. openssh still wants to do the chown(group=tty):
# https://github.com/openssh/openssh-portable/blob/V_8_8_P1-120-ga2188579/sshpty.c#L165-L205
#
# Avoid that by adjusting the system view so that there is only one sole
# single regular user and group with uid/gid of current user. We anyway
# mapped only single uid/gid to parent namespace.
#
# Still include root and nobody/nogroup as an exception since several software
# expect to have zero ID and those names in the user/group database - see e.g.
# https://lab.nexedi.com/nexedi/slapos/merge_requests/1095#note_147177
# https://lab.nexedi.com/nexedi/slapos/merge_requests/1095#note_147201
xetc = "/tmp/xetc"
os.mkdir(xetc)
ustr = lambda u: "%s:%s:%d:%d:%s:%s:%s\n" % (u.pw_name, u.pw_passwd, u.pw_uid, u.pw_gid, u.pw_gecos, u.pw_dir, u.pw_shell)
gstr = lambda g: "%s:%s:%d:%s\n" % (g.gr_name, g.gr_passwd, g.gr_gid, ','.join(g.gr_mem))
writefile("%s/passwd" % xetc,
ustr(pwd.getpwuid(os.getuid())) +
ustr(pwd.getpwnam("root")) +
ustr(pwd.getpwnam("nobody")))
writefile("%s/group" % xetc,
gstr(grp.getgrgid(os.getgid())) +
gstr(grp.getgrnam("root")) +
gstr(grp.getgrnam("nogroup")))
xrun(["mount", "--bind", xetc+"/passwd", "/etc/passwd"])
xrun(["mount", "--bind", xetc+"/group", "/etc/group"])
def _():
xrun(["umount", "-n", "/etc/passwd"])
xrun(["umount", "-n", "/etc/group"])
shutil.rmtree(xetc)
defer(_)
# run the test
f()
# run_no_userns runs f assuming that we are not in a user namespace.
def run_no_userns(f):
f()
# mounts returns current mount entries.
def mounts(): # -> []str
return readfile("/proc/mounts").split('\n')
# readfile returns content of file @path.
def readfile(path): # -> str
with open(path, "r") as f:
return f.read()
# writefile creates file @path and fills it with data.
def writefile(path, data):
with open(path, "w") as f:
f.write(data)
if __name__ == '__main__':
main()
[pytest]
markers =
userns_only: test os run only when user namespaces are available
......@@ -13,7 +13,7 @@ setup(
keywords = 'Nexedi testing infrastructure tool tox',
packages = find_packages(),
install_requires = ['erp5.util', 'six', 'pygolang', 'psutil'],
install_requires = ['erp5.util', 'six', 'pygolang', 'psutil', 'python-prctl'],
extras_require = {
'test': ['pytest', 'pytest-timeout', 'setproctitle'],
},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment