Commit 4095241e authored by Kirill Smelkov's avatar Kirill Smelkov

Y wcfs: Fix and enhance `wcfs stop` to be more reliable

Since wcfs beginning - since e3f2ee2d (wcfs: Initial implementation of
basic filesystem) `wcfs stop` was implemented as just `fusermount -u`.
That, however, turned out to be not robust because if wcfs is
deadlocked, unmounting hangs, and if wcfs server is crashed, but there
are still running client processes, unmount will fail with "Device or
resource busy" error.

For the deadlocked case we often see a situation where both wcfs and
client zope processes are hung, kill -9 does not work on them (they
still remain hung) and there is no easy way to do the unmount and
restart wcfs.

-> Fix `wcfs stop` to do that by first breaking the deadlock via
/sys/fs/fuse/connection/<X>/abort and making sure that:

1) wcfs.go is not running,
2) all left clients are terminated, and
3) the mount is also gone

In many ways this coincides with what Server.stop was already doing, so
here we teach `wcfs stop` to work via that Server.stop codepath and
adjust the latter to work ok if Server._proc is not only
subprocess.Popen that current process spawned, but also an xos.Proc,
that `wcfs stop` discovered. Which can be also None if wcfs.go crashed
by itself.

As explained in the comments I took the decision to kill client
processes instead of doing the final unmount try lazily because

    # NOTE if we do `fusermount -uz` (lazy unmount = MNT_DETACH), we will
    #      remove the mount from filesystem tree and /proc/mounts, but the
    #      clients will be left alive and using the old filesystem which is
    #      left in a bad ENOTCONN state. From this point of view restart of
    #      the clients is more preferred compared to leaving them running
    #      but actually disconnected from the data.
    #
    # TODO try to teach wcfs clients to detect filesystem in ENOTCONN state
    #      and reconnect automatically instead of being killed. Then we could
    #      use MNT_DETACH.
