Commit 546ba331 authored by Kirill Smelkov

wcfs: tests: Move client to be pinkill'ed into separate process

If we don't, the whole testing process will be killed when wcfs
is taught to kill clients that do not handle pin notifications
well.

Use multiprocessing to do so and to interoperate with the spawned
test process by sending/receiving objects to/from it.

Preliminary history:

    levin.zimmermann/wendelin.core@aef0f0e1

Co-authored-by: Levin Zimmermann <levin.zimmermann@nexedi.com>
parent 0b20be5c
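For reference, the parent<->child plumbing the patch builds on can be sketched standalone roughly as below. This is illustrative only and not part of the patch; it assumes Python 3 on POSIX (the patch itself also covers Python 2 via _multiprocessing.Connection), and names such as child_src and the echo protocol are made up for the example. The child's stdin/stdout pipes are wrapped into multiprocessing Connection objects so that whole pickled objects, not raw bytes, travel between parent and child:

    import sys, subprocess
    from multiprocessing.connection import Connection

    # code executed in the child: wrap the inherited pipe ends the same way
    child_src = """
    import sys
    from multiprocessing.connection import Connection
    cin  = Connection(sys.stdin.fileno(),  writable=False)   # parent -> child
    cout = Connection(sys.stdout.fileno(), readable=False)   # child -> parent
    obj = cin.recv()                 # receive a pickled object from the parent
    cout.send(('echo', obj))         # send a pickled object back
    """

    p = subprocess.Popen([sys.executable, '-c', child_src],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True)
    cin  = Connection(p.stdin.fileno(),  readable=False)      # parent -> child
    cout = Connection(p.stdout.fileno(), writable=False)      # child -> parent
    cin.send({'hello': 'world'})
    assert cout.recv() == ('echo', {'hello': 'world'})
    p.wait()

The patch follows the same pattern, but sends the pickled test arguments in and receives either 'END' or the raised exception back from the child.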
@@ -22,14 +22,24 @@ protection against slow/faulty clients in isolation protocol."""
 from __future__ import print_function, absolute_import
 
+from wendelin.lib.zodb import zstor_2zurl
+from wendelin import wcfs
+import sys, os, subprocess, traceback
+import six
+
 from golang import select, func, defer
 from golang import context, sync, time
 
 import pytest; xfail = pytest.mark.xfail
-from pytest import fail, fixture
+from pytest import fixture
 
-from wendelin.wcfs.wcfs_test import tDB, h, \
+from wendelin.wcfs.wcfs_test import tDB, h, tAt, eprint, \
         setup_module, teardown_module, setup_function, teardown_function
 
+if six.PY2:
+    from _multiprocessing import Connection as MPConnection
+else:
+    from multiprocessing.connection import Connection as MPConnection
+
 
 # tests in this module require WCFS to promptly react to pin handler
 # timeouts so that verifying WCFS killing logic does not take a lot of time.
@@ -39,40 +49,222 @@ def with_prompt_pintimeout(monkeypatch):
     return monkeypatch.setenv("WENDELIN_CORE_WCFS_OPTIONS", "-pintimeout %.1fs" % tkill, prepend=" ")
 
 
-# verify that wcfs kills slow/faulty client who does not reply to pin in time.
-@xfail # protection against faulty/slow clients
+# tSubProcess provides infrastructure to run a function in separate process.
+#
+# It runs f(cin, cout, *argv, **kw) in subprocess with cin and cout
+# connected to parent via multiprocessing.Connection .
+#
+# It is similar to multiprocessing.Process in spawn mode that is available on py3.
+# We need to use spawn mode - not fork - because fork does not work well when
+# parent process is multithreaded, as many things, that are relying on the
+# additional threads in the original process, stop to function in the forked
+# child without additional care. For example pygolang timers and signals
+# currently stop to work after the fork, and in general it is believed that in
+# multithreaded programs the only safe thing to do after the fork is exec.
+# Please see section "NOTES" in
+#
+#     https://man7.org/linux/man-pages/man3/pthread_atfork.3.html
+#
+# for details about this issue.
+class tSubProcess(object):
+    def __init__(proc, f, *argv, **kw):
+        exev = [sys.executable, '-c', 'from wendelin.wcfs import wcfs_faultyprot_test as t; '
+                                      't.tSubProcess._start(%r)' % f.__name__]
+        proc.popen = subprocess.Popen(exev, stdin=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True)
+        try:
+            proc.cin  = MPConnection(proc.popen.stdin.fileno(),  readable=False)
+            proc.cout = MPConnection(proc.popen.stdout.fileno(), writable=False)
+            proc.send(argv)
+            proc.send(kw)
+        except:
+            proc.popen.kill()
+            raise
+
+    # _start is trampoline ran in the subprocess to launch to user function.
+    @staticmethod
+    def _start(funcname):
+        cin  = MPConnection(sys.stdin.fileno(),  writable=False)
+        cout = MPConnection(sys.stdout.fileno(), readable=False)
+        argv = cin.recv()
+        kw   = cin.recv()
+        f = globals()[funcname]
+        procname = kw.pop('_procname', f.__name__)
+        try:
+            f(cin, cout, *argv, **kw)
+            _ = 'END'
+        except BaseException as exc:
+            # dump traceback so it appears in the log because Traceback objects are not picklable
+            eprint("\nException in subprocess %s (pid%d):" % (procname, os.getpid()))
+            traceback.print_exc()
+            _ = exc
+        cout.send(_)
+        cout.close()
+
+    # close releases resources associated with subprocess.
+    def close(proc):
+        if proc.popen.returncode is None:
+            proc.popen.kill()
+
+    # exitcode returns subprocess exit code or None if subprocess has not yet terminated.
+    @property
+    def exitcode(proc):
+        return proc.popen.returncode
+
+    # join waits for the subprocess to end.
+    def join(proc, ctx):
+        gotend = False
+        goteof = False
+        joined = False
+        while not (goteof and joined):
+            if ctx.err() is not None:
+                raise ctx.err()
+
+            if not joined:
+                joined = (proc.popen.poll() is not None)
+
+            # recv from proc to see if it was END or exception
+            # make sure to recv at least once after joined to read buffered messages / exception
+            if goteof:
+                time.sleep(0.1*time.second)
+            else:
+                try:
+                    _, ok = proc.tryrecv()
+                except EOFError:
+                    goteof = True
+                else:
+                    if ok:
+                        if not gotend:
+                            assert _ == 'END'
+                            gotend = True
+                        else:
+                            raise AssertionError("got %r after END" % (_,))
+
+    # send sends object to subprocess input.
+    def send(proc, obj):
+        proc.cin.send(obj)
+
+    # recv receives object/exception from subprocess output.
+    def recv(proc, ctx): # -> obj | raise exception | EOFError
+        while 1:
+            if ctx.err() is not None:
+                raise ctx.err()
+            _, ok = proc.tryrecv()
+            if ok:
+                return _
+
+    # tryrecv tries to receive an object/exception from subprocess output.
+    # It does so without blocking.
+    def tryrecv(proc): # -> (obj, ok) | raise exception | EOFError
+        _ = proc.cout.poll(0.1*time.second)
+        if not _:
+            return None, False
+        _ = proc.cout.recv()
+        if isinstance(_, BaseException):
+            raise _
+        return _, True
+
+
+# tFaultySubProcess runs f(tFaultyClient, *argv, *kw) in subprocess.
+# It's a small convenience wrapper over tSubProcess - please see its documentation for details.
+class tFaultySubProcess(tSubProcess):
+    def __init__(fproc, t, f, *argv, **kw):
+        kw.setdefault('zurl',       zstor_2zurl(t.root._p_jar.db().storage))
+        kw.setdefault('zfile_oid',  t.zfile._p_oid)
+        kw.setdefault('_procname',  f.__name__)
+        kw.setdefault('pintimeout', t.pintimeout)
+        tremain = t.ctx.deadline() - time.now()
+        assert t.pintimeout < tremain/3 # 2·pintimeout is needed to reliably detect wcfs kill reaction
+        for k,v in list(kw.items()):
+            if isinstance(v, tAt): # tAt is not picklable
+                kw[k] = v.raw
+        super(tFaultySubProcess, fproc).__init__(_tFaultySubProcess_start, f.__name__, *argv, **kw)
+        assert fproc.cout.recv() == "f: start"
+
 @func
-def test_wcfs_pintimeout_kill(with_prompt_pintimeout):
-    t = tDB(); zf = t.zfile
-    defer(t.close)
-
-    at1 = t.commit(zf, {2:'c1'})
-    at2 = t.commit(zf, {2:'c2'})
-
-    f = t.open(zf)
-    f.assertData(['','','c2'])
-
-    # XXX move into subprocess not to kill whole testing
-    ctx, _ = context.with_timeout(context.background(), 2*t.pintimeout)
-
-    wl = t.openwatch()
+def _tFaultySubProcess_start(cin, cout, funcname, **kw):
+    f = tFaultyClient()
+    f.cin = cin
+    f.cout = cout
+    f.zurl = kw.pop('zurl')
+    f.zfile_oid = kw.pop('zfile_oid')
+    f.pintimeout = kw.pop('pintimeout')
+    f.wc = wcfs.join(f.zurl, autostart=False); defer(f.wc.close)
+    # we do not need to implement timeouts precisely in the child process
+    # because parent will kill us on its timeout anyway.
+    ctx = context.background()
+    f.cout.send("f: start")
+    testf = globals()[funcname]
+    testf(ctx, f, **kw)
+
+
+# tFaultyClient is placeholder for arguments + WCFS connection for running test
+# function inside tFaultySubProcess.
+class tFaultyClient:
+    # .cin
+    # .cout
+    # .zurl
+    # .zfile_oid
+    # .wc
+    # .pintimeout
+    pass
+
+
+# ---- tests ----
+
+# verify that wcfs kills slow/faulty client who does not reply to pin in time.
+@func
+def _bad_watch_no_pin_reply(ctx, f, at):
+    wl = wcfs.WatchLink(f.wc)   ; defer(wl.close)
+
+    # wait for command to start watching
+    _ = f.cin.recv()
+    assert _ == "start watch", _
+
     wg = sync.WorkGroup(ctx)
     def _(ctx):
         # send watch. The pin handler won't be replying -> we should never get reply here.
-        wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
-        fail("watch request completed (should not as pin handler is stuck)")
+        wl.sendReq(ctx, b"watch %s @%s" % (h(f.zfile_oid), h(at)))
+        raise AssertionError("watch request completed (should not as pin handler is stuck)")
     wg.go(_)
     def _(ctx):
         req = wl.recvReq(ctx)
         assert req is not None
-        assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
+        f.cout.send(req.msg)
+
         # sleep > wcfs pin timeout - wcfs must kill us
         _, _rx = select(
            ctx.done().recv,                    # 0
-           time.after(2*t.pintimeout).recv,    # 1
+           time.after(2*f.pintimeout).recv,    # 1
        )
         if _ == 0:
             raise ctx.err()
-        fail("wcfs did not killed stuck client")
+        raise AssertionError("wcfs did not kill stuck client")
     wg.go(_)
     wg.wait()
+
+
+@xfail # protection against faulty/slow clients
+@func
+def test_wcfs_pintimeout_kill(with_prompt_pintimeout):
+    t = tDB(multiproc=True); zf = t.zfile
+    defer(t.close)
+
+    at1 = t.commit(zf, {2:'c1'})
+    at2 = t.commit(zf, {2:'c2'})
+
+    f = t.open(zf)
+    f.assertData(['','','c2'])
+
+    # launch faulty process that should be killed by wcfs on problematic pin during watch setup
+    p = tFaultySubProcess(t, _bad_watch_no_pin_reply, at=at1)
+    defer(p.close)
+
+    # wait till faulty client issues its watch, receives pin and pauses/misbehaves
+    p.send("start watch")
+    assert p.recv(t.ctx) == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
+
+    # the faulty client must become killed by wcfs
+    p.join(t.ctx)
+    assert p.exitcode is not None
@@ -350,7 +350,7 @@ class DFile:
 # TODO(?) print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
 class tWCFS(_tWCFS):
     @func
-    def __init__(t):
+    def __init__(t, multiproc=False):
         assert not os.path.exists(testmntpt)
         wc = wcfs.join(testzurl, autostart=True)
         assert wc.mountpoint == testmntpt
@@ -359,6 +359,9 @@ class tWCFS(_tWCFS):
         t.wc = wc
         t.pintimeout = float(t.wc._read(".wcfs/pintimeout"))
 
+        # multiproc=True indicates that wcfs server will be used by multiple client processes
+        t.multiproc = multiproc
+
         # the whole test is limited in time to detect deadlocks
         # NOTE with_timeout must be << timeout
         # NOTE pintimeout can be either
@@ -488,7 +491,7 @@ class tDB(tWCFS):
     # create before wcfs startup. old_data is []changeDelta - see .commit
     # and .change for details.
     @func
-    def __init__(t, old_data=[]):
+    def __init__(t, old_data=[], **kw):
         t.root = testdb.dbopen()
         def _(): # close/unlock db if __init__ fails
             exc = sys.exc_info()[1]
@@ -518,7 +521,7 @@ class tDB(tWCFS):
             t._commit(t.zfile, changeDelta)
 
         # start wcfs after testdb is created and initial data is committed
-        super(tDB, t).__init__()
+        super(tDB, t).__init__(**kw)
 
         # fh(.wcfs/zhead) + history of zhead read from there
         t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
@@ -968,7 +971,8 @@ class tWatchLink(wcfs.WatchLink):
         # this tWatchLink currently watches the following files at particular state.
         t._watching = {}    # {} foid -> tWatch
 
-        tdb.assertStats({'WatchLink': len(tdb._wlinks)})
+        if not tdb.multiproc:
+            tdb.assertStats({'WatchLink': len(tdb._wlinks)})
 
     def close(t):
         tdb = t.tdb
@@ -981,7 +985,8 @@ class tWatchLink(wcfs.WatchLink):
             w.pinned = {}
         t._watching = {}
 
-        tdb.assertStats({'WatchLink': len(tdb._wlinks)})
+        if not tdb.multiproc:
+            tdb.assertStats({'WatchLink': len(tdb._wlinks)})
 
 
 # ---- infrastructure: watch setup/adjust ----
@@ -2057,6 +2062,13 @@ class tAt(bytes):
         return "@" + h(at)
     __str__ = __repr__
 
+    # raw returns raw bytes form of at.
+    # It should be used in contexts where at needs to be pickled, because tAt
+    # is unpicklable due to .tdb being unpicklable.
+    @property
+    def raw(at):
+        return fromhex(h(at))
+
 # hpin returns human-readable representation for {}blk->rev.
 @func(tDB)
 def hpin(t, pin):