Commit 34771671 authored by Levin Zimmermann

wcfs_test: Ensure faulty client is killed @readPinWatchers

This patch extends the test scope of 'test_wcfs_pintimeout_kill'. Before
this patch, the test only ensured that a client that does not respond to
pin requests during the initial watch request [1] is killed. Now it also
tests that a faulty client is killed when a block is invalidated. Since
there are no other situations in which the WCFS server sends pin requests
to a client, the test now covers all situations in which a faulty client
might fail to respond. This patch therefore increases confidence that
WCFS cannot be blocked by a faulty client.

[1] See nexedi/wendelin.core!18
parent e13975d6
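For context, a minimal sketch (not part of this commit) of the pin exchange with a
client that *does* reply, for contrast with the stuck pin handler that the test
simulates. It reuses helpers visible in the diff below (tDB, commit, openwatch,
sendReq/recvReq, h, context, sync, defer, @func); wl.replyReq and the b"ack"
reply payload are assumptions about the watch-link reply API, not shown here.

# hedged sketch: a responsive client acknowledges pin requests and is not killed
@func
def _example_responsive_pin_client():
    t = tDB(); zf = t.zfile
    defer(t.close)

    at1 = t.commit(zf, {2:'c1'})
    at2 = t.commit(zf, {2:'c2'})        # head moves past at1

    ctx = context.background()
    wl  = t.openwatch()

    wg = sync.WorkGroup(ctx)
    def _(ctx):
        # watching @at1 while head is @at2 -> wcfs pins blk #2 back to at1;
        # the watch request completes only after the pin is handled.
        wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
    wg.go(_)
    def _(ctx):
        req = wl.recvReq(ctx)
        assert req is not None
        assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
        wl.replyReq(ctx, req, b"ack")   # assumed reply API; answering in time avoids the kill
    wg.go(_)
    wg.wait()
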
 # -*- coding: utf-8 -*-
-# Copyright (C) 2018-2023  Nexedi SA and Contributors.
+# Copyright (C) 2018-2024  Nexedi SA and Contributors.
 #                          Kirill Smelkov <kirr@nexedi.com>
 #
 # This program is free software: you can Use, Study, Modify and Redistribute
@@ -773,42 +773,7 @@ class tFile:
         blkview = t._blk(blk)
         assert t.cached()[blk] == cached
 
-        def _(ctx, ev):
-            assert t.cached()[blk] == cached
-            ev.append('read pre')
-
-            # access data with released GIL so that the thread that reads data from
-            # head/watch can receive pin message. Be careful to handle cancellation,
-            # so that on error in another worker we don't get stuck and the
-            # error can be propagated to wait and reported.
-            #
-            # we handle cancellation by spawning read in another thread and
-            # waiting for either ctx cancel, or read thread to complete. This
-            # way on ctx cancel (e.g. assertion failure in another worker), the
-            # read thread can remain running even after _assertBlk returns, and
-            # in particular till the point where the whole test is marked as
-            # failed and shut down. But on test shutdown .fmmap is unmapped for
-            # all opened tFiles, and so read will hit SIGSEGV. Prepare to catch
-            # that SIGSEGV here.
-            have_read = chan(1)
-            def _():
-                try:
-                    b = read_exfault_nogil(blkview[0:1])
-                except SegmentationFault:
-                    b = 'FAULT'
-                t._blkaccess(blk)
-                have_read.send(b)
-            go(_)
-            _, _rx = select(
-                ctx.done().recv,    # 0
-                have_read.recv,     # 1
-            )
-            if _ == 0:
-                raise ctx.err()
-            b = _rx
-
-            ev.append('read ' + b)
-
-        ev = doCheckingPin(_, pinokByWLink, pinfunc)
+        ev = doCheckingPin(lambda ctx, ev: t._triggerPin(ctx, ev, blk, cached), pinokByWLink, pinfunc)
 
         # XXX hack - wlinks are notified and emit events simultaneously - we
         # check only that events begin and end with read pre/post and that pins
@@ -832,6 +797,43 @@ class tFile:
         # we just accessed the block in full - it has to be in OS cache completely
         assert t.cached()[blk] == 1
 
+    # _triggerPin triggers server pin requests by reading blk.
+    def _triggerPin(t, ctx, ev, blk, cached):
+        assert t.cached()[blk] == cached
+        blkview = t._blk(blk)
+        ev.append('read pre')
+
+        # access data with released GIL so that the thread that reads data from
+        # head/watch can receive pin message. Be careful to handle cancellation,
+        # so that on error in another worker we don't get stuck and the
+        # error can be propagated to wait and reported.
+        #
+        # we handle cancellation by spawning read in another thread and
+        # waiting for either ctx cancel, or read thread to complete. This
+        # way on ctx cancel (e.g. assertion failure in another worker), the
+        # read thread can remain running even after _assertBlk returns, and
+        # in particular till the point where the whole test is marked as
+        # failed and shut down. But on test shutdown .fmmap is unmapped for
+        # all opened tFiles, and so read will hit SIGSEGV. Prepare to catch
+        # that SIGSEGV here.
+        have_read = chan(1)
+        def _():
+            try:
+                b = read_exfault_nogil(blkview[0:1])
+            except SegmentationFault:
+                b = 'FAULT'
+            t._blkaccess(blk)
+            have_read.send(b)
+        go(_)
+        _, _rx = select(
+            ctx.done().recv,    # 0
+            have_read.recv,     # 1
+        )
+        if _ == 0:
+            raise ctx.err()
+        b = _rx
+
+        ev.append('read ' + b)
+
 
     # assertData asserts that file has data blocks as specified.
     #
@@ -1453,63 +1455,141 @@ def test_wcfs_watch_going_back():
 # TODO extend tests to also cover situation that a non-faulty
 # client continues to be served ok
+# TODO explicitly cover readPinWatchers behaviour with tests
 
 # verify that wcfs kills slow/faulty client who does not reply to pin in time.
 @func
 def test_wcfs_pintimeout_kill():
-    # adjusted wcfs timeout to kill client who is stuck not providing pin reply
-    # timeout until killing is 30 seconds, we add 2 seconds delay for the actual
-    # killing/process shutdown.
-    tkill = 32*time.second
-    t = tDB(timeout=tkill*2); zf = t.zfile
-    defer(t.close)
-
-    at1 = t.commit(zf, {2:'c1'})
-    at2 = t.commit(zf, {2:'c2'})
-    f = t.open(zf)
-    f.assertData(['','','c2'])
-
-    # move into subprocess to avoid killing testing process, in
-    # case test is successful
-    def test(err):
-        try:
-            _test(err)
-        except Exception:
-            if not err.value:
-                err.value = "unexpected error in test code"
-
-    def _test(err):
-        ctx, _ = context.with_timeout(context.background(), 2*tkill)
-        wl = t.openwatch()
-        wg = sync.WorkGroup(ctx)
-        def _(ctx):
-            # send watch. The pin handler won't be replying -> we should never get reply here.
-            wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
-            err.value= "watch request completed (should not as pin handler is stuck)"
-        wg.go(_)
-        def _(ctx):
-            req = wl.recvReq(ctx)
-            assert req is not None
-            assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
-            # sleep > wcfs pin timeout - wcfs must kill us
-            _, _rx = select(
-                ctx.done().recv,        # 0
-                time.after(tkill).recv, # 1
-            )
-            if _ == 0:
-                raise ctx.err()
-            err.value= "wcfs did not kill stuck client"
-        wg.go(_)
-        wg.wait()
-
-    err = Value((ctypes.c_char_p if six.PY2 else ctypes.c_wchar_p), "")
-    p = Process(target=test, args=(err,))
-    p.start()
-    p.join()
-
-    if err.value:
-        fail(err.value)
+    # WCFS server sends pin requests in two different situations:
+    # (1) during the initial watch request of the client. If the client wants
+    #     to watch blocks older than head, the server sends pin requests for
+    #     these blocks.
+    _test_wcfs_pintimeout_kill_at_initial_watch_request()
+    # (2) after a block has been updated and a client still watches at
+    #     a pre-update state, the server needs to send pin requests.
+    _test_wcfs_pintimeout_kill_at_read_pin_watchers()
+
+@func
+def _test_wcfs_pintimeout_kill_at_initial_watch_request():
+    t = _tPinTimeout(); defer(t.close)
+    at1 = t.DB.commit(t.zf, {2:'c1'})
+    at2 = t.DB.commit(t.zf, {2:'c2'})
+    f = t.DB.open(t.zf)
+    f.assertData(['','','c2'])
+
+    @t
+    def test(err):
+        ctx, _ = context.with_timeout(context.background(), 2*t.kill)
+        wl = t.DB.openwatch()
+        wg = sync.WorkGroup(ctx)
+        def _(ctx):
+            # send watch. The pin handler won't be replying -> we should never get reply here.
+            wl.sendReq(ctx, b"watch %s @%s" % (h(t.zf._p_oid), h(at1)))
+            err.value = "watch request completed (should not as pin handler is stuck)"
+        wg.go(_)
+        def _(ctx):
+            # server must reply with pin request, as we want to watch OID@at1,
+            # but head is already @at2.
+            req = wl.recvReq(ctx)
+            assert req is not None
+            assert req.msg == b"pin %s #%d @%s" % (h(t.zf._p_oid), 2, h(at1))
+            # sleep > wcfs pin timeout - wcfs must kill us
+            _, _rx = select(
+                ctx.done().recv,          # 0
+                time.after(t.kill).recv,  # 1
+            )
+            if _ == 0:
+                raise ctx.err()
+            err.value = "wcfs did not kill stuck client"
+        wg.go(_)
+        wg.wait()
+
+@func
+def _test_wcfs_pintimeout_kill_at_read_pin_watchers():
+    t = _tPinTimeout(); defer(t.close)
+    blk = 2
+    at1 = t.DB.commit(t.zf, {blk:'c1'})
+    f = t.DB.open(t.zf)
+    f.assertData(['','','c1'])
+
+    @t
+    def test(err):
+        ctx, _ = context.with_timeout(context.background(), 2*t.kill)
+        wl = t.DB.openwatch()
+        wg = sync.WorkGroup(ctx)
+
+        # Initial watch request:
+        # Client sends OID watch request @at1. As there isn't any newer
+        # data than 'at1' for OID, the client can watch @head & server doesn't
+        # need to pin anything to client.
+        def _(ctx):
+            wl.sendReq(ctx, b"watch %s @%s" % (h(t.zf._p_oid), h(at1)))
+        wg.go(_)
+        wg.wait()
+
+        # Trigger readPinWatchers by overriding blk => client needs to
+        # pin blk to at1, because head is now @at2.
+        at2 = t.DB.commit(t.zf, {blk:'c2'})
+        blkview = f._blk(blk)
+        cached = f.cached()[blk]
+        wg = sync.WorkGroup(ctx)
+        wg.go(lambda ctx: f._triggerPin(ctx, [], blk, cached))
+        def _(ctx):
+            # Client now receives a pin request from the server, but doesn't
+            # respond, in order to trigger the server to kill the client.
+            req = wl.recvReq(ctx)
+            assert req is not None
+            assert req.msg == b"pin %s #%d @%s" % (h(t.zf._p_oid), 2, h(at1))
+            # sleep > wcfs pin timeout - wcfs must kill us
+            _, _rx = select(
+                ctx.done().recv,          # 0
+                time.after(t.kill).recv,  # 1
+            )
+            if _ == 0:
+                raise ctx.err()
+            err.value = "wcfs did not kill stuck client"
+        wg.go(_)
+        wg.wait()
+
+
+# _tPinTimeout provides common utilities for pin timeout tests.
+class _tPinTimeout(object):
+    def __init__(t):
+        # adjusted wcfs timeout to kill client who is stuck not providing pin reply
+        # timeout until killing is 30 seconds, we add 2 seconds delay for the actual
+        # killing/process shutdown.
+        t.kill = 32*time.second
+        t.DB   = tDB(timeout=t.kill*2)
+        t.zf   = t.DB.zfile
+
+    def close(t):
+        t.DB.close()
+
+    # __call__ executes test code in a subprocess, to avoid killing the testing
+    # process in case the test is successful and the faulty client is killed.
+    def __call__(t, test):
+        def _(err):
+            try:
+                test(err)
+            except Exception:
+                if not err.value:
+                    err.value = "unexpected error in test code"
+
+        err = Value((ctypes.c_char_p if six.PY2 else ctypes.c_wchar_p), "")
+        p = Process(target=_, args=(err,))
+        p.start()
+        p.join()
+
+        if err.value:
+            fail(err.value)
 
 
 # watch with @at > head - must wait for head to become >= at.
...