parent e45fd39f
...@@ -18,11 +18,12 @@ ...@@ -18,11 +18,12 @@
# See COPYING file for full licensing terms. # See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
"""Module wcfs.py provides python gateway for spawning, monitoring """Module wcfs.py provides python gateway for spawning, stopping, monitoring
and interoperating with wcfs server. and interoperating with wcfs server.
Serve (zurl) starts and runs WCFS server for ZODB at zurl. Serve (zurl) starts and runs WCFS server for ZODB at zurl.
Start (zurl) starts WCFS server for ZODB at zurl and returns corresponding Server object. Start (zurl) starts WCFS server for ZODB at zurl and returns corresponding Server object.
Stop (zurl) makes sure that WCFS server for ZODB at zurl is not running.
Status(zurl) verifies whether WCFS server for ZODB at zurl is running in correct state. Status(zurl) verifies whether WCFS server for ZODB at zurl is running in correct state.
Join(zurl) joins wcfs server for ZODB at zurl and returns WCFS object that Join(zurl) joins wcfs server for ZODB at zurl and returns WCFS object that
...@@ -97,7 +98,7 @@ class Server: ...@@ -97,7 +98,7 @@ class Server:
# ._mnt mount entry # ._mnt mount entry
# ._proc wcfs process: # ._proc wcfs process:
# \/ subprocess.Popen ; we spawned it # \/ subprocess.Popen ; we spawned it
# \/ xos.Proc ; we discovered it during status # \/ xos.Proc | None ; we discovered it during status or stop
# ._fuseabort opened /sys/fs/fuse/connections/X/abort for server's mount # ._fuseabort opened /sys/fs/fuse/connections/X/abort for server's mount
# ._stopOnce # ._stopOnce
pass pass
...@@ -398,19 +399,22 @@ def mountpoint(wcsrv): ...@@ -398,19 +399,22 @@ def mountpoint(wcsrv):
@func(Server) @func(Server)
def stop(wcsrv, ctx=None): def stop(wcsrv, ctx=None):
if ctx is None: if ctx is None:
ctx, cancel = context.with_timeout(context.background(), 20*time.second) ctx, cancel = context.with_timeout(context.background(), 25*time.second)
defer(cancel) defer(cancel)
wcsrv._stop(ctx) wcsrv._stop(ctx)
@func(Server) @func(Server)
def _stop(wcsrv, ctx, _onstuck=None): def _stop(wcsrv, ctx, _on_wcfs_stuck=None, _on_fs_busy=None, _on_last_unmount_try=None):
def _(): def _():
wcsrv.__stop(ctx, _onstuck) wcsrv.__stop(ctx, _on_wcfs_stuck, _on_fs_busy, _on_last_unmount_try)
wcsrv._stopOnce.do(_) wcsrv._stopOnce.do(_)
@func(Server) @func(Server)
def __stop(wcsrv, ctx, _onstuck): def __stop(wcsrv, ctx, _on_wcfs_stuck, _on_fs_busy, _on_last_unmount_try):
log.info("unmount/stop wcfs pid%d @ %s", wcsrv._proc.pid, wcsrv.mountpoint) wcstr = "(not running)"
if wcsrv._proc is not None:
wcstr = "pid%d" % wcsrv._proc.pid
log.info("unmount/stop wcfs %s @ %s", wcstr, wcsrv.mountpoint)
deadline = ctx.deadline() deadline = ctx.deadline()
if deadline is None: if deadline is None:
...@@ -440,12 +444,69 @@ def __stop(wcsrv, ctx, _onstuck): ...@@ -440,12 +444,69 @@ def __stop(wcsrv, ctx, _onstuck):
defer(_) defer(_)
def _(): def _():
# second unmount, if first unmount failed and we had to abort FUSE connection # second unmount try, if first unmount failed and we had to kill
# -z (lazy) because this one has to succeed, but there could be still # wcfs.go, abort FUSE connection and kill clients. This still can fail
# client file descriptors left pointing to the mounted filesystem. # because e.g. killing clients had not enough permissions, or because
# new clients arrived.
# TODO the second problem can be addressed with first mounting an empty
# directory over previous mount, but it is a privileged operation.
# Maybe we can do that via small "empty" fuse fs.
if _is_mountpoint(wcsrv.mountpoint): if _is_mountpoint(wcsrv.mountpoint):
log.warn("-> unmount -z ...") log.warn("last unmount try")
_fuse_unmount(wcsrv.mountpoint, "-z") if _on_last_unmount_try is not None:
_on_last_unmount_try(wcsrv.mountpoint)
else:
_fuse_unmount(wcsrv.mountpoint)
defer(_)
def _():
# if mount is still there - it is likely because clients keep opened file descriptors to it
# -> kill clients to free the mount
#
# NOTE if we do `fusermount -uz` (lazy unmount = MNT_DETACH), we will
# remove the mount from filesystem tree and /proc/mounts, but the
# clients will be left alive and using the old filesystem which is
# left in a bad ENOTCONN state. From this point of view restart of
# the clients is more preferred compared to leaving them running
# but actually disconnected from the data.
#
# TODO try to teach wcfs clients to detect filesystem in ENOTCONN state
# and reconnect automatically instead of being killed. Then we could
# use MNT_DETACH.
if _is_mountpoint(wcsrv.mountpoint):
lsof = list(wcsrv._mnt.lsof())
for (proc, use) in lsof:
log.warn("the mount is still used by %s:" % proc)
for key, path in use.items():
log.warn("\t%s\t-> %s\n" % (key, path))
if len(lsof) > 0:
if _on_fs_busy is not None:
_on_fs_busy()
else:
wg = sync.WorkGroup(timeoutFrac(0.2))
def kill(ctx, proc):
dt = ctx.deadline() - time.now()
log.warn("%s: <- SIGTERM" % proc)
os.kill(proc.pid, SIGTERM)
ctx1, _ = context.with_timeout(ctx, dt/2)
if _procwait_(ctx1, proc):
log.warn("%s: terminated" % proc)
return
if ctx.err() is not None:
raise ctx.err()
log.warn("%s: is still alive after SIGTERM" % proc)
log.warn("%s: <- SIGKILL" % proc)
os.kill(proc.pid, SIGKILL)
ctx2, _ = context.with_timeout(ctx, dt/2)
if _procwait_(ctx2, proc):
log.warn("%s: terminated" % proc)
return
log.warn("%s: does not exit after SIGKILL")
for (proc, _) in lsof:
wg.go(kill, proc)
wg.wait()
defer(_) defer(_)
def _(): def _():
...@@ -455,8 +516,11 @@ def __stop(wcsrv, ctx, _onstuck): ...@@ -455,8 +516,11 @@ def __stop(wcsrv, ctx, _onstuck):
@func @func
def _(): def _():
# kill wcfs.go in case it is deadlocked and does not exit by itself # kill wcfs.go harder in case it is deadlocked and does not exit by itself
if _procwait_(timeoutFrac(0.5), wcsrv._proc): if wcsrv._proc is None:
return
if _procwait_(timeoutFrac(0.2), wcsrv._proc):
return return
log.warn("wcfs.go does not exit") log.warn("wcfs.go does not exit")
...@@ -464,20 +528,20 @@ def __stop(wcsrv, ctx, _onstuck): ...@@ -464,20 +528,20 @@ def __stop(wcsrv, ctx, _onstuck):
log.warn("-> kill -QUIT wcfs.go ...") log.warn("-> kill -QUIT wcfs.go ...")
os.kill(wcsrv._proc.pid, SIGQUIT) os.kill(wcsrv._proc.pid, SIGQUIT)
if _procwait_(timeoutFrac(0.25), wcsrv._proc): if _procwait_(timeoutFrac(0.2), wcsrv._proc):
return return
log.warn("wcfs.go does not exit (after SIGQUIT)") log.warn("wcfs.go does not exit (after SIGQUIT)")
log.warn(wcsrv._stuckdump()) log.warn(wcsrv._stuckdump())
log.warn("-> kill -KILL wcfs.go ...") log.warn("-> kill -KILL wcfs.go ...")
os.kill(wcsrv._proc.pid, SIGKILL) os.kill(wcsrv._proc.pid, SIGKILL)
if _procwait_(timeoutFrac(0.25), wcsrv._proc): if _procwait_(timeoutFrac(0.2), wcsrv._proc):
return return
log.warn("wcfs.go does not exit (after SIGKILL; probably it is stuck in kernel)") log.warn("wcfs.go does not exit (after SIGKILL; probably it is stuck in kernel)")
log.warn(wcsrv._stuckdump()) log.warn(wcsrv._stuckdump())
log.warn("-> nothing we can do...") log.warn("-> nothing we can do...")
if _onstuck is not None: if _on_wcfs_stuck is not None:
_onstuck() _on_wcfs_stuck()
else: else:
_procwait(context.background(), wcsrv._proc) _procwait(context.background(), wcsrv._proc)
defer(_) defer(_)
...@@ -491,8 +555,13 @@ def __stop(wcsrv, ctx, _onstuck): ...@@ -491,8 +555,13 @@ def __stop(wcsrv, ctx, _onstuck):
# aborting fuse connection is needed in case wcfs/kernel will be stuck # aborting fuse connection is needed in case wcfs/kernel will be stuck
# in a deadlock even after being `kill -9`. See comments in tWCFS for details. # in a deadlock even after being `kill -9`. See comments in tWCFS for details.
def _(): def _():
log.warn("-> kill -TERM wcfs.go ...") if wcsrv._proc is not None:
os.kill(wcsrv._proc.pid, SIGTERM) log.warn("-> kill -TERM wcfs.go ...")
os.kill(wcsrv._proc.pid, SIGTERM)
if _procwait_(timeoutFrac(0.2), wcsrv._proc):
return
log.warn("wcfs.go does not exit")
log.warn(wcsrv._stuckdump())
if wcsrv._fuseabort is not None: if wcsrv._fuseabort is not None:
log.warn("-> abort FUSE connection ...") log.warn("-> abort FUSE connection ...")
wcsrv._fuseabort.write(b"1\n") wcsrv._fuseabort.write(b"1\n")
...@@ -504,6 +573,34 @@ def __stop(wcsrv, ctx, _onstuck): ...@@ -504,6 +573,34 @@ def __stop(wcsrv, ctx, _onstuck):
if not isinstance(e, _FUSEUnmountError): if not isinstance(e, _FUSEUnmountError):
raise raise
# stop stops wcfs server for ZODB @ zurl.
#
# It makes sure that wcfs server is stopped and tries to unmount corresponding
# tree from the filesystem namespace. This unmount can fail with EBUSY if there
# are present client processes that still use anything under WCFS mountpoint.
# In such a case all present clients are terminated to proceed with successful
# unmount.
#
# TODO consider doing lazy unmount (MNT_DETACH) instead of killing clients.
# See comments in Server.__stop for details.
#
# XXX tests
def stop(zurl):
log.info("stop %s ...", zurl)
# find mount entry for mntpt
mnt = _lookup_wcmnt(zurl, nomount_ok=True)
if mnt is None:
log.info("not mounted")
return
log.info("mount entry: %s (%s)" % (mnt.point, _devstr(mnt.dev)))
# find server process that serves the mount
wcsrv = _lookup_wcsrv(mnt, nosrv_ok=True)
log.info("served by %s" % wcsrv._proc)
wcsrv.stop()
# status verifies whether wcfs server for ZODB @ zurl is running in correct state. # status verifies whether wcfs server for ZODB @ zurl is running in correct state.
# #
# XXX tests # XXX tests
...@@ -598,7 +695,7 @@ def status(zurl): ...@@ -598,7 +695,7 @@ def status(zurl):
# _lookup_wcsrv returns WCFS Server with server process that is serving filesystem under mnt. # _lookup_wcsrv returns WCFS Server with server process that is serving filesystem under mnt.
def _lookup_wcsrv(mnt): # -> Server def _lookup_wcsrv(mnt, nosrv_ok=False): # -> Server (with ._proc=None if nosrv_ok and no server proc is found)
assert isinstance(mnt, xos.Mount) assert isinstance(mnt, xos.Mount)
assert mnt.fstype == "fuse.wcfs" assert mnt.fstype == "fuse.wcfs"
...@@ -632,8 +729,11 @@ def _lookup_wcsrv(mnt): # -> Server ...@@ -632,8 +729,11 @@ def _lookup_wcsrv(mnt): # -> Server
if len(_) > 1: if len(_) > 1:
raise RuntimeError("multiple wcfs servers found: %s" % (_)) raise RuntimeError("multiple wcfs servers found: %s" % (_))
if len(_) == 0: if len(_) == 0:
raise RuntimeError("no wcfs server found") if not nosrv_ok:
wcproc = _[0] raise RuntimeError("no wcfs server found")
wcproc = None
else:
wcproc = _[0]
wcsrv = Server(mnt, wcproc, fuseabort) wcsrv = Server(mnt, wcproc, fuseabort)
return wcsrv return wcsrv
...@@ -867,15 +967,35 @@ def _sysproccallout(argv, **kw): # -> retcode, output ...@@ -867,15 +967,35 @@ def _sysproccallout(argv, **kw): # -> retcode, output
return proc.returncode, out return proc.returncode, out
# _procwait waits for a process (subprocess.Popen) to terminate. # _procwait waits for a process (subprocess.Popen | xos.Proc) to terminate.
def _procwait(ctx, proc): def _procwait(ctx, proc):
_waitfor(ctx, lambda: proc.poll() is not None) _waitfor(ctx, lambda: not _proc_isalive(proc))
# _procwait_, similarly to _procwait, waits for a process (subprocess.Popen) to terminate. # _procwait_, similarly to _procwait, waits for a process (subprocess.Popen | xos.Proc) to terminate.
# #
# it returns bool whether process terminated or not - e.g. due to context being canceled. # it returns bool whether process terminated or not - e.g. due to context being canceled.
def _procwait_(ctx, proc): # -> ok def _procwait_(ctx, proc): # -> ok
return _waitfor_(ctx, lambda: proc.poll() is not None) return _waitfor_(ctx, lambda: not _proc_isalive(proc))
# _proc_isalive returns whether process (subprocess.Popen | xos.Proc) is alive or not.
def _proc_isalive(proc): # -> bool
assert isinstance(proc, (subprocess.Popen, xos.Proc))
if isinstance(proc, subprocess.Popen):
return proc.poll() is None
if isinstance(proc, xos.Proc):
# proc can be from connection with isolation_level being repeatable-read or snapshot
# so proc cannot be used directly to recheck its status
# -> use another proc with isolation=none to poll the status
pdbc = xos.ProcDB.open(isolation_level=xos.ISOLATION_NONE)
p = pdbc.get(proc.pid)
if p is None:
return False
try:
st = p.status['State']
except xos.ProcGone: # process completed and waited on by its parent
return False
return not st.startswith('Z') # zombie
# _waitfor waits for condf() to become true. # _waitfor waits for condf() to become true.
def _waitfor(ctx, condf): def _waitfor(ctx, condf):
...@@ -999,9 +1119,9 @@ def main(): ...@@ -999,9 +1119,9 @@ def main():
status(zurl) status(zurl)
elif cmd == "stop": elif cmd == "stop":
mntpt = _mntpt_4zurl(zurl) if optv:
_fuse_unmount(mntpt) _usage(sys.stderr)
_rmdir_ifexists(mntpt) stop(zurl)
else: else:
print("wcfs: unknown command %s" % qq(cmd), file=sys.stderr) print("wcfs: unknown command %s" % qq(cmd), file=sys.stderr)
......
...@@ -410,9 +410,28 @@ class tWCFS(_tWCFS): ...@@ -410,9 +410,28 @@ class tWCFS(_tWCFS):
assert not is_mountpoint(t.wc.mountpoint) assert not is_mountpoint(t.wc.mountpoint)
defer(_) defer(_)
def _(): def _():
def onstuck(): def on_wcfs_stuck():
fail("wcfs.go does not exit even after SIGKILL") fail("wcfs.go does not exit even after SIGKILL")
t.wc._wcsrv._stop(timeout(), _onstuck=onstuck)
# do not kill clients when the filesystem is still in use on stop
# and use -z (lazy) unmount instead because during tests it is more
# convenient that this last unmount unconditionally succeed and we
# do not care that much about file descriptors left open by a buggy
# test function.
#
# NOTE this behaviour is different from on-production stop behaviour
# where we make sure that either stop fails or completes and there
# is no more a) mount, b) wcfs.go running and c) clients using the old mount.
def on_fs_busy():
wcfs.log.warn("test: not killing clients during test run to avoid killing test driver itself)")
def on_last_unomount_try(mntpt):
wcfs.log.warn("test: -> unmount -z ...")
wcfs._fuse_unmount(mntpt, "-z")
t.wc._wcsrv._stop(timeout(),
_on_wcfs_stuck=on_wcfs_stuck,
_on_fs_busy=on_fs_busy,
_on_last_unmount_try=on_last_unomount_try)
defer(_) defer(_)
defer(t.wc.close) defer(t.wc.close)
assert is_mountpoint(t.wc.mountpoint) assert is_mountpoint(t.wc.mountpoint)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment