Commit ba59ea62 authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: tests: Extend faulty protection tests with more kinds of faulty clients

So far we were testing only against faulty client that reads pin
notification ok, but does not reply to the notification. But there could
be more problems:

1) a client does not read pin notification at all
2) a client closes watchlink abruptly after reading pin notification
3) a client replies to pin notification but the reply is not "ack"

The first problem, if not handled leads to whole set of clients to
become stuck on reading the same block as the faulty client. The other
problems also indicate breakage of the isolation protocol from the client
side and that wcfs can no longer be sure that it provides good
uncorrupted data to the client.

In the first case, similarly to "no reply" situation we need to kill the
client to make progress while maintaining safety as well. In the cases 2
and 3 we cannot maintain safety if the faulty client remains in the set
of live and served clients, so it is also logical to send SIGBUS/SIGKILL
to it.

Killing a client with SIGBUS is similar to how OS kernel sends SIGBUS when
a memory-mapped file is accessed and loading file data results in EIO. It is
also similar to wendelin.core 1 where SIGBUS is raised if loading file block
results in an error.

Extend tests to cover all explained scenarios.
parent 0a2cd4a5
...@@ -31,7 +31,7 @@ from golang import select, func, defer ...@@ -31,7 +31,7 @@ from golang import select, func, defer
from golang import context, sync, time from golang import context, sync, time
import pytest; xfail = pytest.mark.xfail import pytest; xfail = pytest.mark.xfail
from pytest import fixture from pytest import mark, fixture
from wendelin.wcfs.wcfs_test import tDB, h, tAt, eprint, \ from wendelin.wcfs.wcfs_test import tDB, h, tAt, eprint, \
setup_module, teardown_module, setup_function, teardown_function setup_module, teardown_module, setup_function, teardown_function
...@@ -214,13 +214,52 @@ class tFaultyClient: ...@@ -214,13 +214,52 @@ class tFaultyClient:
# ---- tests ---- # ---- tests ----
# verify that wcfs kills slow/faulty client who does not reply to pin # verify that wcfs kills slow/faulty client who does not handle pin
# notifications in time during watch setup. # notifications correctly and in time during watch setup.
# #
# This verifies setupWatch codepath. # This verifies setupWatch codepath.
@func @func # faulty client that does not read pin notifications during watch setup.
def _bad_watch_no_pin_reply(ctx, f, at): def _bad_watch_no_pin_read(ctx, f, at):
wlf = f.wc._open("head/watch", mode='r+b') ; defer(wlf.close)
# wait for command to start watching
_ = f.cin.recv()
assert _ == "start watch", _
# send watch; the write should go ok.
wlf.write(b"1 watch %s @%s\n" % (h(f.zfile_oid), h(at)))
# there is no pin handler, because noone reads pin notifications
# -> wcfs must kill us after timing out with sending pin request
f.assertKilled(ctx, "wcfs did not kill client that does not read pin requests")
@func # faulty client that terminates connnection abruptly after receiving pin during watch setup.
def _bad_watch_eof_pin_reply(ctx, f, at):
wlf = f.wc._open("head/watch", mode='r+b') ; defer(wlf.close)
# wait for command to start watching
_ = f.cin.recv()
assert _ == "start watch", _
# send watch; the write should go ok.
wlf.write(b"1 watch %s @%s\n" % (h(f.zfile_oid), h(at)))
# pin notification must be coming
_ = wlf.readline()
assert _.startswith(b"2 pin "), _
f.cout.send(_[2:].rstrip()) # received message without sequence number and trailing \n
# we don't reply to pin notification and just close the connection instead
# NOTE it is different from WatchLink.closeWrite which sends "bye" before doing OS-level close
wlf.close()
# wcfs must kill us right after receiving EOF
f.assertKilled(ctx, "wcfs did not kill client that replied EOF to pin")
@func # faulty client that behaves in problematic way in its pin handler during watch setup.
def __bad_watch_pinh(ctx, f, at, pinh, pinhFailReason):
wl = wcfs.WatchLink(f.wc) ; defer(wl.close) wl = wcfs.WatchLink(f.wc) ; defer(wl.close)
# wait for command to start watching # wait for command to start watching
...@@ -229,23 +268,28 @@ def _bad_watch_no_pin_reply(ctx, f, at): ...@@ -229,23 +268,28 @@ def _bad_watch_no_pin_reply(ctx, f, at):
wg = sync.WorkGroup(ctx) wg = sync.WorkGroup(ctx)
def _(ctx): def _(ctx):
# send watch. The pin handler won't be replying -> we should never get reply here. # send watch. The pin handler either won't be replying or will reply with an error
wl.sendReq(ctx, b"watch %s @%s" % (h(f.zfile_oid), h(at))) # -> we should never get reply here.
raise AssertionError("watch request completed (should not as pin handler is stuck)") _ = wl.sendReq(ctx, b"watch %s @%s" % (h(f.zfile_oid), h(at)))
raise AssertionError("watch request completed (should not as pin handler %s); reply: %r" % (pinhFailReason, _))
wg.go(_) wg.go(_)
def _(ctx): def _(ctx):
req = wl.recvReq(ctx) pinh(ctx, wl)
assert req is not None
f.cout.send(req.msg)
# sleep > wcfs pin timeout - wcfs must kill us
f.assertKilled(ctx, "wcfs did not kill stuck client")
wg.go(_) wg.go(_)
wg.wait() wg.wait()
def _bad_watch_no_pin_reply (ctx, f, at): __bad_watch_pinh(ctx, f, at, f._pinner_no_pin_reply, "is stuck")
def _bad_watch_nak_pin_reply(ctx, f, at): __bad_watch_pinh(ctx, f, at, f._pinner_nak_pin_reply, "replies nak")
@xfail # protection against faulty/slow clients @xfail # protection against faulty/slow clients
@mark.parametrize('faulty', [
_bad_watch_no_pin_read,
_bad_watch_no_pin_reply,
_bad_watch_eof_pin_reply,
_bad_watch_nak_pin_reply,
])
@func @func
def test_wcfs_pintimeout_kill_on_watch(with_prompt_pintimeout): def test_wcfs_pinhfaulty_kill_on_watch(faulty, with_prompt_pintimeout):
t = tDB(multiproc=True); zf = t.zfile t = tDB(multiproc=True); zf = t.zfile
defer(t.close) defer(t.close)
...@@ -255,11 +299,12 @@ def test_wcfs_pintimeout_kill_on_watch(with_prompt_pintimeout): ...@@ -255,11 +299,12 @@ def test_wcfs_pintimeout_kill_on_watch(with_prompt_pintimeout):
f.assertData(['','','c2']) f.assertData(['','','c2'])
# launch faulty process that should be killed by wcfs on problematic pin during watch setup # launch faulty process that should be killed by wcfs on problematic pin during watch setup
p = tFaultySubProcess(t, _bad_watch_no_pin_reply, at=at1) p = tFaultySubProcess(t, faulty, at=at1)
defer(p.close) defer(p.close)
# wait till faulty client issues its watch, receives pin and pauses/misbehaves # wait till faulty client issues its watch, receives pin and pauses/misbehaves
p.send("start watch") p.send("start watch")
if faulty != _bad_watch_no_pin_read:
assert p.recv(t.ctx) == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1)) assert p.recv(t.ctx) == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
# issue our watch request - it should be served well and without any delay # issue our watch request - it should be served well and without any delay
...@@ -271,13 +316,48 @@ def test_wcfs_pintimeout_kill_on_watch(with_prompt_pintimeout): ...@@ -271,13 +316,48 @@ def test_wcfs_pintimeout_kill_on_watch(with_prompt_pintimeout):
assert p.exitcode is not None assert p.exitcode is not None
# verify that wcfs kills slow/faulty client who does not reply to pin # verify that wcfs kills slow/faulty client who does not handle pin
# notifications in time caused by asynchronous read access. # notifications correctly and in time caused by asynchronous read access.
# #
# This verifies readPinWatchers codepath. # This verifies readPinWatchers codepath.
@func @func # faulty client that does not read pin notifications triggered by read.
def _bad_pinh_no_pin_reply(ctx, f, at): def _bad_pinh_no_pin_read(ctx, f, at):
wlf = f.wc._open("head/watch", mode='r+b') ; defer(wlf.close)
# initial watch setup goes ok
wlf.write(b"1 watch %s @%s\n" % (h(f.zfile_oid), h(at)))
_ = wlf.readline()
assert _ == b"1 ok\n", _
f.cout.send("f: watch setup ok")
# sleep > wcfs pin timeout - wcfs must kill us
f.assertKilled(ctx, "wcfs did not kill client that does not read pin requests")
@func # faulty client that terminates connnection abruptly after receiving pin triggered by read.
def _bad_pinh_eof_pin_reply(ctx, f, at):
wlf = f.wc._open("head/watch", mode='r+b') ; defer(wlf.close)
# initial watch setup goes ok
wlf.write(b"1 watch %s @%s\n" % (h(f.zfile_oid), h(at)))
_ = wlf.readline()
assert _ == b"1 ok\n", _
f.cout.send("f: watch setup ok")
# wait for "pin ..." due to read access in the parent
_ = wlf.readline()
assert _.startswith(b"2 pin "), _
f.cout.send(_[2:].rstrip())
# close connection abruptly.
# NOTE it is different from WatchLink.closeWrite which sends "bye" before doing OS-level close
wlf.close()
# wcfs must kill us right after receiving EOF
f.assertKilled(ctx, "wcfs did not kill client that replied EOF to pin")
@func # faulty client that behaves in problematic way in its pin notifications triggered by read.
def __bad_pinh(ctx, f, at, pinh):
wl = wcfs.WatchLink(f.wc) ; defer(wl.close) wl = wcfs.WatchLink(f.wc) ; defer(wl.close)
# initial watch setup goes ok # initial watch setup goes ok
...@@ -286,16 +366,20 @@ def _bad_pinh_no_pin_reply(ctx, f, at): ...@@ -286,16 +366,20 @@ def _bad_pinh_no_pin_reply(ctx, f, at):
f.cout.send("f: watch setup ok") f.cout.send("f: watch setup ok")
# wait for "pin ..." due to read access in the parent # wait for "pin ..." due to read access in the parent
req = wl.recvReq(ctx) pinh(ctx, wl)
assert req is not None
f.cout.send(req.msg)
# sleep > wcfs pin timeout - wcfs must kill us def _bad_pinh_no_pin_reply (ctx, f, at): __bad_pinh(ctx, f, at, f._pinner_no_pin_reply)
f.assertKilled(ctx, "wcfs did not kill stuck client") def _bad_pinh_nak_pin_reply(ctx, f, at): __bad_pinh(ctx, f, at, f._pinner_nak_pin_reply)
@xfail # protection against faulty/slow clients @xfail # protection against faulty/slow clients
@mark.parametrize('faulty', [
_bad_pinh_no_pin_read,
_bad_pinh_no_pin_reply,
_bad_pinh_eof_pin_reply,
_bad_pinh_nak_pin_reply,
])
@func @func
def test_wcfs_pintimeout_kill_on_access(with_prompt_pintimeout): def test_wcfs_pinhfaulty_kill_on_access(faulty, with_prompt_pintimeout):
t = tDB(multiproc=True); zf = t.zfile; at0=t.at0 t = tDB(multiproc=True); zf = t.zfile; at0=t.at0
defer(t.close) defer(t.close)
...@@ -309,12 +393,13 @@ def test_wcfs_pintimeout_kill_on_access(with_prompt_pintimeout): ...@@ -309,12 +393,13 @@ def test_wcfs_pintimeout_kill_on_access(with_prompt_pintimeout):
wl.watch(zf, at1, {2:at1}) wl.watch(zf, at1, {2:at1})
# spawn faulty client and wait until it setups its watch # spawn faulty client and wait until it setups its watch
p = tFaultySubProcess(t, _bad_pinh_no_pin_reply, at=at2) p = tFaultySubProcess(t, faulty, at=at2)
defer(p.close) defer(p.close)
assert p.recv(t.ctx) == "f: watch setup ok" assert p.recv(t.ctx) == "f: watch setup ok"
# commit new transaction and issue read access to modified block # commit new transaction and issue read access to modified block
# our read should be served well even though faulty client is stuck. # our read should be served well even though faulty client is either stuck
# or behaves in problematic way in its pin handler.
# As the result the faulty client should be killed by wcfs. # As the result the faulty client should be killed by wcfs.
at3 = t.commit(zf, {1:'b3'}) at3 = t.commit(zf, {1:'b3'})
wg = sync.WorkGroup(t.ctx) wg = sync.WorkGroup(t.ctx)
...@@ -322,6 +407,7 @@ def test_wcfs_pintimeout_kill_on_access(with_prompt_pintimeout): ...@@ -322,6 +407,7 @@ def test_wcfs_pintimeout_kill_on_access(with_prompt_pintimeout):
f.assertBlk(1, 'b3', {wl: {1:at0}}, timeout=2*t.pintimeout) f.assertBlk(1, 'b3', {wl: {1:at0}}, timeout=2*t.pintimeout)
wg.go(_) wg.go(_)
def _(ctx): def _(ctx):
if faulty != _bad_pinh_no_pin_read:
assert p.recv(ctx) == b"pin %s #%d @%s" % (h(zf._p_oid), 1, h(at0)) assert p.recv(ctx) == b"pin %s #%d @%s" % (h(zf._p_oid), 1, h(at0))
wg.go(_) wg.go(_)
wg.wait() wg.wait()
...@@ -330,6 +416,30 @@ def test_wcfs_pintimeout_kill_on_access(with_prompt_pintimeout): ...@@ -330,6 +416,30 @@ def test_wcfs_pintimeout_kill_on_access(with_prompt_pintimeout):
assert p.exitcode is not None assert p.exitcode is not None
# _pinner_<problem> simulates faulty pinner inside client that behaves in
# problematic way in its pin notification handler.
@func(tFaultyClient)
def _pinner_no_pin_reply(f, ctx, wl):
req = wl.recvReq(ctx)
assert req is not None
f.cout.send(req.msg)
# sleep > wcfs pin timeout - wcfs must kill us
f.assertKilled(ctx, "wcfs did not kill stuck client")
@func(tFaultyClient)
def _pinner_nak_pin_reply(f, ctx, wl):
req = wl.recvReq(ctx)
assert req is not None
f.cout.send(req.msg)
wl.replyReq(ctx, req, b"nak")
# wcfs must kill us right after receiving the nak
f.assertKilled(ctx, "wcfs did not kill client that replied nak to pin")
# assertKilled assert that the current process becomes killed after time goes after pintimeout. # assertKilled assert that the current process becomes killed after time goes after pintimeout.
@func(tFaultyClient) @func(tFaultyClient)
def assertKilled(f, ctx, failmsg): def assertKilled(f, ctx, failmsg):
......
...@@ -1459,7 +1459,8 @@ def test_wcfs_watch_robust(): ...@@ -1459,7 +1459,8 @@ def test_wcfs_watch_robust():
"file not yet known to wcfs or is not a ZBigFile" "file not yet known to wcfs or is not a ZBigFile"
wl.close() wl.close()
# closeTX/bye cancels blocked pin handlers # closeTX gently with "bye" cancels blocked pin handlers without killing client
# (closing abruptly is verified in wcfs_faultyprot_test.py)
f = t.open(zf) f = t.open(zf)
f.assertBlk(2, 'c2') f.assertBlk(2, 'c2')
f.assertCache([0,0,1]) f.assertCache([0,0,1])
...@@ -1467,23 +1468,15 @@ def test_wcfs_watch_robust(): ...@@ -1467,23 +1468,15 @@ def test_wcfs_watch_robust():
wl = t.openwatch() wl = t.openwatch()
wg = sync.WorkGroup(timeout()) wg = sync.WorkGroup(timeout())
def _(ctx): def _(ctx):
# TODO clarify what wcfs should do if pin handler closes wlink TX:
# - reply error + close, or
# - just close
# t = when reviewing WatchLink.serve in wcfs.go
#assert wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \
# "error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \
# "pin #%d @%s: context canceled" % (2, h(at1))
#with raises(error, match="unexpected EOF"):
with raises(error, match="recvReply: link is down"): with raises(error, match="recvReply: link is down"):
wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1))) wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
wg.go(_) wg.go(_)
def _(ctx): def _(ctx):
req = wl.recvReq(ctx) req = wl.recvReq(ctx)
assert req is not None assert req is not None
assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1)) assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
# don't reply to req - close instead # don't reply to req - close instead
# NOTE this closes watchlink gently with first sending "bye" message
wl.closeWrite() wl.closeWrite()
wg.go(_) wg.go(_)
wg.wait() wg.wait()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment