# -*- coding: utf-8 -*-
# Copyright (C) 2018-2019  Nexedi SA and Contributors.
#                          Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.

"""Module wcfs.py provides python gateway for spawning and interoperating with wcfs server

join(zurl) joins wcfs server. If wcfs server for zurl is not yet running, it
will be automatically started if `autostart=True` parameter is passed to join.
It will also be automatically started by default unless
$WENDELIN_CORE_WCFS_AUTOSTART=no is specified in environment.

Conn represents connection to wcfs server obtained by join.
FileH is a handle to opened bigfile/X obtained by Conn.open; it can be mmaped.

Environment:

    $WENDELIN_CORE_WCFS_AUTOSTART   yes|no (default yes) - whether join
                                    automatically starts wcfs server when
                                    autostart is not explicitly given.
    $WENDELIN_CORE_WCFS_OPTIONS     whitespace-separated extra flags passed to
                                    wcfs when join starts it.
"""

import os, sys, hashlib, tempfile, subprocess, time
import logging as log
from os.path import dirname
from errno import ENOENT, EEXIST

from golang import chan, select, default
from golang import sync, context
from golang.gcompat import qq

from ZODB.FileStorage import FileStorage
from ZODB.utils import u64, p64
from zodbtools.util import ashex


# Conn represents connection to wcfs server.
class Conn(object):
    # .mountpoint   path to wcfs mountpoint
    # ._fwcfs       /.wcfs/zurl opened to keep the server from going away (at least cleanly)
    #               XXX 4testing only?
    # ._proc        wcfs process if it was opened by this conn | None

    def __init__(self, mountpoint, fwcfs, proc):
        self.mountpoint = mountpoint
        self._fwcfs     = fwcfs
        self._proc      = proc

    # close releases this connection's handle on .wcfs/zurl.
    def close(self):
        # XXX unmount wcfs as well?
        self._fwcfs.close()

    # open creates wcfs file handle, which can be mmaped to give data of ZBigFile.
    #
    # XXX more text
    #
    # All mmapings of one FileH share changes.
    # There can be several different FileH for the same (at, oid), and those
    # FileH do not share changes.
    def open(self, zfile): # -> FileH
        #assert isinstance(zfile, ZBigFile)     # XXX import cycle
        zconn = zfile._p_jar
        # XXX ._start is probably ZODB5 only -> check ZODB4 and ZODB3
        zat = p64(u64(zconn._storage._start)-1)  # before -> at
        # XXX pinned to @revX/... for now   -> TODO /head/bigfile/...
        path = '%s/@%s/bigfile/%s' % (self.mountpoint, ashex(zat), ashex(zfile._p_oid))
        fd = os.open(path, os.O_RDONLY)
        return FileH(fd)


# FileH is handle to opened bigfile/X.
#
# XXX it mimics BigFileH and should be integrated into virtmem (see fileh_open_overlay)
#
# XXX it should implement wcfs invalidation protocol and remmap head/... parts
# to pinned as requested.
import mmap
from bigarray import pagesize           # XXX hack
from bigarray.array_ram import _VMA     # XXX hack
class FileH(object):
    # .fd       OS file descriptor of opened bigfile/X; closed when FileH is garbage-collected

    def __init__(self, fd):
        self.fd = fd

    def __del__(self):
        os.close(self.fd)

    # mmap maps pglen pages starting from page pgoffset read-only.
    def mmap(self, pgoffset, pglen):
        return _VMA(self.fd, pgoffset, pglen, pagesize, mmap.PROT_READ)


# ---- join/run wcfs ----


# _open_dotwcfs tries to open <mntpt>/.wcfs/zurl.
#
# It returns the opened file, or None if that file does not exist (i.e. wcfs is
# not mounted at mntpt). Any other IOError is propagated.
def _open_dotwcfs(mntpt):   # -> file | None
    try:
        return open(mntpt + "/.wcfs/zurl")
    except IOError as e:
        if e.errno != ENOENT:
            raise
        return None


# serve starts and runs wcfs server for ZODB @ zurl.
#
# it mounts wcfs at a location that is with 1-1 correspondence with zurl.
# it then waits for wcfs to exit (either due to unmount or an error).
#
# it is an error if wcfs was already started.
#
# optv is the list of extra flags passed to wcfs before zurl and mountpoint.
# if exec_ is True, wcfs is not spawned, but executed into.
#
# serve(zurl, optv, exec_=False).
def serve(zurl, optv, exec_=False):
    mntpt = _mntpt_4zurl(zurl)

    # try opening .wcfs - it is an error if we can do it.
    # XXX -> option to wcfs itself?
    f = _open_dotwcfs(mntpt)
    if f is not None:
        f.close()
        raise RuntimeError("wcfs: start %s: already started" % zurl)

    # seems to be ok to start
    # XXX race window if something starts after ^^^ check
    argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
    if not exec_:
        subprocess.check_call(argv, close_fds=True)
    else:
        os.execv(argv[0], argv)


# _default_autostart returns default autostart setting for join.
#
# Out-of-the-box we want wcfs to be automatically started, to ease developer
# experience when wendelin.core is standalone installed. However in environments
# like SlapOS, it is more preferable to start and monitor wcfs service explicitly.
# SlapOS & co. should thus set $WENDELIN_CORE_WCFS_AUTOSTART=no.
#
# Raises RuntimeError if $WENDELIN_CORE_WCFS_AUTOSTART is neither yes nor no
# (case-insensitive).
def _default_autostart():
    autostart = os.environ.get("WENDELIN_CORE_WCFS_AUTOSTART", "yes")
    autostart = autostart.lower()
    try:
        return {"yes": True, "no": False}[autostart]
    except KeyError:
        raise RuntimeError("wcfs: $WENDELIN_CORE_WCFS_AUTOSTART=%s: invalid; expected yes or no"
                           % qq(autostart))


# join connects to wcfs server for ZODB @ zurl.
#
# If wcfs for that zurl was already started, join connects to it.
# Otherwise it starts wcfs for zurl if autostart is True.
# If autostart is not given (None), the default is taken from
# $WENDELIN_CORE_WCFS_AUTOSTART at call time (see _default_autostart).
#
# If shared is True - a shared connection is returned - one that will be also
# returned for following join(shared=True) requests with the same zurl.
def join(zurl, autostart=None, shared=False): # -> Conn
    # XXX implement shared
    # resolve the default here, not in the signature, so that the environment
    # is consulted when join is called, not when this module is imported.
    if autostart is None:
        autostart = _default_autostart()

    mntpt = _mntpt_4zurl(zurl)

    # try opening .wcfs - if we succeed - it is already mounted.
    # XXX -> wcfs itself?
    f = _open_dotwcfs(mntpt)
    if f is not None:
        # already have it
        return Conn(mntpt, f, None)

    if not autostart:
        raise RuntimeError("wcfs: join %s: server not started" % zurl)

    # start wcfs with telling it to automatically exit when there is no client activity.
    # XXX extra opts -> join args -> test + -v=1 if running py.test -v
    # XXX ^^^ check log level?
    optv_extra = os.environ.get("WENDELIN_CORE_WCFS_OPTIONS", "").split()
    return _start(zurl, "-autoexit", *optv_extra)


# _start starts wcfs server for ZODB @ zurl.
#
# optv can be optionally given to pass flags to wcfs.
#
# It spawns wcfs and concurrently waits until <mntpt>/.wcfs/zurl becomes
# readable and contains zurl. If the spawned wcfs exits before that, or the
# mounted .wcfs/zurl does not match, an error is raised.
def _start(zurl, *optv): # -> Conn
    mntpt = _mntpt_4zurl(zurl)
    log.info("wcfs: starting for %s ...", zurl)

    # XXX errctx "wcfs: start"

    # spawn wcfs and wait till filesystem-level access to it is ready
    conn = Conn(mntpt, None, None)
    wg = sync.WorkGroup(context.background())
    fsready = chan()

    # _spawn runs wcfs and monitors it until the filesystem is reported ready.
    def _spawn(ctx):
        # XXX errctx "spawn"
        argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
        proc = subprocess.Popen(argv, close_fds=True)
        while 1:
            ret = proc.poll()
            if ret is not None:
                raise RuntimeError("exited with %s" % ret)

            sel, _ = select(
                ctx.done().recv,    # 0
                fsready.recv,       # 1
                default,            # 2
            )
            if sel == 0:
                proc.terminate()
                proc.wait()         # reap it so that it does not linger as a zombie
                raise ctx.err()
            if sel == 1:
                # startup was ok - don't monitor spawned wcfs any longer
                conn._proc = proc
                return

            time.sleep(0.1)
    wg.go(_spawn)

    # _waitmount polls for .wcfs/zurl to appear and verifies it matches zurl.
    def _waitmount(ctx):
        # XXX errctx "waitmount"
        while 1:
            f = _open_dotwcfs(mntpt)
            if f is not None:
                try:
                    dotwcfs = f.read()
                    if dotwcfs != zurl:
                        raise RuntimeError(".wcfs/zurl != zurl (%s != %s)" % (qq(dotwcfs), qq(zurl)))
                except BaseException:
                    f.close()   # don't leak the handle on failed verification
                    raise

                conn._fwcfs = f
                fsready.close()
                return

            sel, _ = select(
                ctx.done().recv,    # 0
                default,            # 1
            )
            if sel == 0:
                raise ctx.err()

            time.sleep(0.1)
    wg.go(_waitmount)

    try:
        wg.wait()
    except BaseException:
        # the mount may have been seen before the overall start failed
        if conn._fwcfs is not None:
            conn._fwcfs.close()
        raise
    return conn


# _wcfs_exe returns path to wcfs executable.
def _wcfs_exe():
    return '%s/wcfs' % dirname(__file__)

# _mntpt_4zurl returns wcfs should-be mountpoint for ZODB @ zurl.
#
# the mountpoint is <tempdir>/wcfs/<sha1(zurl)>, with zurl hashed as UTF-8.
# it also makes sure the mountpoint exists.
def _mntpt_4zurl(zurl):
    # XXX what if zurl is zconfig://... ? -> then we have to look inside?
    #     -> zstor_2zurl extracts zurl in canonical form and zconfig:// is not possible there.
    # hashlib wants bytes: text zurl (py3 str / py2 unicode) is encoded; byte
    # strings are hashed as-is so mountpoints of existing zurls do not change.
    zurl_b = zurl
    if not isinstance(zurl_b, bytes):
        zurl_b = zurl_b.encode('utf-8')
    m = hashlib.sha1()
    m.update(zurl_b)
    mntpt = "%s/wcfs/%s" % (tempfile.gettempdir(), m.hexdigest())
    _mkdir_p(mntpt)
    return mntpt


# zstor_2zurl converts a ZODB storage to URL to access it.
# XXX -> unexport?
def zstor_2zurl(zstor):
    # There is, sadly, no unified way to do it, as even if storages are created via
    # zodburi, after creation its uri is lost. And storages could be created not
    # only through URI but e.g. via ZConfig and manually. We want to support all
    # those cases...
    #
    # For this reason extract URL with important for wcfs use-case parameters in
    # ad-hoc way.
    if isinstance(zstor, FileStorage):
        return "file://%s" % (zstor._file_name,)

    # TODO ZEO + NEO support
    # wrap zstor in a tuple so that %-formatting works for any object
    raise NotImplementedError("don't know how to extract url from %r" % (zstor,))


# _mkdir_p creates directory path together with all missing parents, like `mkdir -p`.
#
# It is not an error if path already exists as a directory.
# An OSError is raised if path exists but is not a directory, or on any other
# failure.
def _mkdir_p(path):
    try:
        os.makedirs(path)
    except OSError as e:
        # EEXIST is fine only when what exists is a directory - `mkdir -p`
        # on an existing regular file must fail.
        if e.errno != EEXIST or not os.path.isdir(path):
            raise


# main is the entry point when this module is run as a program:
#
#   wcfs.py [<wcfs-options>...] <zurl>
#
# all arguments but the last are passed as flags to wcfs; the last one is zurl.
# it executes into wcfs server via serve(..., exec_=True).
# With no arguments it prints usage to stderr and exits with status 2.
def main():
    argv = sys.argv[1:]
    if len(argv) < 1:
        prog = os.path.basename(sys.argv[0]) if sys.argv else "wcfs.py"
        sys.stderr.write("usage: %s [<wcfs-options>...] <zurl>\n" % prog)
        sys.exit(2)

    zurl = argv[-1]     # -a -b zurl    -> zurl
    optv = argv[:-1]    # -a -b zurl    -> -a -b
    serve(zurl, optv, exec_=True)