Commit e3f2ee2d authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Initial implementation of basic filesystem

Provide filesystem view of in-ZODB ZBigFiles, but do not implement support for
invalidations nor isolation protocol yet. In particular, because ZODB
invalidations are not yet handled, the filesystem does not update its data in
accordance with ZODB updates, and instead provides stale data view that
corresponds to the state of ZODB at the time when wcfs was mounted.

The main parts of this patch are:

- wcfs/wcfs.go is filesystem implementation itself together with overview.
- wcfs/__init__.py is python wrapper to spawn and interoperate with that filesystem.
- wcfs/wcfs_test.py is tests.

Some preliminary history:

fe7efb94    X start of wcfs
878b2787    X draft loading
d58c71e8    X don't overalign end by 1 blksize if end is already aligned
29c9f13d    X readBlk: Fix thinko in already case
59552328    X wcfs: Care to disable OS polling on us
c00d94c7    X workaround lack of exception chaining on Python2 with xdefer
0398e23d    X bytearray turned out to be copying data
7a837040    X print wcfs.py py-level traceback on SIGBUS (e.g. wcfs.go aborting due to bug/panic)
661b871f    X make sure tests don't get stuck even if wcfs gets killed -9 ...
2c043d29    X More effort to unmount failed wcfs.go
1ccc4478    X Use `with gil` + regular py code instead of PyGILState_Ensure/PyGILState_Release/PyRun_SimpleString
5dc9c791    X wcfs: Kill xdefer
91e9eba8    X wcfs: test: Register tFile to tDB early
a7138fef    X wcfs: mkdir /tmp/wcfs with sticky bit
1eec76d0    X wcfs: try to set sticky for /tmp/wcfs even if the directory already exists
c2c35851    X wcfs: tests: Factor-out waiting for a general condition to become true into waitfor
78f36993    X wcfs: test: Fix thinko in getting /sys/fs/fuse/connection/<X> for wcfs
bc9eb16f    X wcfs: tests: Don't use testmntpt everywhere
6dec74e7    X wcfs: tests: Split tDB into -> tDB + tWCFS
3a6bd764    X wcfs: tests: Run `fusermount -u` the second time if we had to kill wcfs
112720f3    X wcfs: tests: Print which files are still opened on wcfs if `fusermount -u` fails
bb40185b    X wcfs: Take $WENDELIN_CORE_WCFS_OPTIONS into account not only from under join
03a9ef33    X wcfs: Remove credentials from zurl when computing wcfs mountpoint
68ee5bdc    X wcfs: lsof tweaks
21671879    X wcfs: Teach entrypoint frontend to handle subcommands: serve, status, stop
b0642b80    X wcfs: Switch mountpoints from /tmp/wcfs/* to /dev/shm/*
b0ca031f    X wcfs: Teach join/serve to start successfully even after unclean wcfs shutdown
5bfa8cf8    X wcfs: Add start to spawn a Server that can be later stopped  (draft)
5fcec261    X wcfs: Run fusermount and friends with /bin:/usr/bin always on path
669d7a20    fixup! X wcfs: Run fusermount and friends with /bin:/usr/bin always on path
6b22f8c4    X wcfs: Teach start to start successfully even after unclean wcfs shutdown
15389db0    X wcfs: Tune _fuse_unmount to include `fusermount -u` error message into raised exception
153c002a    X wcfs: _fuse_unmount: Try first `kill -TERM` before `kill -QUIT` wcfs
3244f3a6    X wcfs: lsof +D misbehaves - don't use it
a126e709    X wcfs: Put client log into its own logger
ac303d1e    X wcfs: tests: -v  ->  show only wcfs.py logs verbosely
d671a9e9    X wcfs: Give more time to stop wcfs server
parent 2c152d41
...@@ -41,3 +41,20 @@ def transaction_reset(): ...@@ -41,3 +41,20 @@ def transaction_reset():
transaction.manager.clearSynchs() transaction.manager.clearSynchs()
yield yield
# nothing to run after test # nothing to run after test
# enable log_cli on no-capture
# (output during a test is a mixture of print and log)
def pytest_configure(config):
if config.option.capture == "no":
config.inicfg['log_cli'] = "true"
assert config.getini("log_cli") is True
# -v -> verbose wcfs.py logs
if config.option.verbose > 0:
import logging
wcfslog = logging.getLogger('wcfs')
wcfslog.setLevel(logging.INFO)
# -vv -> verbose *.py logs
# XXX + $WENDELIN_CORE_WCFS_OPTIONS="-d -alsologtostderr -v=1" ?
if config.option.verbose > 1:
config.inicfg['log_cli_level'] = "INFO"
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
# See COPYING file for full licensing terms. # See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
from golang.pyx.build import setup, DSO as _DSO, Extension as _PyGoExt, build_ext as _build_ext from golang.pyx.build import setup, DSO as _DSO, Extension as _PyGoExt, build_ext as _build_ext
from setuptools_dso import Extension
from setuptools import Command, find_packages from setuptools import Command, find_packages
from setuptools.command.build_py import build_py as _build_py from setuptools.command.build_py import build_py as _build_py
from pkg_resources import working_set, EntryPoint from pkg_resources import working_set, EntryPoint
...@@ -73,8 +74,12 @@ def _with_defaults(what, *argv, **kw): ...@@ -73,8 +74,12 @@ def _with_defaults(what, *argv, **kw):
ccdefault.append('-std=gnu++11') # not c++11 since we use typeof ccdefault.append('-std=gnu++11') # not c++11 since we use typeof
# DSOs are not yet annotated for visibility # DSOs are not yet annotated for visibility
# XXX pyext besides _bigfile.so also cannot do this because PyMODINIT_FUNC
# does not include export in it. TODO reenable for _bigfile.so
"""
if what != _DSO: if what != _DSO:
ccdefault.append('-fvisibility=hidden') # by default symbols not visible outside DSO ccdefault.append('-fvisibility=hidden') # by default symbols not visible outside DSO
"""
_ = kw.get('extra_compile_args', [])[:] _ = kw.get('extra_compile_args', [])[:]
_[0:0] = ccdefault _[0:0] = ccdefault
...@@ -313,6 +318,15 @@ setup( ...@@ -313,6 +318,15 @@ setup(
define_macros = [('_GNU_SOURCE',None)], define_macros = [('_GNU_SOURCE',None)],
language = 'c', language = 'c',
dsos = ['wendelin.bigfile.libvirtmem']), dsos = ['wendelin.bigfile.libvirtmem']),
PyGoExt('wendelin.wcfs.internal.wcfs_test',
['wcfs/internal/wcfs_test.pyx']),
Extension('wendelin.wcfs.internal.io',
['wcfs/internal/io.pyx']),
Extension('wendelin.wcfs.internal.mm',
['wcfs/internal/mm.pyx']),
], ],
package_dir = {'wendelin': ''}, package_dir = {'wendelin': ''},
......
...@@ -18,19 +18,671 @@ ...@@ -18,19 +18,671 @@
# See COPYING file for full licensing terms. # See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options. # See https://www.nexedi.com/licensing for rationale and options.
"""Module wcfs.py provides python gateway for spawning wcfs server. """Module wcfs.py provides python gateway for spawning and interoperating with wcfs server.
Serve(zurl) starts and runs WCFS server for ZODB at zurl.
Start(zurl) starts WCFS server for ZODB at zurl and returns corresponding Server object.
Join(zurl) joins wcfs server for ZODB at zurl and returns WCFS object that
represents filesystem-level connection to joined wcfs server. If wcfs server
for zurl is not yet running, it will be automatically started if join is given
`autostart=True` option.
Environment variables
---------------------
The following environment variables can be used to control wcfs.py client:
$WENDELIN_CORE_WCFS_AUTOSTART
yes join: spawn wcfs server if no one was found and no explicit
autostart=X was given (default)
no join: don't spawn wcfs server unless explicitly requested via autostart=True
$WENDELIN_CORE_WCFS_OPTIONS
"" serve/start/join: additional options to pass to wcfs server when spawning it
""" """
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
import os, sys import os, sys, hashlib, subprocess, stat
import logging; log = logging.getLogger('wcfs')
from os.path import dirname from os.path import dirname
from stat import S_ISDIR
from errno import ENOENT, ENOTCONN, EEXIST
from signal import SIGTERM, SIGQUIT, SIGKILL
from golang import chan, select, default, func, defer
from golang import context, errors, sync, time
from golang.gcompat import qq
from persistent import Persistent
from zodbtools.util import ashex as h
from six.moves.urllib.parse import urlsplit, urlunsplit
# Server represents running wcfs server.
#
# Use start to create it.
class Server:
# .mountpoint path to wcfs mountpoint
# ._proc wcfs process
# ._fuseabort opened /sys/fs/fuse/connections/X/abort for this server
# ._stopOnce
pass
# WCFS represents filesystem-level connection to wcfs server.
#
# Use join to create it.
#
# Raw files on wcfs can be accessed with ._path/._read/._stat/._open .
#
# WCFS logically mirrors ZODB.DB .
class WCFS:
# .mountpoint path to wcfs mountpoint
# ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly)
# ._njoin this connection was returned for so many joins
# ._wcsrv wcfs Server if it was opened by this WCFS | None
pass
# ---- WCFS raw file access (primarily for tests) ----
# _path returns path for object on wcfs.
# - str: wcfs root + obj;
# - Persistent: wcfs root + (head|@<at>)/bigfile/obj
@func(WCFS)
def _path(wc, obj, at=None):
if isinstance(obj, Persistent):
#assert type(obj) is ZBigFile XXX import cycle
objtypestr = type(obj).__module__ + "." + type(obj).__name__
assert objtypestr == "wendelin.bigfile.file_zodb.ZBigFile", objtypestr
head = "head/" if at is None else ("@%s/" % h(at))
obj = "%s/bigfile/%s" % (head, h(obj._p_oid))
at = None
assert isinstance(obj, str)
assert at is None # must not be used with str
return os.path.join(wc.mountpoint, obj)
# _read reads file corresponding to obj on wcfs.
@func(WCFS)
def _read(wc, obj, at=None):
path = wc._path(obj, at=at)
with open(path, 'rb') as f:
return f.read()
# _stat stats file corresponding to obj on wcfs.
@func(WCFS)
def _stat(wc, obj, at=None):
path = wc._path(obj, at=at)
return os.stat(path)
# _open opens file corresponding to obj on wcfs.
@func(WCFS)
def _open(wc, obj, mode='rb', at=None):
path = wc._path(obj, at=at)
return open(path, mode, 0) # unbuffered
# ---- join/run wcfs ----
_wcmu = sync.Mutex()
_wcregistry = {} # mntpt -> WCFS
@func(WCFS)
def __init__(wc, mountpoint, fwcfs, wcsrv):
wc.mountpoint = mountpoint
wc._fwcfs = fwcfs
wc._njoin = 1
wc._wcsrv = wcsrv
# close must be called to release joined connection after it is no longer needed.
@func(WCFS)
def close(wc):
with _wcmu:
wc._njoin -= 1
if wc._njoin == 0:
del _wcregistry[wc.mountpoint]
# NOTE not unmounting wcfs - it either runs as separate service, or
# is spawned on demand with -autoexit.
# NOTE ._fwcfs.close can raise IOError (e.g. ENOTCONN after wcfs server crash)
wc._fwcfs.close()
# _default_autostart returns default autostart setting for join.
#
# Out-of-the-box we want wcfs to be automatically started, to ease developer
# experience when wendelin.core is standalone installed. However in environments
# like SlapOS, it is more preferable to start and monitor wcfs service explicitly.
# SlapOS & co. should thus set $WENDELIN_CORE_WCFS_AUTOSTART=no.
def _default_autostart():
autostart = os.environ.get("WENDELIN_CORE_WCFS_AUTOSTART", "yes")
autostart = autostart.lower()
return {"yes": True, "no": False}[autostart]
# join connects to wcfs server for ZODB @ zurl.
#
# If wcfs for that zurl is already running, join connects to it.
# Otherwise it starts wcfs for zurl if autostart is True.
#
# For the same zurl join returns the same WCFS object.
def join(zurl, autostart=_default_autostart()): # -> WCFS
mntpt = _mntpt_4zurl(zurl)
with _wcmu:
# check if we already have connection to wcfs server from this process
wc = _wcregistry.get(mntpt)
if wc is not None:
wc._njoin += 1
return wc
# no. try opening .wcfs - if we succeed - wcfs is already running.
fwcfs, trylockstartf = _try_attach_wcsrv(mntpt)
if fwcfs is not None:
# already have it
wc = WCFS(mntpt, fwcfs, None)
_wcregistry[mntpt] = wc
return wc
if not autostart:
raise RuntimeError("wcfs: join %s: server not running" % zurl)
# start wcfs with telling it to automatically exit when there is no client activity.
trylockstartf() # XXX retry access if another wcfs was started in the meantime
wcsrv, fwcfs = _start(zurl, "-autoexit")
wc = WCFS(mntpt, fwcfs, wcsrv)
assert mntpt not in _wcregistry
_wcregistry[mntpt] = wc
return wc
# _try_attach_wcsrv tries to attach to running wcfs server.
#
# if successful, it returns fwcfs - opened file handle for /.wcfs/zurl
# if unsuccessful, it returns fwcfs=None, and trylockstartf function that can
# be used to prepare to start new WCFS server.
def _try_attach_wcsrv(mntpt): # -> (fwcfs, trylockstartf)
# try opening .wcfs - if we succeed - wcfs is already running.
unclean = False
try:
fwcfs = open(mntpt + "/.wcfs/zurl")
except IOError as e:
if e.errno == ENOENT: # wcfs cleanly unmounted
pass
elif e.errno == ENOTCONN: # wcfs crashed/killed
unclean = True
else:
raise
else:
return (fwcfs, None)
# the server is not running.
# return func to prepare start of another wcfs server
def trylockstartf():
# XXX race window if external process starts after ^^^ check
# TODO -> fs-level locking
if unclean:
_fuse_unmount(mntpt)
return (None, trylockstartf)
# start starts wcfs server for ZODB @ zurl.
#
# optv can be optionally given to pass flags to wcfs.
def start(zurl, *optv): # -> Server
# verify that wcfs is not already running
mntpt = _mntpt_4zurl(zurl)
fwcfs, trylockstartf = _try_attach_wcsrv(mntpt)
if fwcfs is not None:
fwcfs.close()
raise RuntimeError("wcfs: start %s: already running" % zurl)
# seems to be ok to start
trylockstartf() # XXX -> "already running" if lock fails
wcsrv, fwcfs = _start(zurl, *optv)
fwcfs.close()
return wcsrv
# _optv_with_wcfs_defaults returns optv prepended with default WCFS options taken from environment.
def _optv_with_wcfs_defaults(optv): # -> optv
optv_defaults = os.environ.get("WENDELIN_CORE_WCFS_OPTIONS", "").split()
return tuple(optv_defaults) + tuple(optv)
# _start serves start and join.
@func
def _start(zurl, *optv): # -> Server, fwcfs
mntpt = _mntpt_4zurl(zurl)
optv = _optv_with_wcfs_defaults(optv)
log.info("starting for %s ...", zurl)
# XXX errctx "wcfs: start"
# spawn wcfs and wait till filesystem-level access to it is ready
wcsrv = Server(mntpt, None, None)
wg = sync.WorkGroup(context.background())
fsready = chan(dtype='C.structZ')
def _(ctx):
# XXX errctx "spawn"
argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
proc = subprocess.Popen(argv, close_fds=True)
while 1:
ret = proc.poll()
if ret is not None:
raise RuntimeError("exited with %s" % ret)
_, _rx = select(
ctx.done().recv, # 0
fsready.recv, # 1
default, # 2
)
if _ == 0:
proc.terminate()
raise ctx.err()
if _ == 1:
# startup was ok - don't monitor spawned wcfs any longer
wcsrv._proc = proc
return
time.sleep(0.1*time.second)
wg.go(_)
def _(ctx):
# XXX errctx "waitmount"
fwcfs = _waitmount(ctx, zurl, mntpt)
wcsrv._fwcfs = fwcfs
fsready.close()
wg.go(_)
wg.wait()
log.info("started pid%d @ %s", wcsrv._proc.pid, mntpt)
fwcfs = wcsrv._fwcfs
del wcsrv._fwcfs
# open fuse abort control file
# shutdown wcsrv if that open fails
try:
x = os.minor(os.stat(wcsrv.mountpoint).st_dev)
wcsrv._fuseabort = open("/sys/fs/fuse/connections/%d/abort" % x, "wb")
except:
defer(wcsrv.stop)
defer(fwcfs.close)
raise
return wcsrv, fwcfs
# _waitmount waits for wcfs filesystem for zurl @mntpt to become ready.
def _waitmount(ctx, zurl, mntpt): # -> fwcfs
while 1:
try:
f = open("%s/.wcfs/zurl" % mntpt)
except IOError as e:
# ENOTCONN (wcfs crashed/killed) is an error here
if e.errno != ENOENT:
raise
else:
dotwcfs = f.read()
if dotwcfs != zurl:
raise RuntimeError(".wcfs/zurl != zurl (%s != %s)" % (qq(dotwcfs), qq(zurl)))
return f
_, _rx = select(
ctx.done().recv, # 0
default, # 1
)
if _ == 0:
raise ctx.err()
time.sleep(0.1*time.second)
@func(Server)
def __init__(wcsrv, mountpoint, proc, ffuseabort):
wcsrv.mountpoint = mountpoint
wcsrv._proc = proc
wcsrv._fuseabort = ffuseabort
wcsrv._stopOnce = sync.Once()
# stop shutdowns the server.
@func(Server)
def stop(wcsrv, ctx=None):
if ctx is None:
ctx, cancel = context.with_timeout(context.background(), 20*time.second)
defer(cancel)
wcsrv._stop(ctx)
@func(Server)
def _stop(wcsrv, ctx, _onstuck=None):
def _():
wcsrv.__stop(ctx, _onstuck)
wcsrv._stopOnce.do(_)
@func(Server)
def __stop(wcsrv, ctx, _onstuck):
log.info("unmount/stop wcfs pid%d @ %s", wcsrv._proc.pid, wcsrv.mountpoint)
deadline = ctx.deadline()
if deadline is None:
deadline = float('inf')
timeoutTotal = (deadline - time.now())
if timeoutTotal < 0:
timeoutTotal = 0.
# timeoutFrac returns ctx with `timeout ~= fraction·totalTimeout`
# however if the context is already cancelled, returned timeout is 0.1s to
# give chance for an operation to complete.
def timeoutFrac(fraction):
if _ready(ctx.done()):
tctx, _ = context.with_timeout(context.background(), 0.1*time.second)
else:
tctx, _ = context.with_timeout(ctx, fraction*timeoutTotal)
return tctx
# unmount and wait for wcfs to exit
# kill wcfs and abort FUSE connection if clean unmount fails
def _():
if wcsrv._fuseabort is not None:
wcsrv._fuseabort.close()
defer(_)
@func
def _():
# kill wcfs.go in case it is deadlocked and does not exit by itself
if _procwait_(timeoutFrac(0.5), wcsrv._proc):
return
log.warn("wcfs.go does not exit (after SIGTERM)")
log.warn("-> kill -QUIT wcfs.go ...")
os.kill(wcsrv._proc.pid, SIGQUIT)
if _procwait_(timeoutFrac(0.25), wcsrv._proc):
return
log.warn("wcfs.go does not exit (after SIGQUIT)")
log.warn("-> kill -KILL wcfs.go ...")
os.kill(wcsrv._proc.pid, SIGKILL)
if _procwait_(timeoutFrac(0.25), wcsrv._proc):
return
log.warn("wcfs.go does not exit (after SIGKILL; probably it is stuck in kernel)")
log.warn("-> nothing we can do...") # XXX dump /proc/pid/task/*/stack instead (ignore EPERM)
if _onstuck is not None:
_onstuck()
else:
_procwait(context.background(), wcsrv._proc)
defer(_)
try:
if _is_mountpoint(wcsrv.mountpoint): # could be unmounted from outside
_fuse_unmount(wcsrv.mountpoint)
except:
# if clean unmount failed -> kill -TERM wcfs and force abort of fuse connection.
#
# aborting fuse connection is needed in case wcfs/kernel will be stuck
# in a deadlock even after being `kill -9`. See comments in tWCFS for details.
def _():
log.warn("-> kill -TERM wcfs.go ...")
os.kill(wcsrv._proc.pid, SIGTERM)
if wcsrv._fuseabort is not None:
log.warn("-> abort FUSE connection ...")
wcsrv._fuseabort.write(b"1\n")
wcsrv._fuseabort.flush()
defer(_)
raise
# ---- misc ----
# _wcfs_exe returns path to wcfs executable. # _wcfs_exe returns path to wcfs executable.
def _wcfs_exe(): def _wcfs_exe():
return '%s/wcfs' % dirname(__file__) return '%s/wcfs' % dirname(__file__)
# _mntpt_4zurl returns wcfs should-be mountpoint for ZODB @ zurl.
#
# it also makes sure the mountpoint exists.
def _mntpt_4zurl(zurl):
# remove credentials from zurl.
# The same database can be accessed from different clients with different
# credentials, but we want to map them all to the same single WCFS
# instance.
scheme, netloc, path, query, frag = urlsplit(zurl)
if '@' in netloc:
netloc = netloc[netloc.index('@')+1:]
zurl = urlunsplit((scheme, netloc, path, query, frag))
m = hashlib.sha1()
m.update(zurl)
# WCFS mounts are located under /dev/shm/wcfs. /dev/shm is already used by
# userspace part of wendelin.core memory manager for dirtied pages.
# In a sense WCFS mount provides shared read-only memory backed by ZODB.
# mkdir /dev/shm/wcfs with stiky bit. This way multiple users can create subdirectories inside.
wcfsroot = "/dev/shm/wcfs"
wcfsmode = 0o777 | stat.S_ISVTX
if _mkdir_p(wcfsroot):
os.chmod(wcfsroot, wcfsmode)
else:
# migration workaround for the situation when /dev/shm/wcfs was created by
# code that did not yet set sticky bit.
_ = os.stat(wcfsroot)
if _.st_uid == os.getuid():
if _.st_mode != wcfsmode:
os.chmod(wcfsroot, wcfsmode)
mntpt = "%s/%s" % (wcfsroot, m.hexdigest())
_mkdir_p(mntpt)
return mntpt
# mkdir -p.
def _mkdir_p(path, mode=0o777): # -> created(bool)
try:
os.makedirs(path, mode)
except OSError as e:
if e.errno != EEXIST:
raise
return False
return True
# _fuse_unmount calls `fusermount -u` + logs details if unmount failed.
@func
def _fuse_unmount(mntpt):
ret, out = _sysproccallout(["fusermount", "-u", mntpt])
if ret != 0:
# unmount failed, usually due to "device is busy".
# Log which files are still opened and reraise
def _():
log.warn("# lsof %s" % mntpt)
# -w to avoid lots of
# lsof: WARNING: can't stat() fuse.wcfs file system /dev/shm/wcfs/X
# Output information may be incomplete.
# if there are other uncleaned wcfs mountpoints.
# (lsof stats all filesystems on startup)
# NOTE lsof +D misbehaves - don't use it
ret, out = _sysproccallout(["lsof", "-w", mntpt])
log.warn(out)
if ret:
log.warn("(lsof failed)")
defer(_)
out = out.rstrip() # kill trailing \n\n
emsg = "fuse_unmount %s: failed: %s" % (mntpt, out)
log.warn(emsg)
raise RuntimeError("%s\n(more details logged)" % emsg)
# _is_mountpoint returns whether path is a mountpoint
def _is_mountpoint(path): # -> bool
# NOTE we don't call mountpoint directly on path, because if FUSE
# fileserver failed, the mountpoint will also fail and print ENOTCONN
try:
_ = os.lstat(path)
except OSError as e:
if e.errno == ENOENT:
return False
# "Transport endpoint is not connected" -> it is a failed FUSE server
# (XXX we can also grep /proc/mounts)
if e.errno == ENOTCONN:
return True
raise
if not S_ISDIR(_.st_mode):
return False
mounted = (0 == _sysproccall(["mountpoint", "-q", path]))
return mounted
# _sysproc creates subprocess.Popen for "system" command.
#
# System commands are those that reside either in /bin or /usr/bin and which
# should be found even if $PATH does no contain those directories. For example
# runUnitTest in ERP5 sets $PATH without /bin, and this way executing
# fusermount via subprocess.Popen instead of _sysproc would fail.
def _sysproc(argv, **kw): # -> subprocess.Popen
env = kw.get('env', None)
if env is None:
env = os.environ
env = env.copy()
path = env.get('PATH', '')
if path:
path += ':'
path += '/bin:/usr/bin'
env['PATH'] = path
return subprocess.Popen(argv, env=env, close_fds=True, **kw)
# _sysproccall calls _sysproc and waits for spawned program to complete.
def _sysproccall(argv, **kw): # -> retcode
return _sysproc(argv, **kw).wait()
# _sysproccallout calls _sysproc, waits for spawned program to complete and returns combined out/err.
def _sysproccallout(argv, **kw): # -> retcode, output
proc = _sysproc(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw)
out, _ = proc.communicate()
return proc.returncode, out
# _procwait waits for a process (subprocess.Popen) to terminate.
def _procwait(ctx, proc):
_waitfor(ctx, lambda: proc.poll() is not None)
# _procwait_, similarly to _procwait, waits for a process (subprocess.Popen) to terminate.
#
# it returns bool whether process terminated or not - e.g. due to context being canceled.
def _procwait_(ctx, proc): # -> ok
return _waitfor_(ctx, lambda: proc.poll() is not None)
# _waitfor waits for condf() to become true.
def _waitfor(ctx, condf):
wg = sync.WorkGroup(ctx)
def _(ctx):
while 1:
if _ready(ctx.done()):
raise ctx.err()
if condf():
return
time.sleep(10*time.millisecond)
wg.go(_)
wg.wait()
# _waitfor_, similarly to _waitfor, waits for condf() to become true.
#
# it returns bool whether target condition was reached or not - e.g. due to
# context being canceled.
def _waitfor_(ctx, condf): # -> ok
try:
_waitfor(ctx, condf)
except Exception as e:
if errors.Is(e, context.canceled) or errors.Is(e, context.deadlineExceeded):
return False
raise
return True
# _ready reports whether chan ch is ready.
def _ready(ch):
_, _rx = select(
default, # 0
ch.recv, # 1
)
return bool(_)
# serve starts and runs wcfs server for ZODB @ zurl.
#
# it mounts wcfs at a location that is with 1-1 correspondence with zurl.
# it then waits for wcfs to exit (either due to unmount or an error).
#
# it is an error if wcfs is already running.
#
# optv is list of options to pass to wcfs server.
# if exec_ is True, wcfs is not spawned, but executed into.
#
# serve(zurl, exec_=False).
def serve(zurl, optv, exec_=False, _tstartingq=None):
mntpt = _mntpt_4zurl(zurl)
optv = _optv_with_wcfs_defaults(optv)
log.info("serving %s ...", zurl)
# try opening .wcfs - it is an error if we can do it.
fwcfs, trylockstartf = _try_attach_wcsrv(mntpt)
if fwcfs is not None:
fwcfs.close()
raise RuntimeError("wcfs: serve %s: already running" % zurl)
# seems to be ok to start
trylockstartf() # XXX -> "already running" if lock fails
if _tstartingq is not None:
_tstartingq.close()
argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
if not exec_:
subprocess.check_call(argv, close_fds=True)
else:
os.execv(argv[0], argv)
# if called as main -> serve as frontend to wcfs service:
#
# wcfs serve <zurl>
# wcfs status <zurl>
# wcfs stop <zurl>
def _usage(w):
progname = os.path.basename(sys.argv[0])
print("Wcfs serves WCFS filesystem for ZODB at zurl for wendelin.core .\n", file=w)
print("Usage: %s (serve|stop|status) [-h | wcfs.go options] zurl" % progname, file=w)
sys.exit(2)
@func
def main(): def main():
argv = sys.argv[1:] argv = sys.argv[1:]
os.execv(_wcfs_exe(), [_wcfs_exe()] + argv) if len(argv) < 2 or argv[0] == '-h':
_usage(sys.stderr)
cmd = argv[0]
argv = argv[1:]
zurl = argv[-1] # -a -b zurl -> zurl
optv = argv[:-1] # -a -b zurl -> -a -b
if cmd == "serve":
if argv[0] == '-h':
os.execv(_wcfs_exe(), [_wcfs_exe(), '-h'])
serve(zurl, optv, exec_=True)
elif cmd == "status":
wc = join(zurl, autostart=False) # raises if wcfs is not started
defer(wc.close)
print("wcfs<%s>: serving ok" % zurl)
elif cmd == "stop":
mntpt = _mntpt_4zurl(zurl)
_fuse_unmount(mntpt)
else:
print("wcfs: unknown command %s" % qq(cmd), file=sys.stderr)
sys.exit(2)
...@@ -3,9 +3,16 @@ module lab.nexedi.com/nexedi/wendelin.core/wcfs ...@@ -3,9 +3,16 @@ module lab.nexedi.com/nexedi/wendelin.core/wcfs
go 1.14 go 1.14
require ( require (
github.com/golang/glog v1.0.0
github.com/hanwen/go-fuse/v2 v2.1.0 // replaced to -> kirr/go-fuse@y/nodefs-cancel
github.com/johncgriffin/overflow v0.0.0-20211019200055-46fa312c352c github.com/johncgriffin/overflow v0.0.0-20211019200055-46fa312c352c
github.com/kisielk/og-rek v1.1.1-0.20210310094122-8def3d024dac github.com/kisielk/og-rek v1.1.1-0.20210310094122-8def3d024dac
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
lab.nexedi.com/kirr/go123 v0.0.0-20210906140734-c9eb28d9e408 lab.nexedi.com/kirr/go123 v0.0.0-20210906140734-c9eb28d9e408
lab.nexedi.com/kirr/neo/go v0.0.0-20211004111643-c74a5a3cd0d0 lab.nexedi.com/kirr/neo/go v0.0.0-20211004111643-c74a5a3cd0d0
) )
// we use kirr/go-fuse@y/nodefs-cancel
// see https://github.com/hanwen/go-fuse/pull/343 for details
replace github.com/hanwen/go-fuse/v2 v2.1.0 => lab.nexedi.com/kirr/go-fuse/v2 v2.0.0-20210910085851-e6ee85fd0a1e
...@@ -27,6 +27,8 @@ github.com/fsnotify/fsnotify v1.4.10-0.20200417215612-7f4cf4dd2b52 h1:0NmERxogGT ...@@ -27,6 +27,8 @@ github.com/fsnotify/fsnotify v1.4.10-0.20200417215612-7f4cf4dd2b52 h1:0NmERxogGT
github.com/fsnotify/fsnotify v1.4.10-0.20200417215612-7f4cf4dd2b52/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.4.10-0.20200417215612-7f4cf4dd2b52/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
...@@ -60,6 +62,7 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn ...@@ -60,6 +62,7 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ= github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
...@@ -170,6 +173,12 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v ...@@ -170,6 +173,12 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
lab.nexedi.com/kirr/go-fuse/v2 v2.0.0-20210215102255-f0cbba3ef97e h1:6eJe/VaiivudiUEc2R324WN5djDycNvM1IkSQrC9idk=
lab.nexedi.com/kirr/go-fuse/v2 v2.0.0-20210215102255-f0cbba3ef97e/go.mod h1:oRyA5eK+pvJyv5otpO/DgccS8y/RvYMaO00GgRLGryc=
lab.nexedi.com/kirr/go-fuse/v2 v2.0.0-20210610115330-7e0334c3e76a h1:1Kagc2s/E5p2iqLLyGBtJRDssIa0uPbUKEEjKUZKSM4=
lab.nexedi.com/kirr/go-fuse/v2 v2.0.0-20210610115330-7e0334c3e76a/go.mod h1:oRyA5eK+pvJyv5otpO/DgccS8y/RvYMaO00GgRLGryc=
lab.nexedi.com/kirr/go-fuse/v2 v2.0.0-20210910085851-e6ee85fd0a1e h1:QP8PhLssUs3SEoM+UfQLxfDke7uQtyte4FNu6cw00L4=
lab.nexedi.com/kirr/go-fuse/v2 v2.0.0-20210910085851-e6ee85fd0a1e/go.mod h1:B1nGE/6RBFyBRC1RRnf23UpwCdyJ31eukw34oAKukAc=
lab.nexedi.com/kirr/go123 v0.0.0-20210128150852-c20e95f0f789/go.mod h1:1wkWl3WhmutZiho+wsE7ymOKvRkN7hV3YZtL0f0gXTo= lab.nexedi.com/kirr/go123 v0.0.0-20210128150852-c20e95f0f789/go.mod h1:1wkWl3WhmutZiho+wsE7ymOKvRkN7hV3YZtL0f0gXTo=
lab.nexedi.com/kirr/go123 v0.0.0-20210906140734-c9eb28d9e408 h1:H7YpNUDfTSvvRpKivUMrL9C09tQssQ6brEoX6K/OxOw= lab.nexedi.com/kirr/go123 v0.0.0-20210906140734-c9eb28d9e408 h1:H7YpNUDfTSvvRpKivUMrL9C09tQssQ6brEoX6K/OxOw=
lab.nexedi.com/kirr/go123 v0.0.0-20210906140734-c9eb28d9e408/go.mod h1:pwDpdCuvtz0QxisDzV/z9eUb9zc/rMQec520h4i8VWQ= lab.nexedi.com/kirr/go123 v0.0.0-20210906140734-c9eb28d9e408/go.mod h1:pwDpdCuvtz0QxisDzV/z9eUb9zc/rMQec520h4i8VWQ=
......
/io.c
/mm.c
/wcfs_test.cpp
# Copyright (C) 2019-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# cython: language_level=2
"""Package io complements IO facility provided by Python."""
from posix.unistd cimport pread
from cpython.exc cimport PyErr_SetFromErrno
# readat calls pread to read from fd@off into buf.
def readat(int fd, size_t off, unsigned char[::1] buf not None): # -> n
cdef void *dest = &buf[0]
cdef size_t size = buf.shape[0]
cdef ssize_t n
with nogil:
n = pread(fd, dest, size, off)
if n < 0:
PyErr_SetFromErrno(OSError)
return n
# Copyright (C) 2019-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# cython: language_level=2
"""Package mm provides access to OS memory management interfaces."""
from posix cimport mman
from cpython.exc cimport PyErr_SetFromErrno
cdef extern from "<sys/user.h>":
cpdef enum:
PAGE_SIZE
from posix.types cimport off_t
# map_ro memory-maps fd[offset +size) as read-only.
# The mapping is created with MAP_SHARED.
def map_ro(int fd, off_t offset, size_t size):
cdef void *addr
addr = mman.mmap(NULL, size, mman.PROT_READ, mman.MAP_SHARED, fd, offset)
if addr == mman.MAP_FAILED:
PyErr_SetFromErrno(OSError)
return <unsigned char[:size:1]>addr
# unmap unmaps memory covered by mem.
def unmap(const unsigned char[::1] mem not None):
cdef const void *addr = &mem[0]
cdef size_t size = mem.shape[0]
cdef err = mman.munmap(<void *>addr, size)
if err:
PyErr_SetFromErrno(OSError)
return
# Copyright (C) 2019-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
# cython: language_level=2
# distutils: language=c++
"""Module wcfs_test.pyx complements wcfs_test.py with things that cannot be
implemented in Python."""
from posix.signal cimport sigaction, sigaction_t, siginfo_t, SA_SIGINFO
from libc.signal cimport SIGBUS
from libc.stdlib cimport abort
from libc.string cimport strlen
from posix.unistd cimport write, sleep
from cpython.exc cimport PyErr_SetFromErrno
from golang cimport panic
# ---- signal handling ----
# TODO -> golang.signal ?
# install_sigbus_trap installs SIGBUS handler that prints python-level
# traceback before aborting.
#
# Such handler is useful, because when wcfs.go bugs/panics while handling file
# access from wcfs.py, wcfs.py receives SIGBUS signal and by default aborts.
def install_sigbus_trap():
cdef sigaction_t act
act.sa_sigaction = on_sigbus
act.sa_flags = SA_SIGINFO
cdef int err = sigaction(SIGBUS, &act, NULL)
if err:
PyErr_SetFromErrno(OSError)
cdef void on_sigbus(int sig, siginfo_t *si, void *_uc) nogil:
# - wait a bit to give time for other threads to complete their exception dumps
# (e.g. getting "Transport endpoint is not connected" after wcfs.go dying)
# - dump py-level traceback and abort.
# TODO turn SIGBUS into python-level exception? (see sigpanic in Go how to do).
writeerr("\nC: SIGBUS received; giving time to other threads "
"to dump their exceptions (if any) ...\n")
with gil:
pass
sleep(1)
writeerr("\nC: SIGBUS'ed thread traceback:\n")
with gil:
import traceback
traceback.print_stack()
writeerr("-> SIGBUS\n");
# FIXME nothing is printed if pytest stdout/stderr capture is on (no -s given)
abort()
# writeerr writes msg to stderr without depending on stdio buffering and locking.
cdef void writeerr(const char *msg) nogil:
xwrite(2, msg)
# xwrite writes msg to fd without depending on stdio buffering and locking.
cdef void xwrite(int fd, const char *msg) nogil:
cdef ssize_t n, left = strlen(msg)
while left > 0:
n = write(fd, msg, left)
if n == -1:
panic("write: failed")
left -= n
msg += n
...@@ -21,8 +21,12 @@ ...@@ -21,8 +21,12 @@
package xzodb package xzodb
import ( import (
"context"
"fmt" "fmt"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
) )
...@@ -42,3 +46,37 @@ func TypeOf(obj interface{}) string { ...@@ -42,3 +46,37 @@ func TypeOf(obj interface{}) string {
return fmt.Sprintf("%T", obj) return fmt.Sprintf("%T", obj)
} }
} }
// ZConn is zodb.Connection + associated read-only transaction under which
// objects of the connection are accessed.
type ZConn struct {
*zodb.Connection
// read-only transaction under which we access zodb.Connection data.
TxnCtx context.Context // XXX -> better directly store txn
}
// ZOpen opens new connection to ZODB database + associated read-only transaction.
func ZOpen(ctx context.Context, zdb *zodb.DB, zopt *zodb.ConnOptions) (_ *ZConn, err error) {
// create new read-only transaction
txn, txnCtx := transaction.New(context.Background())
defer func() {
if err != nil {
txn.Abort()
}
}()
// XXX better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(ctx, txnCtx)
defer cancel()
zconn, err := zdb.Open(ctx, zopt)
if err != nil {
return nil, err
}
return &ZConn{
Connection: zconn,
TxnCtx: txnCtx,
}, nil
}
// Copyright (C) 2018-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package main
// misc utilities
import (
"context"
"fmt"
"io"
"sync/atomic"
"syscall"
log "github.com/golang/glog"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/hanwen/go-fuse/v2/fuse/nodefs"
"github.com/pkg/errors"
)
// ---- FUSE ----
// eInvalError is the error wrapper signifying that underlying error is about "invalid argument".
// err2LogStatus converts such errors into EINVAL return code + logs as warning.
type eInvalError struct {
err error
}
func (e *eInvalError) Error() string {
return "invalid argument: " + e.err.Error()
}
// don't propagate eInvalError.Cause -> e.err
func eINVAL(err error) *eInvalError {
return &eInvalError{err}
}
func eINVALf(format string, argv ...interface{}) *eInvalError {
return eINVAL(fmt.Errorf(format, argv...))
}
// err2LogStatus converts an error into FUSE status code and logs it appropriately.
//
// the error is logged because otherwise, if e.g. returning EINVAL or EIO
// codes, there is no more detail except the error code itself.
func err2LogStatus(err error) fuse.Status {
// no error
if err == nil {
return fuse.OK
}
// direct usage of error code - don't log
ecode, iscode := err.(syscall.Errno)
if iscode {
return fuse.Status(ecode)
}
// handling canceled -> EINTR, don't log
e := errors.Cause(err)
switch e {
case context.Canceled:
return fuse.EINTR
case io.ErrClosedPipe:
return fuse.Status(syscall.ECONNRESET)
}
// otherwise log as warnings EINVAL and as errors everything else
switch e.(type) {
case *eInvalError:
log.WarningDepth(1, err)
return fuse.EINVAL
default:
log.ErrorDepth(1, err)
return fuse.EIO
}
}
// fsNode should be used instead of nodefs.DefaultNode in wcfs.
//
// nodefs.DefaultNode.Open returns ENOSYS. This is convenient for filesystems
// that have no dynamic files at all. But for filesystems, where there are some
// dynamic files - i.e. nodes which do need to support Open, returning ENOSYS
// from any single node will make the kernel think that the filesystem does not
// support Open at all.
//
// In wcfs we have dynamic files (e.g. upcoming /head/watch) and this way we have to
// avoid returning ENOSYS on nodes, that do not need file handles.
//
// fsNode is like nodefs.defaultNode, but by default Open returns to kernel
// fh=0 and FOPEN_KEEP_CACHE - similarly how openless case is handled there.
//
// fsNode behaviour can be additionally controlled via fsOptions.
//
// fsNode should be created via newFSNode.
type fsNode struct {
nodefs.Node
opt *fsOptions
// cache for path
// we don't use hardlinks / don't want to pay locks + traversal price every time.
xpath atomic.Value
}
func (n *fsNode) Open(flags uint32, _ *fuse.Context) (nodefs.File, fuse.Status) {
return &nodefs.WithFlags{
File: nil,
FuseFlags: fuse.FOPEN_KEEP_CACHE,
}, fuse.OK
}
// fsOptions allows to tune fsNode behaviour.
type fsOptions struct {
// Sticky nodes are not removed from inode tree on FORGET.
// Correspondingly OnForget is never called on a sticky node.
Sticky bool
}
var fSticky = &fsOptions{Sticky: true} // frequently used shortcut
func (n *fsNode) Deletable() bool {
return !n.opt.Sticky
}
func newFSNode(opt *fsOptions) fsNode { // NOTE not pointer
return fsNode{
Node: nodefs.NewDefaultNode(),
opt: opt,
}
}
// path returns node path in its filesystem.
func (n *fsNode) path() string {
xpath := n.xpath.Load()
if xpath != nil {
return xpath.(string)
}
// slow part - let's construct the path and remember it
path := ""
inode := n.Inode()
for {
var name string
inode, name = inode.Parent()
if inode == nil {
break
}
path = "/" + name + path
}
n.xpath.Store(path)
return path
}
// NewStaticFile creates nodefs.Node for file with static data.
//
// Created file is sticky.
func NewStaticFile(data []byte) *SmallFile {
return newSmallFile(func(_ *fuse.Context) ([]byte, error) {
return data, nil
}, fuse.FOPEN_KEEP_CACHE /*see ^^^*/)
}
// SmallFile is a nodefs.Node for file with potentially dynamic, but always small, data.
type SmallFile struct {
fsNode
fuseFlags uint32 // fuse.FOPEN_*
// readData gives whole file data
readData func(fctx *fuse.Context) ([]byte, error)
}
func newSmallFile(readData func(*fuse.Context) ([]byte, error), fuseFlags uint32) *SmallFile {
return &SmallFile{
fsNode: newFSNode(&fsOptions{Sticky: true}),
fuseFlags: fuseFlags,
readData: readData,
}
}
// NewSmallFile creates nodefs.Node for file with dynamic, but always small, data.
//
// Created file is sticky.
func NewSmallFile(readData func(*fuse.Context) ([]byte, error)) *SmallFile {
return newSmallFile(readData, fuse.FOPEN_DIRECT_IO)
}
func (f *SmallFile) Open(flags uint32, _ *fuse.Context) (nodefs.File, fuse.Status) {
return &nodefs.WithFlags{
File: nil,
FuseFlags: f.fuseFlags,
}, fuse.OK
}
func (f *SmallFile) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status {
data, err := f.readData(fctx)
if err != nil {
return err2LogStatus(err)
}
out.Size = uint64(len(data))
out.Mode = fuse.S_IFREG | 0644
return fuse.OK
}
func (f *SmallFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
data, err := f.readData(fctx)
if err != nil {
return nil, err2LogStatus(err)
}
l := int64(len(data))
end := off + l
if end > l {
end = l
}
return fuse.ReadResultData(data[off:end]), fuse.OK
}
// mkdir adds child to parent as directory.
//
// Note: parent must be already in the filesystem tree - i.e. associated
// with Inode. if not - nodefs will panic in Inode.NewChild on nil dereference.
func mkdir(parent nodefs.Node, name string, child nodefs.Node) {
parent.Inode().NewChild(name, true, child)
}
// mkfile adds child to parent as file.
//
// Note: parent must be already in the filesystem tree (see mkdir for details).
func mkfile(parent nodefs.Node, name string, child nodefs.Node) {
parent.Inode().NewChild(name, false, child)
}
// mount is like nodefs.MountRoot but allows to pass in full fuse.MountOptions.
func mount(mntpt string, root nodefs.Node, opts *fuse.MountOptions) (*fuse.Server, *nodefs.FileSystemConnector, error) {
nodefsOpts := nodefs.NewOptions()
nodefsOpts.Debug = opts.Debug
return nodefs.Mount(mntpt, root, opts, nodefsOpts)
}
// ---- make df happy (else it complains "function not supported") ----
func (root *Root) StatFs() *fuse.StatfsOut {
return &fuse.StatfsOut{
// filesystem sizes (don't try to estimate)
Blocks: 0,
Bfree: 0,
Bavail: 0,
// do we need to count files?
Files: 0,
Ffree: 0,
// block size
Bsize: 2*1024*1024, // "optimal transfer block size" XXX better get from root?
Frsize: 2*1024*1024, // "fragment size"
NameLen: 255, // XXX ok? /proc uses the same
}
}
// ---- misc ----
func panicf(format string, argv ...interface{}) {
panic(fmt.Sprintf(format, argv...))
}
...@@ -17,8 +17,846 @@ ...@@ -17,8 +17,846 @@
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options. // See https://www.nexedi.com/licensing for rationale and options.
// Program wcfs provides filesystem server with file data backed by wendelin.core arrays.
//
// Intro
//
// Each wendelin.core array (ZBigArray) is actually a linear file (ZBigFile)
// and array metadata like dtype, shape and strides associated with it. This
// program exposes as files only ZBigFile data and leaves rest of
// array-specific handling to clients. Every ZBigFile is exposed as one separate
// file that represents whole ZBigFile's data.
//
// For a client, the primary way to access a bigfile should be to mmap
// head/bigfile/<bigfileX> which represents always latest bigfile data.
//
// In the usual situation when bigfiles are big
// there should be no need for any cache besides shared kernel cache of latest
// bigfile data.
//
//
// Filesystem organization
//
// Top-level structure of provided filesystem is as follows:
//
// head/ ; latest database view
// ...
// @<rev1>/ ; database view as of revision <revX>
// ...
// @<rev2>/
// ...
// ...
//
// where head/ represents latest data as stored in upstream ZODB, and
// @<revX>/ represents data as of database revision <revX>.
//
// head/ has the following structure:
//
// head/
// at ; data inside head/ is as of this ZODB transaction
// bigfile/ ; bigfiles' data
// <oid(ZBigFile1)>
// <oid(ZBigFile2)>
// ...
//
// where /bigfile/<bigfileX> represents latest bigfile data as stored in
// upstream ZODB.
// /at describes precisely ZODB state for which bigfile data is currently
// exposed.
//
// @<revX>/ has the following structure:
//
// @<revX>/
// bigfile/ ; bigfiles' data as of revision <revX>
// <oid(ZBigFile1)>
// <oid(ZBigFile2)>
// ...
//
// where /bigfile/<bigfileX> represent bigfile data as of revision <revX>.
//
// Unless accessed {head,@<revX>}/bigfile/<bigfileX> are not automatically visible in
// wcfs filesystem. Similarly @<revX>/ become visible only after access.
//
//
// Writes
//
// As each bigfile is represented by 1 synthetic file, there can be several
// write schemes:
//
// 1. mmap(MAP_PRIVATE) + writeout by client
//
// In this scheme bigfile data is mmapped in MAP_PRIVATE mode, so that local
// user changes are not automatically propagated back to the file. When there
// is a need to commit, client investigates via some OS mechanism, e.g.
// /proc/self/pagemap or something similar, which pages of this mapping it
// modified. Knowing this it knows which data it dirtied and so can write this
// data back to ZODB itself, without filesystem server providing write support.
//
// 2. mmap(MAP_SHARED, PROT_READ) + write-tracking & writeout by client
//
// In this scheme bigfile data is mmaped in MAP_SHARED mode with read-only pages
// protection. Then whenever write fault occurs, client allocates RAM from
// shmfs, copies faulted page to it, and then mmaps RAM page with RW protection
// in place of original bigfile page. Writeout implementation should be similar
// to "1", only here client already knows the pages it dirtied, and this way
// there is no need to consult /proc/self/pagemap.
//
// The advantage of this scheme over mmap(MAP_PRIVATE) is that in case
// there are several in-process mappings of the same bigfile with overlapping
// in-file ranges, changes in one mapping will be visible in another mapping.
// Contrary: whenever a MAP_PRIVATE mapping is modified, the kernel COWs
// faulted page into a page completely private to this mapping, so that other
// MAP_PRIVATE mappings of this file, including ones created from the same
// process, do not see changes made to the first mapping.
//
// Since wendelin.core needs to provide coherency in between different slices
// of the same array, this is the mode wendelin.core will actually use.
//
// 3. write to wcfs
//
// TODO we later could implement "write-directly" mode where clients would write
// data directly into the file.
package main package main
// Wcfs organization
//
// Wcfs is a ZODB client that translates ZODB objects into OS files as would
// non-wcfs wendelin.core do for a ZBigFile.
// It is organized as follows:
//
// 1) 1 ZODB connection for "latest data" for whole filesystem (zhead).
// 2) head/bigfile/* of all bigfiles represent state as of zhead.At .
// 7) when we receive a FUSE read(#blk) request to a head/bigfile/file, we process it as follows:
//
// 7.1) load blkdata for head/bigfile/file[blk] @zhead.at .
// 7.3) blkdata is returned to kernel.
//
// 8) serving FUSE reads from @<rev>/bigfile/file is organized similarly to
// serving reads from head/bigfile/file, but with using dedicated per-<rev>
// ZODB connection and without notifying any watches.
//
// 9) for every ZODB connection (zhead + one per @<rev>) a dedicated read-only
// transaction is maintained.
//
// TODO 10) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout
// Notation used
//
// f - BigFile
// bfdir - BigFileDir
import (
"context"
"flag"
"fmt"
"io"
stdlog "log"
"math"
"os"
"runtime"
"strings"
"sync"
log "github.com/golang/glog"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xruntime/race"
"lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/btree"
_ "lab.nexedi.com/kirr/neo/go/zodb/wks"
"github.com/johncgriffin/overflow"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/hanwen/go-fuse/v2/fuse/nodefs"
"github.com/pkg/errors"
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/xzodb"
"lab.nexedi.com/nexedi/wendelin.core/wcfs/internal/zdata"
)
// shorthands for ZBigFile and ZBlk*
type (
ZBlk = zdata.ZBlk
ZBlk0 = zdata.ZBlk0
ZBlk1 = zdata.ZBlk1
ZData = zdata.ZData
ZBigFile = zdata.ZBigFile
)
// Root represents root of wcfs filesystem.
type Root struct {
fsNode
// ZODB storage we work with
zstor zodb.IStorage
// ZODB DB handle for zstor.
// keeps cache of connections for @<rev>/ accesses.
// only one connection is used for each @<rev>.
zdb *zodb.DB
// directory + ZODB connection for head/
head *Head
// directories + ZODB connections for @<rev>/
revMu sync.Mutex
revTab map[zodb.Tid]*Head
}
// /(head|<rev>)/ - served by Head.
type Head struct {
fsNode
rev zodb.Tid // 0 for head/, !0 for @<rev>/
bfdir *BigFileDir // bigfile/
// at - served by .readAt
// ZODB connection for everything under this head
zconn *xzodb.ZConn
}
// /(head|<rev>)/bigfile/ - served by BigFileDir.
type BigFileDir struct {
fsNode
head *Head // parent head/ or @<rev>/
// {} oid -> <bigfileX>
fileMu sync.Mutex
fileTab map[zodb.Oid]*BigFile
}
// /(head|<rev>)/bigfile/<bigfileX> - served by BigFile.
type BigFile struct {
fsNode
// this BigFile is under .head/bigfile/; it views ZODB via .head.zconn
// parent's BigFileDir.head is the same.
head *Head
// ZBigFile top-level object
zfile *ZBigFile
// things read/computed from .zfile; constant during lifetime of current transaction.
// i.e. changed under zhead.W
blksize int64 // zfile.blksize
size int64 // zfile.Size()
revApprox zodb.Tid // approx last revision that modified zfile data
// inflight loadings of ZBigFile from ZODB.
// successful load results are kept here until blkdata is put into OS pagecache.
//
// Being a staging area for data to enter OS cache, loading has to be
// consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache.
loadMu sync.Mutex
loading map[int64]*blkLoadState // #blk -> {... blkdata}
}
// blkLoadState represents a ZBlk load state/result.
//
// when !ready the loading is in progress.
// when ready the loading has been completed.
type blkLoadState struct {
ready chan struct{}
blkdata []byte
err error
}
// -------- ZODB cache control --------
// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
// because it is essentially the index where to find ZBigFile data.
//
// For the data itself - we put it to kernel pagecache and always deactivate
// from ZODB right after that.
type zodbCacheControl struct {}
func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolicy {
switch obj.(type) {
// don't let ZBlk*/ZData to pollute the cache
case *ZBlk0:
return zodb.PCacheDropObject | zodb.PCacheDropState
case *ZBlk1:
return zodb.PCacheDropObject | zodb.PCacheDropState
case *ZData:
return zodb.PCacheDropObject | zodb.PCacheDropState
// keep ZBigFile and its btree index in cache to speedup file data access.
//
// ZBigFile is top-level object that is used on every block load, and
// it would be a waste to evict ZBigFile from cache.
case *ZBigFile:
return zodb.PCachePinObject | zodb.PCacheKeepState
case *btree.LOBTree:
return zodb.PCachePinObject | zodb.PCacheKeepState
case *btree.LOBucket:
return zodb.PCachePinObject | zodb.PCacheKeepState
}
return 0
}
// -------- 7) FUSE read(#blk) --------
// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
// cap read request to file size
end, ok := overflow.Add64(off, int64(len(dest)))
if !ok {
end = math.MaxInt64 // cap read request till max possible file size
}
if end > f.size {
end = f.size
}
if end <= off {
// the kernel issues e.g. [0 +4K) read for f.size=0 and expects to get (0, ok)
// POSIX also says to return 0 if off >= f.size
return fuse.ReadResultData(nil), fuse.OK
}
// widen read request to be aligned with blksize granularity
// (we can load only whole ZBlk* blocks)
aoff := off - (off % f.blksize)
aend := end
if re := end % f.blksize; re != 0 {
aend += f.blksize - re
}
// TODO use original dest if it can fit the data
dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file
// TODO better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(fctx, f.head.zconn.TxnCtx)
defer cancel()
// read/load all block(s) in parallel
wg := xsync.NewWorkGroup(ctx)
for blkoff := aoff; blkoff < aend; blkoff += f.blksize {
blkoff := blkoff
blk := blkoff / f.blksize
wg.Go(func(ctx context.Context) error {
δ := blkoff-aoff // blk position in dest
//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize)
return f.readBlk(ctx, blk, dest[δ:δ+f.blksize])
})
}
err := wg.Wait()
if err != nil {
return nil, err2LogStatus(err)
}
return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK
}
// readBlk serves Read to read 1 ZBlk #blk into destination buffer.
//
// see "7) when we receive a FUSE read(#blk) request ..." in overview.
//
// len(dest) == blksize.
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) {
defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk)
// check if someone else is already loading this block
f.loadMu.Lock()
loading, already := f.loading[blk]
if !already {
loading = &blkLoadState{
ready: make(chan struct{}),
}
f.loading[blk] = loading
}
f.loadMu.Unlock()
// if it is already loading - just wait for it
if already {
select {
case <-ctx.Done():
return ctx.Err()
case <-loading.ready:
if loading.err == nil {
copy(dest, loading.blkdata) // TODO avoid copy
}
return loading.err
}
}
// noone was loading - we became responsible to load this block
blkdata, _, _, _, _, err := f.zfile.LoadBlk(ctx, blk)
loading.blkdata = blkdata
loading.err = err
// data loaded with error - cleanup .loading
if loading.err != nil {
close(loading.ready)
f.loadMu.Lock()
delete(f.loading, blk)
f.loadMu.Unlock()
return err
}
// data can be used now
close(loading.ready)
copy(dest, blkdata) // TODO avoid copy
// store to kernel pagecache whole block that we've just loaded from database.
// This way, even if the user currently requested to read only small portion from it,
// it will prevent next e.g. consecutive user read request to again hit
// the DB, and instead will be served by kernel from its pagecache.
//
// We cannot do this directly from reading goroutine - while reading
// kernel FUSE is holding corresponding page in pagecache locked, and if
// we would try to update that same page in pagecache it would result
// in deadlock inside kernel.
//
// .loading cleanup is done once we are finished with putting the data into OS pagecache.
// If we do it earlier - a simultaneous read covered by the same block could result
// into missing both kernel pagecache (if not yet updated) and empty .loading[blk],
// and thus would trigger DB access again.
//
// TODO if direct-io: don't touch pagecache
// TODO upload parts only not covered by current read (not to e.g. wait for page lock)
// TODO skip upload completely if read is wide to cover whole blksize
go f.uploadBlk(blk, loading)
return nil
}
// uploadBlk complements readBlk: it uploads loaded blkdata into OS cache.
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
oid := f.zfile.POid()
st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)
f.loadMu.Lock()
bug := (loading != f.loading[blk])
if !bug {
delete(f.loading, blk)
}
f.loadMu.Unlock()
if bug {
panicf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk)
}
if st == fuse.OK {
return
}
// pagecache update failed, but it must not (we verified on startup that
// pagecache control is supported by kernel). We can correctly live on
// with the error, but data access will be likely very slow. Tell user
// about the problem.
log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s (ignoring, but reading from bigfile will be very slow)", oid, blk, st)
}
// ---- Lookup ----
// /(head|<rev>)/bigfile/ -> Lookup receives client request to create /(head|<rev>)/bigfile/<bigfileX>.
func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
f, err := bfdir.lookup(out, name, fctx)
var inode *nodefs.Inode
if f != nil {
inode = f.Inode()
}
return inode, err2LogStatus(err)
}
func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) (f *BigFile, err error) {
defer xerr.Contextf(&err, "%s: lookup %q", bfdir.path(), name)
oid, err := zodb.ParseOid(name)
if err != nil {
return nil, eINVALf("not oid")
}
defer func() {
if f != nil {
f.getattr(out)
}
}()
// check to see if dir(oid) is already there
bfdir.fileMu.Lock()
f, already := bfdir.fileTab[oid]
bfdir.fileMu.Unlock()
if already {
return f, nil
}
// not there - without bfdir lock proceed to open BigFile from ZODB
f, err = bfdir.head.bigfopen(fctx, oid)
if err != nil {
return nil, err
}
// relock bfdir and either register f or, if the file was maybe
// simultaneously created while we were not holding bfdir.fileMu, return that.
bfdir.fileMu.Lock()
f2, already := bfdir.fileTab[oid]
if already {
bfdir.fileMu.Unlock()
// f.Close() not needed - BigFile is all just garbage-collected
return f2, nil
}
bfdir.fileTab[oid] = f
bfdir.fileMu.Unlock()
// mkfile takes filesystem treeLock - do it outside bfdir.fileMu
mkfile(bfdir, name, f)
return f, nil
}
// / -> Lookup receives client request to create @<rev>/.
func (root *Root) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
revd, err := root.lookup(name, fctx)
var inode *nodefs.Inode
if revd != nil {
inode = revd.Inode()
_ = revd.GetAttr(out, nil, fctx) // always ok
}
return inode, err2LogStatus(err)
}
func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
defer xerr.Contextf(&err, "/: lookup %q", name)
var rev zodb.Tid
ok := false
if strings.HasPrefix(name, "@") {
rev, err = zodb.ParseTid(name[1:])
ok = (err == nil)
}
if !ok {
return nil, eINVALf("not @rev")
}
// check to see if dir(rev) is already there
root.revMu.Lock()
revDir, already := root.revTab[rev]
root.revMu.Unlock()
if already {
// XXX race wrt simultaneous "FORGET @<rev>" ?
return revDir, nil
}
// not there - without revMu lock proceed to open @rev view of ZODB
zconnRev, err := xzodb.ZOpen(fctx, root.zdb, &zodb.ConnOptions{At: rev})
if err != nil {
return nil, err
}
// relock root and either register new revX/ directory or, if the
// directory was maybe simultaneously created while we were not holding
// revMu, return that.
root.revMu.Lock()
revDir, already = root.revTab[rev]
if already {
root.revMu.Unlock()
// zconnRev.Release()
transaction.Current(zconnRev.TxnCtx).Abort()
return revDir, nil
}
revDir = &Head{
// TODO how to test forgets:
// echo 2 >/proc/sys/vm/drop_caches (root)
// mount -i -oremount $mntpt (root ?) (shrinks dcache)
// notify invalidate dentry from inside fs
fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + Head.OnForget() -> del root.revTab[]
rev: rev,
zconn: zconnRev, // TODO + Head.OnForget() -> release zconn (= abort zconn.TxnCtx)
}
bfdir := &BigFileDir{
fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + BigFileDir.OnForget()
head: revDir,
fileTab: make(map[zodb.Oid]*BigFile),
}
revDir.bfdir = bfdir
root.revTab[rev] = revDir
root.revMu.Unlock()
// mkdir takes filesystem treeLock - do it outside revMu.
mkdir(root, name, revDir)
mkdir(revDir, "bigfile", bfdir)
return revDir, nil
}
// bigfopen opens BigFile corresponding to oid on head.zconn.
//
// A ZBigFile corresponding to oid is activated and statted.
func (head *Head) bigfopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) {
zconn := head.zconn
defer xerr.Contextf(&err, "bigfopen %s @%s", oid, zconn.At())
// TODO better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(ctx, zconn.TxnCtx)
defer cancel()
xzfile, err := zconn.Get(ctx, oid)
if err != nil {
switch errors.Cause(err).(type) {
case *zodb.NoObjectError:
return nil, eINVAL(err)
case *zodb.NoDataError:
return nil, eINVAL(err)
default:
return nil, err
}
}
zfile, ok := xzfile.(*ZBigFile)
if !ok {
return nil, eINVALf("%s is not a ZBigFile", xzodb.TypeOf(xzfile))
}
// extract blksize, size and initial approximation for file revision
err = zfile.PActivate(ctx)
if err != nil {
return nil, err
}
blksize := zfile.BlkSize()
// NOTE file revision should be revision of both ZBigFile and its data. But we
// cannot get data revision without expensive scan of all ZBigFile's objects.
// -> approximate mtime initially with ZBigFile object mtime.
revApprox := zfile.PSerial()
zfile.PDeactivate()
size, _, _, err := zfile.Size(ctx)
if err != nil {
return nil, err
}
f := &BigFile{
fsNode: newFSNode(&fsOptions{Sticky: false}), // TODO + BigFile.OnForget -> del .head.bfdir.fileTab[]
head: head,
zfile: zfile,
blksize: blksize,
size: size,
revApprox: revApprox,
loading: make(map[int64]*blkLoadState),
}
return f, nil
}
// ---- misc ---
// /(head|<rev>)/at -> readAt serves read.
func (h *Head) readAt(fctx *fuse.Context) ([]byte, error) {
return []byte(h.zconn.At().String()), nil
}
// /(head|<rev>)/ -> Getattr serves stat.
func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status {
at := head.rev
if at == 0 {
at = head.zconn.At()
}
t := at.Time().Time
out.Mode = fuse.S_IFDIR | 0555
out.SetTimes(/*atime=*/nil, /*mtime=*/&t, /*ctime=*/&t)
return fuse.OK
}
// /(head|<rev>)/bigfile/<bigfileX> -> Getattr serves stat.
func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status {
f.getattr(out)
return fuse.OK
}
func (f *BigFile) getattr(out *fuse.Attr) {
out.Mode = fuse.S_IFREG | 0444
out.Size = uint64(f.size)
out.Blksize = uint32(f.blksize) // NOTE truncating 64 -> 32
// .Blocks
mtime := f.revApprox.Time().Time
out.SetTimes(/*atime=*/nil, /*mtime=*/&mtime, /*ctime=*/&mtime)
}
// FIXME gfsconn is tmp workaround for lack of way to retrieve FileSystemConnector from nodefs.Inode
// TODO:
// - Inode += .Mount() -> nodefs.Mount
// - Mount:
// .Root() -> root Inode of the fs
// .Connector() -> FileSystemConnector through which fs is mounted
var gfsconn *nodefs.FileSystemConnector
// TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?)
func main() { func main() {
// XXX stub //stdlog.SetPrefix("wcfs: ") NOTE conflicts with log.CopyStandardLogTo
log.CopyStandardLogTo("WARNING")
defer log.Flush()
err := _main()
if err != nil {
log.Fatal(err)
}
}
func _main() (err error) {
debug := flag.Bool("d", false, "debug")
autoexit := flag.Bool("autoexit", false, "automatically stop service when there is no client activity")
flag.Parse()
if len(flag.Args()) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s [OPTIONS] zurl mntpt\n", os.Args[0])
os.Exit(2)
}
zurl := flag.Args()[0]
mntpt := flag.Args()[1]
xclose := func(c io.Closer) {
err = xerr.First(err, c.Close())
}
// debug -> precise t, no dates (TODO(?) -> always precise t?)
if *debug {
stdlog.SetFlags(stdlog.Lmicroseconds)
}
log.Infof("start %q %q", mntpt, zurl)
gover := "(built with " + runtime.Version()
if race.Enabled {
gover += " -race"
}
gover += ")"
log.Info(gover)
// open zodb storage/watch/db/connection
ctx := context.Background() // TODO(?) + timeout?
zstor, err := zodb.Open(ctx, zurl, &zodb.OpenOptions{
ReadOnly: true,
})
if err != nil {
return err
}
defer xclose(zstor)
// TODO consider using zodbCacheControl for all connections
// ( in addition to zhead, historic connections - that are used to access @rev -
// also need to traverse BigFile.blktab btree )
zdb := zodb.NewDB(zstor, &zodb.DBOptions{})
defer xclose(zdb)
zhead, err := xzodb.ZOpen(ctx, zdb, &zodb.ConnOptions{
})
if err != nil {
return err
}
zhead.Cache().Lock()
zhead.Cache().SetControl(&zodbCacheControl{})
zhead.Cache().Unlock()
// mount root + head/
head := &Head{
fsNode: newFSNode(fSticky),
rev: 0,
zconn: zhead,
}
bfdir := &BigFileDir{
fsNode: newFSNode(fSticky),
head: head,
fileTab: make(map[zodb.Oid]*BigFile),
}
head.bfdir = bfdir
root := &Root{
fsNode: newFSNode(fSticky),
zstor: zstor,
zdb: zdb,
head: head,
revTab: make(map[zodb.Tid]*Head),
}
opts := &fuse.MountOptions{
FsName: zurl,
Name: "wcfs",
DisableXAttrs: true, // we don't use
Debug: *debug,
}
fssrv, fsconn, err := mount(mntpt, root, opts)
if err != nil {
return err
}
gfsconn = fsconn // FIXME temp workaround (see ^^^)
// we require proper pagecache control (added to Linux 2.6.36 in 2010)
kinit := fssrv.KernelSettings()
kfuse := fmt.Sprintf("kernel FUSE (API %d.%d)", kinit.Major, kinit.Minor)
supports := kinit.SupportsNotify
if !supports(fuse.NOTIFY_STORE_CACHE) {
return fmt.Errorf("%s does not support pagecache control", kfuse)
}
// add entries to /
mkdir(root, "head", head)
mkdir(head, "bigfile", bfdir)
mkfile(head, "at", NewSmallFile(head.readAt)) // TODO mtime(at) = tidtime(at)
// for debugging/testing
_wcfs := newFSNode(fSticky)
mkdir(root, ".wcfs", &_wcfs)
mkfile(&_wcfs, "zurl", NewStaticFile([]byte(zurl)))
// TODO handle autoexit
// (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl
// opened, so when all inodes has been forgotten - we know all wcfs.py clients exited)
_ = autoexit
defer xerr.Contextf(&err, "serve %s %s", mntpt, zurl)
// spawn filesystem server.
//
// use `go serve` + `waitMount` not just `serve` - because waitMount
// cares to disable OS calling poll on us.
// ( if we don't disable polling - fs serving can get stuck - see
// https://github.com/hanwen/go-fuse/commit/4f10e248eb for details )
serveCtx, serveCancel := context.WithCancel(context.Background())
go func () {
defer serveCancel()
fssrv.Serve()
}()
err = fssrv.WaitMount()
if err != nil {
return err
}
// filesystem server is serving requests.
// wait for unmount
// NOTE the kernel does not send FORGETs on unmount - but we don't need
// to release left node resources ourselves, because it is just memory.
<-serveCtx.Done()
log.Infof("stop %q %q", mntpt, zurl)
return nil
} }
...@@ -22,6 +22,852 @@ ...@@ -22,6 +22,852 @@
from __future__ import print_function, absolute_import from __future__ import print_function, absolute_import
from wendelin.lib.testing import getTestDB
from wendelin.lib.zodb import dbclose, zstor_2zurl
from wendelin.lib.mem import memcpy
from wendelin.bigfile.file_zodb import ZBigFile
from wendelin.bigfile.tests.test_filezodb import blksize
from wendelin import wcfs from wendelin import wcfs
# TODO import transaction
from persistent import Persistent
from persistent.timestamp import TimeStamp
import sys, os, os.path
from thread import get_ident as gettid
from time import gmtime
from errno import EINVAL, ENOTCONN
from golang import go, chan, select, func, defer, b
from golang import context, time
from zodbtools.util import ashex as h
import pytest; xfail = pytest.mark.xfail
from pytest import raises, fail
from wendelin.wcfs.internal import io, mm
from wendelin.wcfs.internal.wcfs_test import install_sigbus_trap
from wendelin.wcfs import _is_mountpoint as is_mountpoint
# setup:
# - create test database, compute zurl and mountpoint for wcfs
# - at every test: make sure wcfs is not running before & after the test.
testdb = None
testzurl = None # URL of testdb
testmntpt = None # wcfs is mounted here
def setup_module():
# if wcfs.py receives SIGBUS because wcfs.go panics while serving mmap'ed
# read, we want to see python-level traceback instead of being killed.
install_sigbus_trap()
# if wcfs.go is built with race detector and detects a race - make it fail
# current test loudly on the first wcfs.go race.
gorace = os.environ.get("GORACE", "")
if gorace != "":
gorace += " "
os.environ["GORACE"] = gorace + "halt_on_error=1"
global testdb, testzurl, testmntpt
testdb = getTestDB()
testdb.setup()
zstor = testdb.getZODBStorage()
testzurl = zstor_2zurl(zstor)
zstor.close()
testmntpt = wcfs._mntpt_4zurl(testzurl)
os.rmdir(testmntpt)
def teardown_module():
testdb.teardown()
# make sure we start every test without wcfs server running.
def setup_function(f):
assert not os.path.exists(testmntpt)
with raises(KeyError):
procmounts_lookup_wcfs(testzurl)
# make sure we unmount wcfs after every test.
# (tDB checks this in more detail, but join tests don't use tDB)
def teardown_function(f):
mounted = is_mountpoint(testmntpt)
if mounted:
fuse_unmount(testmntpt)
if os.path.exists(testmntpt):
os.rmdir(testmntpt)
with raises(KeyError):
procmounts_lookup_wcfs(testzurl)
# fuse_unmount unmounts FUSE filesystem mounted @ mntpt.
def fuse_unmount(mntpt):
assert is_mountpoint(mntpt)
wcfs._fuse_unmount(mntpt)
# ---- test join/autostart/serve ----
# test that join works.
@func
def test_join():
zurl = testzurl
with raises(RuntimeError, match="wcfs: join .*: server not running"):
wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry == {}
def _():
assert wcfs._wcregistry == {}
defer(_)
wcsrv = wcfs.start(zurl)
defer(wcsrv.stop)
assert wcsrv.mountpoint == testmntpt
assert readfile(wcsrv.mountpoint + "/.wcfs/zurl") == zurl
assert os.path.isdir(wcsrv.mountpoint + "/head")
assert os.path.isdir(wcsrv.mountpoint + "/head/bigfile")
wc = wcfs.join(zurl, autostart=False)
defer(wc.close)
assert wc.mountpoint == wcsrv.mountpoint
assert wc._njoin == 1
assert wc._wcsrv is None
wc2 = wcfs.join(zurl, autostart=False)
defer(wc2.close)
assert wc2 is wc
assert wc._njoin == 2
# test that join(autostart=y) works.
@func
def test_join_autostart():
zurl = testzurl
with raises(RuntimeError, match="wcfs: join .*: server not running"):
wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry == {}
def _():
assert wcfs._wcregistry == {}
defer(_)
wc = wcfs.join(zurl, autostart=True)
defer(wc.close)
assert wc.mountpoint == testmntpt
assert wc._njoin == 1
assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl
assert os.path.isdir(wc.mountpoint + "/head")
assert os.path.isdir(wc.mountpoint + "/head/bigfile")
# verify that join successfully starts wcfs if previous wcfs exited uncleanly.
@func
def test_join_after_crash():
zurl = testzurl
mntpt = testmntpt
wc = start_and_crash_wcfs(zurl, mntpt)
# start the server again - it should start ok despite that FUSE connection
# to previously aborted wcfs is still there
wc2 = wcfs.join(zurl, autostart=True)
assert wc2 is not wc
assert wcfs._wcregistry[mntpt] is wc2
assert wc2.mountpoint == mntpt
assert readfile(mntpt + "/.wcfs/zurl") == zurl
# /proc/mounts should contain wcfs entry
assert procmounts_lookup_wcfs(zurl) == mntpt
# stop the server
wc2.close()
fuse_unmount(mntpt)
# /proc/mounts entry should be gone
with raises(KeyError):
procmounts_lookup_wcfs(zurl)
# verify that start successfully starts server if previous wcfs exited uncleanly.
@func
def test_start_after_crash():
zurl = testzurl
mntpt = testmntpt
wc = start_and_crash_wcfs(zurl, mntpt)
wcsrv = wcfs.start(zurl)
defer(wcsrv.stop)
assert wcsrv.mountpoint == mntpt
assert readfile(mntpt + "/.wcfs/zurl") == zurl
# /proc/mounts should contain wcfs entry
assert procmounts_lookup_wcfs(zurl) == mntpt
# stop the server - /proc/mounts entry should be gone
wcsrv.stop()
with raises(KeyError):
procmounts_lookup_wcfs(zurl)
# verify that serve successfully starts if previous wcfs exited uncleanly.
@func
def test_serve_after_crash():
zurl = testzurl
mntpt = testmntpt
wc = start_and_crash_wcfs(zurl, mntpt)
serve_starting = chan(dtype='C.structZ')
serve_done = chan(dtype='C.structZ')
@func
def _():
defer(serve_done.close)
wcfs.serve(zurl, [], _tstartingq=serve_starting)
go(_)
def _():
fuse_unmount(mntpt)
serve_done.recv()
defer(_)
serve_starting.recv() # wait before serve is going to spawn wcfs after cleanup
wcfs._waitmount(timeout(), zurl, mntpt)
assert readfile(mntpt + "/.wcfs/zurl") == zurl
assert procmounts_lookup_wcfs(zurl) == mntpt
# start_and_crash_wcfs starts wcfs and then kills it.
# it returns closed WCFS connection that was connected to the killed WCFS server.
def start_and_crash_wcfs(zurl, mntpt): # -> WCFS
# /proc/mounts should not contain wcfs entry
with raises(KeyError):
procmounts_lookup_wcfs(zurl)
# start the server with attached client
wcsrv = wcfs.start(zurl)
assert wcsrv.mountpoint == mntpt
assert mntpt not in wcfs._wcregistry
wc = wcfs.join(zurl, autostart=False)
assert wcfs._wcregistry[mntpt] is wc
assert wc.mountpoint == mntpt
assert readfile(mntpt + "/.wcfs/zurl") == zurl
# /proc/mounts should now contain wcfs entry
assert procmounts_lookup_wcfs(zurl) == mntpt
# kill the server
wcsrv._proc.kill() # sends SIGKILL
assert wcsrv._proc.wait() != 0
# access to filesystem should raise "Transport endpoint not connected"
with raises(IOError) as exc:
readfile(mntpt + "/.wcfs/zurl")
assert exc.value.errno == ENOTCONN
# client close should also raise "Transport endpoint not connected" but remove wc from _wcregistry
assert wcfs._wcregistry[mntpt] is wc
with raises(IOError) as exc:
wc.close()
assert exc.value.errno == ENOTCONN
assert mntpt not in wcfs._wcregistry
# /proc/mounts should still contain wcfs entry
assert procmounts_lookup_wcfs(zurl) == mntpt
return wc
# ---- infrastructure for data access tests ----
#
# Testing infrastructure consists of tDB and tFile that
# jointly organize wcfs behaviour testing. See individual classes for details.
# many tests need to be run with some reasonable timeout to detect lack of wcfs
# response. with_timeout and timeout provide syntactic shortcuts to do so.
def with_timeout(parent=context.background()): # -> ctx, cancel
return context.with_timeout(parent, 3*time.second)
def timeout(parent=context.background()): # -> ctx
ctx, _ = with_timeout()
return ctx
# DF represents a change in files space.
class DF:
# .rev tid
# .byfile {} ZBigFile -> DFile
def __init__(dF):
# rev set from outside
dF.byfile = {}
# DFile represents a change to one file.
class DFile:
# .rev tid
# .ddata {} blk -> data
def __init__(dfile):
# rev set from outside
dfile.ddata = {}
# tDB/tWCFS provides database/wcfs testing environment.
#
# Database root and wcfs connection are represented by .root and .wc correspondingly.
# The database is initialized with one ZBigFile created and opened via ZODB connection as .zfile .
#
# The primary way to access wcfs is by opening BigFiles.
# A BigFile opened under tDB is represented as tFile - see .open for details.
#
# The database can be mutated (via !wcfs codepath) with .change + .commit .
# Current database head is represented by .head .
# The history of the changes is kept in .dFtail .
# There are various helpers to query history (_blkDataAt, ...)
#
# tDB must be explicitly closed once no longer used.
#
# TODO(?) print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
class tWCFS(object):
@func
def __init__(t):
assert not os.path.exists(testmntpt)
wc = wcfs.join(testzurl, autostart=True)
assert wc.mountpoint == testmntpt
assert os.path.exists(wc.mountpoint)
assert is_mountpoint(wc.mountpoint)
t.wc = wc
# force-unmount wcfs on timeout to unstuck current test and let it fail.
# Force-unmount can be done reliably only by writing into
# /sys/fs/fuse/connections/<X>/abort. For everything else there are
# cases, when wcfs, even after receiving `kill -9`, will be stuck in kernel.
# ( git.kernel.org/linus/a131de0a482a makes in-kernel FUSE client to
# still wait for request completion even after fatal signal )
t._closed = chan()
t._wcfuseaborted = chan()
t._wcfuseabort = os.fdopen(os.dup(wc._wcsrv._fuseabort.fileno()), 'w')
go(t._abort_ontimeout, 10*time.second) # NOTE must be: with_timeout << · << wcfs_pin_timeout
# _abort_ontimeout sends abort to fuse control file if timeout happens
# before tDB is closed.
def _abort_ontimeout(t, dt):
_, _rx = select(
time.after(dt).recv, # 0
t._closed.recv, # 1
)
if _ == 1:
return # tDB closed = testcase completed
# timeout -> force-umount wcfs
eprint("\nC: test timed out after %.1fs" % (dt / time.second))
eprint("-> aborting wcfs fuse connection to unblock ...\n")
t._wcfuseabort.write(b"1\n")
t._wcfuseabort.flush()
t._wcfuseaborted.close()
# close closes connection to wcfs, unmounts the filesystem and makes sure
# that wcfs server exits.
@func
def close(t):
def _():
os.close(t._wcfuseabort)
defer(t._closed.close)
# unmount and wait for wcfs to exit
def _():
# run `fusermount -u` the second time after if wcfs was killed to
# cleanup /proc/mounts.
if is_mountpoint(t.wc.mountpoint):
fuse_unmount(t.wc.mountpoint)
assert not is_mountpoint(t.wc.mountpoint)
os.rmdir(t.wc.mountpoint)
defer(_)
def _():
def onstuck():
fail("wcfs.go does not exit even after SIGKILL")
t.wc._wcsrv._stop(timeout(), _onstuck=onstuck)
defer(_)
defer(t.wc.close)
assert is_mountpoint(t.wc.mountpoint)
class tDB(tWCFS):
@func
def __init__(t):
t.root = testdb.dbopen()
def _(): # close/unlock db if __init__ fails
exc = sys.exc_info()[1]
if exc is not None:
dbclose(t.root)
defer(_)
# start wcfs after testdb is created
super(tDB, t).__init__()
# ZBigFile(s) scheduled for commit
t._changed = {} # ZBigFile -> {} blk -> data
# committed: (tail, head] + δF history
t.tail = t.root._p_jar.db().storage.lastTransaction()
t.dFtail = [] # of DF; head = dFtail[-1].rev
# tracked opened tFiles
t._files = set()
# ID of the thread which created tDB
# ( transaction plays dirty games with threading.local and we have to
# check the thread is the same when .root is used )
t._maintid = gettid()
# prepare initial objects for test: zfile, nonzfile
t.root['!file'] = t.nonzfile = Persistent()
t.root['zfile'] = t.zfile = ZBigFile(blksize)
t.at0 = t.commit()
@property
def head(t):
return t.dFtail[-1].rev
# close closes test database as well as all tracked files and wcfs.
# it also prints change history to help developer overview current testcase.
@func
def close(t):
defer(super(tDB, t).close)
defer(lambda: dbclose(t.root))
defer(t.dump_history)
for tf in t._files.copy():
tf.close()
assert len(t._files) == 0
# open opens wcfs file corresponding to zf@at and starts to track it.
# see returned tFile for details.
def open(t, zf, at=None): # -> tFile
return tFile(t, zf, at=at)
# change schedules zf to be changed according to changeDelta at commit.
#
# changeDelta: {} blk -> data.
# data can be both bytes and unicode.
def change(t, zf, changeDelta):
assert isinstance(zf, ZBigFile)
zfDelta = t._changed.setdefault(zf, {})
for blk, data in changeDelta.iteritems():
data = b(data)
assert len(data) <= zf.blksize
zfDelta[blk] = data
# commit commits transaction and makes sure wcfs is synchronized to it.
#
# It updates .dFtail and returns committed transaction ID.
#
# zf and changeDelta can be optionally provided, in which case .change(zf,
# changeDelta) call is made before actually committing.
def commit(t, zf=None, changeDelta=None): # -> tAt
if zf is not None:
assert changeDelta is not None
t.change(zf, changeDelta)
# perform modifications scheduled by change.
# use !wcfs mode so that we prepare data independently of wcfs code paths.
dF = DF()
zconns = set()
for zf, zfDelta in t._changed.items():
dfile = DFile()
zconns.add(zf._p_jar)
zfh = zf.fileh_open() # NOTE does not use wcfs
for blk, data in zfDelta.iteritems():
dfile.ddata[blk] = data
data += b'\0'*(zf.blksize - len(data)) # trailing \0
vma = zfh.mmap(blk, 1)
memcpy(vma, data)
dF.byfile[zf] = dfile
# verify that all changed objects come from the same ZODB connection
assert len(zconns) in (0, 1) # either nothing to commit or all from the same zconn
if len(zconns) == 1:
zconn = zconns.pop()
root = zconn.root()
else:
# no objects to commit
root = t.root
assert gettid() == t._maintid
# perform the commit. NOTE there is no clean way to retrieve tid of
# just committed transaction - we use last._p_serial as workaround.
root['_last'] = last = Persistent()
last._p_changed = 1
transaction.commit()
head = tAt(t, last._p_serial)
dF.rev = head
for dfile in dF.byfile.values():
dfile.rev = head
t.dFtail.append(dF)
assert t.head == head # self-check
print('\nM: commit -> %s' % head)
for zf, zfDelta in t._changed.items():
print('M: f<%s>\t%s' % (h(zf._p_oid), sorted(zfDelta.keys())))
t._changed = {}
# synchronize wcfs to db, and we are done
t._wcsync()
return head
# _wcsync makes sure wcfs is synchronized to latest committed transaction.
def _wcsync(t):
# XXX stub: unmount/remount + close/reopen files until wcfs supports invalidations
files = t._files.copy()
for tf in files:
tf.close()
tWCFS.close(t)
tWCFS.__init__(t)
for tf in files:
tf.__init__(t, tf.zf, tf.at)
assert len(t._files) == len(files)
# tFile provides testing environment for one bigfile opened on wcfs.
#
# ._blk() provides access to data of a block.
# .assertBlk/.assertData assert
# on state of data.
class tFile:
# maximum number of pages we mmap for 1 file.
_max_tracked_pages = 8
def __init__(t, tdb, zf, at=None):
assert isinstance(zf, ZBigFile)
t.tdb = tdb
t.zf = zf
t.at = at
t.f = tdb.wc._open(zf, at=at)
t.blksize = zf.blksize
t.fmmap = None
tdb._files.add(t)
# make sure that wcfs reports zf.blksize as preferred block size for IO.
# wcfs.py also uses .st_blksize in blk -> byte offset computation.
st = os.fstat(t.f.fileno())
assert st.st_blksize == t.blksize
# mmap the file past the end up to _max_tracked_pages
assert t.blksize % mm.PAGE_SIZE == 0
t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked_pages*t.blksize)
def close(t):
t.tdb._files.remove(t)
if t.fmmap is not None:
mm.unmap(t.fmmap)
t.f.close()
# _blk returns memoryview of file[blk].
# when/if block memory is accessed, the user has to notify tFile with _blkaccess call.
def _blk(t, blk):
assert blk <= t._max_tracked_pages
return memoryview(t.fmmap[blk*t.blksize:(blk+1)*t.blksize])
def _blkaccess(t, blk):
if t.at is None: # notify tDB only for head/file access
t.tdb._blkheadaccess(t.zf, blk)
# _sizeinblk returns file size in blocks.
def _sizeinblk(t):
st = os.fstat(t.f.fileno())
assert st.st_blksize == t.blksize # just in case
assert st.st_size % t.blksize == 0
assert st.st_size // t.blksize <= t._max_tracked_pages
return st.st_size // t.blksize
# assertBlk asserts that file[blk] has data as expected.
#
# Expected data may be given with size < t.blksize. In such case the data
# is implicitly appended with trailing zeros. Data can be both bytes and unicode.
@func
def assertBlk(t, blk, dataok):
# TODO -> assertCtx('blk #%d' % blk)
def _():
assertCtx = 'blk #%d' % blk
_, e, _ = sys.exc_info()
if isinstance(e, AssertionError):
assert len(e.args) == 1 # pytest puts everything as args[0]
e.args = (assertCtx + "\n" + e.args[0],)
defer(_)
dataok = b(dataok)
blkdata, _ = t.tdb._blkDataAt(t.zf, blk, t.at)
assert blkdata == dataok, "computed vs explicit data"
t._assertBlk(blk, dataok)
@func
def _assertBlk(t, blk, dataok):
assert len(dataok) <= t.blksize
dataok += b'\0'*(t.blksize - len(dataok)) # tailing zeros
assert blk < t._sizeinblk()
blkview = t._blk(blk)
# verify full data of the block
# TODO(?) assert individually for every block's page? (easier debugging?)
assert blkview.tobytes() == dataok
# assertData asserts that file has data blocks as specified.
#
# Expected blocks may be given with size < zf.blksize. In such case they
# are implicitly appended with trailing zeros. If a block is specified as
# 'x' - this particular block is not accessed and is not checked.
#
# The file size and optionally mtime are also verified.
def assertData(t, dataokv, mtime=None):
st = os.fstat(t.f.fileno())
assert st.st_blksize == t.blksize
assert st.st_size == len(dataokv)*t.blksize
if mtime is not None:
assert st.st_mtime == tidtime(mtime)
for blk, dataok in enumerate(dataokv):
if dataok == 'x':
continue
t.assertBlk(blk, dataok)
# ---- infrastructure: helpers to query dFtail/accessed history ----
# _blkDataAt returns expected zf[blk] data and its revision as of @at database state.
#
# If the block is hole - (b'', at0) is returned. XXX -> @z64?
# Hole include cases when the file does not exists, or when blk is > file size.
@func(tDB)
def _blkDataAt(t, zf, blk, at): # -> (data, rev)
if at is None:
at = t.head
# all changes to zf
vdf = [_.byfile[zf] for _ in t.dFtail if zf in _.byfile]
# changes to zf[blk] <= at
blkhistoryat = [_ for _ in vdf if blk in _.ddata and _.rev <= at]
if len(blkhistoryat) == 0:
# blk did not existed @at
data = b''
rev = t.dFtail[0].rev # was hole - at0
else:
_ = blkhistoryat[-1]
data = _.ddata[blk]
rev = _.rev
assert rev <= at
return data, rev
# -------------------------------------
# ---- actual tests to access data ----
# exercise wcfs functionality
# plain data access.
@func
def test_wcfs_basic():
t = tDB(); zf = t.zfile
defer(t.close)
# >>> lookup non-BigFile -> must be rejected
with raises(OSError) as exc:
t.wc._stat("head/bigfile/%s" % h(t.nonzfile._p_oid))
assert exc.value.errno == EINVAL
# >>> file initially empty
f = t.open(zf)
f.assertData ([], mtime=t.at0)
# >>> (@at1) commit data -> we can see it on wcfs
at1 = t.commit(zf, {2:'c1'})
f.assertData (['','','c1']) # TODO + mtime=t.head
# >>> (@at2) commit again -> we can see both latest and snapshotted states
# NOTE blocks e(4) and f(5) will be accessed only in the end
at2 = t.commit(zf, {2:'c2', 3:'d2', 5:'f2'})
# f @head
f.assertData (['','', 'c2', 'd2', 'x','x']) # TODO + mtime=t.head
# f @at1
f1 = t.open(zf, at=at1)
f1.assertData (['','','c1']) # TODO + mtime=at1
# >>> (@at3) commit again without changing zf size
f2 = t.open(zf, at=at2)
at3 = t.commit(zf, {0:'a3', 2:'c3', 5:'f3'})
# f @head
f.assertData (['a3','','c3','d2','x','x']) # TODO + mtime=t.head
# f @at2
# NOTE f(2) is accessed but via @at/ not head/ ; f(2) in head/zf remains unaccessed
f2.assertData (['','','c2','d2','','f2']) # TODO mtime=at2
# f @at1
f1.assertData (['','','c1']) # TODO mtime=at1
# verify that read after file size returns (0, ok)
# (the same behaviour as on e.g. ext4 and as requested by posix)
@func
def test_wcfs_basic_read_aftertail():
t = tDB(); zf = t.zfile
defer(t.close)
t.commit(zf, {2:'c1'})
f = t.open(zf)
f.assertData(['','','c1'])
def _(off): # -> bytes read from f[off +4)
buf = bytearray(4)
n = io.readat(f.f.fileno(), off, buf)
return bytes(buf[:n])
assert _(0*blksize) == b'\x00\x00\x00\x00'
assert _(1*blksize) == b'\x00\x00\x00\x00'
assert _(2*blksize) == b'c1\x00\x00'
assert _(3*blksize-4) == b'\x00\x00\x00\x00'
assert _(3*blksize-3) == b'\x00\x00\x00'
assert _(3*blksize-2) == b'\x00\x00'
assert _(3*blksize-1) == b'\x00'
assert _(3*blksize-0) == b''
assert _(3*blksize+1) == b''
assert _(3*blksize+2) == b''
assert _(3*blksize+3) == b''
assert _(4*blksize) == b''
assert _(8*blksize) == b''
assert _(100*blksize) == b''
# ---- misc ---
# readfile reads file @ path.
def readfile(path):
with open(path) as f:
return f.read()
# writefile writes data to file @ path.
def writefile(path, data):
with open(path, "w") as f:
f.write(data)
# tidtime converts tid to transaction commit time.
def tidtime(tid):
t = TimeStamp(tid).timeTime()
# ZODB/py vs ZODB/go time resolution is not better than 1µs
# see e.g. https://lab.nexedi.com/kirr/neo/commit/9112f21e
#
# NOTE pytest.approx supports only ==, not e.g. <, so we use plain round.
return round(t, 6)
# tidfromtime converts time into corresponding transaction ID.
def tidfromtime(t):
f = t - int(t) # fraction of seconds
t = int(t)
_ = gmtime(t)
s = _.tm_sec + f # total seconds
ts = TimeStamp(_.tm_year, _.tm_mon, _.tm_mday, _.tm_hour, _.tm_min, s)
return ts.raw()
# verify that tidtime is precise enough to show difference in between transactions.
# verify that tidtime -> tidfromtime is identity within rounding tolerance.
@func
def test_tidtime():
t = tDB()
defer(t.close)
# tidtime not rough
atv = [t.commit()]
for i in range(10):
at = t.commit()
assert tidtime(at) > tidtime(atv[-1])
atv.append(at)
# tidtime -> tidfromtime
for at in atv:
tat = tidtime(at)
at_ = tidfromtime(tat)
tat_ = tidtime(at_)
assert abs(tat_ - tat) <= 2E-6
# tAt is bytes whose repr returns human readable string considering it as `at` under tDB.
#
# It gives both symbolic version and raw hex forms, for example:
# @at2 (03cf7850500b5f66)
#
# tAt is used everywhere with the idea that e.g. if an assert comparing at, or
# e.g. dicts containing at, fails, everything is printed in human readable
# form instead of raw hex that is hard to visibly map to logical transaction.
class tAt(bytes):
def __new__(cls, tdb, at):
tat = bytes.__new__(cls, at)
tat.tdb = tdb
return tat
def __repr__(at):
t = at.tdb
for i, dF in enumerate(t.dFtail):
if dF.rev == at:
return "@at%d (%s)" % (i, h(at))
return "@" + h(at)
__str__ = __repr__
# zfiles returns ZBigFiles that were ever changed under t.
@func(tDB)
def zfiles(t):
zfs = set()
for dF in t.dFtail:
for zf in dF.byfile:
if zf not in zfs:
zfs.add(zf)
return zfs
# dump_history prints t's change history in tabular form.
#
# the output is useful while developing or analyzing a test failure: to get
# overview of how file(s) are changed in tests.
@func(tDB)
def dump_history(t):
print('>>> Change history by file:')
for zf in t.zfiles():
print('\nf<%s>:' % h(zf._p_oid))
indent = '\t%s\t' % (' '*len('%s' % t.head),)
print('%s%s' % (indent, ' '.join('01234567')))
print('%s%s' % (indent, ' '.join('abcdefgh')))
for dF in t.dFtail:
df = dF.byfile.get(zf)
emitv = []
if df is not None:
dblk = set(df.ddata.keys())
for blk in range(max(dblk)+1):
if blk in dblk:
emitv.append('%d' % blk)
else:
emitv.append(' ')
print('\t%s\t%s' % (dF.rev, ' '.join(emitv)))
print()
# procmounts_lookup_wcfs returns /proc/mount entry for wcfs mounted to serve zurl.
def procmounts_lookup_wcfs(zurl): # -> mountpoint | KeyError
for line in readfile('/proc/mounts').splitlines():
# <zurl> <mountpoint> fuse.wcfs ...
zurl_, mntpt, typ, _ = line.split(None, 3)
if typ != 'fuse.wcfs':
continue
if zurl_ == zurl:
return mntpt
raise KeyError("lookup wcfs %s: no /proc/mounts entry" % zurl)
# eprint prints msg to stderr
def eprint(msg):
print(msg, file=sys.stderr)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment