# -*- coding: utf-8 -*- # Copyright (C) 2018-2019 Nexedi SA and Contributors. # Kirill Smelkov <kirr@nexedi.com> # # This program is free software: you can Use, Study, Modify and Redistribute # it under the terms of the GNU General Public License version 3, or (at your # option) any later version, as published by the Free Software Foundation. # # You can also Link and Combine this program with other software covered by # the terms of any of the Free Software licenses or any of the Open Source # Initiative approved licenses and Convey the resulting work. Corresponding # source of such a combination shall include the source code for all other # software used. # # This program is distributed WITHOUT ANY WARRANTY; without even the implied # warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # # See COPYING file for full licensing terms. # See https://www.nexedi.com/licensing for rationale and options. """Module wcfs.py provides python gateway for spawning and interoperating with wcfs server Join(zurl) joins wcfs server. If wcfs server for zurl is not yet running, it will be automatically started if `autostart=True` parameter is passed to join. It will also be automatically started by default unless $WENDELIN_CORE_WCFS_AUTOSTART=no is specified in environment. WCFS represents connection to wcfs server obtained by join. FileH ... XXX XXX $WENDELIN_CORE_WCFS_AUTOSTART $WENDELIN_CORE_WCFS_OPTIONS """ import os, sys, hashlib, tempfile, subprocess, time, re import logging as log from os.path import dirname from errno import ENOENT, EEXIST from golang import chan, select, default, func, defer from golang import sync, context from golang.gcompat import qq import threading from persistent import Persistent from ZODB.FileStorage import FileStorage from ZODB.utils import z64, u64, p64 from zodbtools.util import ashex, fromhex from .internal import mm from six import reraise # WCFS represents filesystem-level connection to wcfs server. # # Use join to create it. 
# # The primary way to access wcfs is to open logical connection viewing on-wcfs # data as of particular database state, and use that logical connection to # create base-layer mappings. See .connect and Conn for details. # # Raw files on wcfs can be accessed with ._path/._read/._stat/._open . # # WCFS logically mirrors ZODB.DB . class WCFS(object): # .mountpoint path to wcfs mountpoint # ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly) # ._njoin this connection was returned for so many joins # XXX for-testing only? # ._proc wcfs process if it was opened by this WCFS | None pass # Conn represents logical connection viewing data on wcfs filesystem as of # particular database state. # # It uses /head/bigfile/* and notifications received from /head/watch to # maintain isolated database view while at the same time sharing most of data # cache in OS pagecache of /head/bigfile/*. # # Use .mmap to create new Mappings. # # Conn logically mirrors ZODB.Connection . class Conn(object): # ._wc WCFS # .at Tid # ._wlink WatchLink watch/receive pins for created mappings # ._filemu threading.Lock # ._filetab {} foid -> _File pass # _File represent isolated file view under Conn. class _File(object): # .wconn Conn # .foid hex of ZBigFile root object ID # .blksize block size of this file # .headf file object of head/file # .pinned {} blk -> rev that wcfs already sent us for this file # .mmaps []_Mapping ↑blk_start mappings of this file pass # _Mapping represents one mapping of _File. class _Mapping(object): # .file _File # .blk_start offset of this mapping in file # .mem mmaped memory pass # XXX property .blk_stop = blk_start + len(mem) // blksize & assert len(mem) % blksize == 0 # connect creates new Conn viewing WCFS state as of @at. 
@func(WCFS)
def connect(wc, at): # -> Conn
    wconn = Conn()
    # XXX support !isolated mode
    wconn._wc       = wc
    wconn.at        = at
    wconn._wlink    = WatchLink(wc)
    wconn._filemu   = threading.Lock()
    wconn._filetab  = {}

    # XXX wg.go(wconn._pinner, xxxctx)

    return wconn

# close releases resources associated with wconn.
# XXX what happens to file mmappings?
@func(Conn)
def close(wconn):
    # XXX stop/join pinner
    wconn._wlink.close()


# _pinner receives pin messages from wcfs and adjusts wconn mappings.
#
# It loops until recvReq returns None (wcfs closed the watch link) or raises
# (e.g. ctx.err() when ctx is cancelled).
#
# NOTE(review): no reply is sent back for received requests (req.reply is never
# called here) - confirm whether wcfs expects an acknowledgement.
@func(Conn)
def _pinner(wconn, ctx):
    while 1:
        req = wconn._wlink.recvReq(ctx)
        if req is None:
            return  # XXX ok? (EOF - when wcfs closes wlink)

        # we received request to pin/unpin file block. perform it
        with wconn._filemu:
            f = wconn._filetab.get(req.foid)
            if f is None:
                1/0     # XXX we are not watching the file - why wcfs sent us this update?

            # XXX relock wconn -> f ?

            for mmap in f.mmaps:    # XXX use ↑blk_start for binary search
                if not (mmap.blk_start <= req.blk < mmap.blk_stop):
                    continue    # blk ∉ mmap

                # FIXME check if virtmem did not mapped RW page into this block already
                mmap._remmapblk(req.blk, req.at)

            # update f.pinned
            if req.at is None:
                f.pinned.pop(req.blk, None)     # = delete(f.pinned, req.blk)  -- unpin to @head
            else:
                f.pinned[req.blk] = req.at


# mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state.
#
# head/bigfile/<foid> is opened once per foid and cached in wconn._filetab.
# Blocks that wcfs already asked us to pin (f.pinned) are remapped to their
# pinned revision before the mapping is returned.
@func(Conn)
def mmap(wconn, foid, blk_start, blk_len): # -> _Mapping
    assert blk_len >= 0
    blk_stop = blk_start + blk_len

    with wconn._filemu:
        f = wconn._filetab.get(foid)
        if f is None:
            f = _File()
            f.wconn     = wconn
            f.foid      = foid
            f.headf     = wconn._wc._open("head/bigfile/%s" % (ashex(foid),), "rb")
            f.blksize   = os.fstat(f.headf.fileno()).st_blksize
            f.pinned    = {}
            f.mmaps     = []

            wconn._filetab[foid] = f

        # XXX relock wconn -> f ?

        # create memory with head/f mapping and applied pins
        mem = mm.map_ro(f.headf.fileno(), blk_start*f.blksize, blk_len*f.blksize)
        mmap = _Mapping()
        mmap.file       = f
        mmap.blk_start  = blk_start
        mmap.mem        = mem

        for blk, rev in f.pinned.items():  # XXX keep f.pinned ↑blk and use binary search?
            if not (blk_start <= blk < blk_stop):
                continue    # blk out of this mapping
            mmap._remmapblk(blk, rev)

        f.mmaps.append(mmap)  # XXX keep f.mmaps ↑blk_start

        return mmap


# _remmapblk remmaps mapping memory for file[blk] to be viewing database as of @at state.
#
# at=None means unpin to head/ .
# NOTE this does not check wrt virtmem already mapped blk as RW   XXX ok?
@func(_Mapping)
def _remmapblk(mmap, blk, at):
    assert mmap.blk_start <= blk < mmap.blk_stop
    f = mmap.file
    blksize = f.blksize     # was referenced as a bare (undefined) name before

    if at is None:
        fsfile = f.headf
    else:
        # TODO share @rev fd until wconn is resynced?
        fsfile = f.wconn._wc._open("@%s/bigfile/%s" % (ashex(at), ashex(f.foid)), "rb")
        defer(fsfile.close)
    assert os.fstat(fsfile.fileno()).st_blksize == blksize  # FIXME assert

    # replace the blksize-long region of mmap.mem that corresponds to blk
    mm.map_into_ro(mmap.mem[(blk-mmap.blk_start)*blksize:][:blksize], fsfile.fileno(), blk*blksize)


# remmap_blk remmaps file[blk] in its place again.
# virtmem calls this to remmap a block after RW dirty page was e.g. discarded.
@func(_Mapping)
def remmap_blk(mmap, blk):
    # XXX locking
    assert (mmap.blk_start <= blk < mmap.blk_stop)
    # pins are tracked per file, not per mapping
    blkrev = mmap.file.pinned.get(blk, None)  # rev | @head
    mmap._remmapblk(blk, blkrev)

# unmap removes mapping memory from address space.
# virtmem calls this when VMA is unmapped.
@func(_Mapping)
def unmap(mmap):
    # XXX locking
    mm.unmap(mmap.mem)
    mmap.mem = None
    mmap.file.mmaps.remove(mmap)


# WatchLink represents /head/watch link opened on wcfs.
#
# .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
#
# Messages on the link are bytes lines of the form "<stream> <msg>\n".
#
# XXX safe/not-safe to access from multiple threads?
class WatchLink(object):

    def __init__(wlink, wc):
        wlink._wc = wc

        # head/watch handle.
        #
        # python/stdio lock file object on read/write, however we need both
        # read and write to be working simultaneously.
        # -> use 2 separate file objects for rx and tx.
        #
        # fdopen takes ownership of file descriptor and closes it when file
        # object is closed -> dup fd so that each file object has its own fd.
        wh  = os.open(wc._path("head/watch"), os.O_RDWR)
        wh2 = os.dup(wh)
        wlink._wrx = os.fdopen(wh,  'rb')
        wlink._wtx = os.fdopen(wh2, 'wb')

        # XXX vvv -> test only?
        wlink.rx_eof = chan()   # becomes ready when wcfs closes its tx side
        wlink.fatalv = []       # fatal messages received from wcfs

        # inv.protocol message IO
        wlink._acceptq  = chan() # (stream, msg)    server originated messages go here
        wlink._rxmu     = threading.Lock()
        wlink._rxtab    = {}     # stream -> rxq    server replies go via here
        wlink._accepted = set()  # of stream        streams we accepted but did not replied yet

        wlink._txmu     = threading.Lock() # serializes writes
        wlink._txclosed = False

        serveCtx, wlink._serveCancel = context.with_cancel(context.background())
        wlink._serveWG = sync.WorkGroup(serveCtx)
        wlink._serveWG.go(wlink._serveRX)

        # this tWatchLink currently watches the following files at particular state.
        # XXX back -> tWatchLink ?
        wlink._watching = {}    # {} foid -> tWatch


    def _closeTX(wlink):
        if wlink._txclosed:
            return

        # ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
        # _serveRX on client (= on us). The connection can be already closed by
        # wcfs - so ignore errors when sending bye.
        try:
            wlink._send(1, b'bye')
        except IOError:
            pass
        wlink._wtx.close()
        wlink._txclosed = True

    def close(wlink):
        wlink._closeTX()
        wlink._serveCancel()
        # XXX we can get stuck here if wcfs does not behave as we want.
        # XXX in particular if there is a silly - e.g. syntax or type error in
        #     test code - we currently get stuck here.
        #
        # XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour?
        # XXX -> we now have `kill -QUIT` to wcfs.go on test timeout - remove ^^^ comments?
        try:
            wlink._serveWG.wait()
        except Exception as e:
            # canceled is expected and ok
            if e != context.canceled:
                # bare raise re-raises e with its original traceback on both
                # py2 and py3 (six.reraise expects an exception *type* first).
                raise

        wlink._wrx.close()

        # disable all established watches
        for w in wlink._watching.values():
            w.at     = z64
            w.pinned = {}
        wlink._watching = {}


    # _serveRX receives messages from ._wrx and dispatches them according to streamID.
    #
    # stream 0 carries fatal messages, odd streams carry replies to our
    # requests, even streams carry server-initiated requests.
    @func
    def _serveRX(wlink, ctx):
        # when finishing - wakeup everyone waiting for rx
        def _():
            wlink._acceptq.close()
            with wlink._rxmu:
                rxtab = wlink._rxtab
                wlink._rxtab = None     # don't allow new rxtab registers
            for rxq in rxtab.values():
                rxq.close()
        defer(_)

        while 1:
            # NOTE: .close() makes sure ._wrx.read*() will wake up
            l = wlink._wrx.readline()
            print('C: watch  : rx: %r' % l)
            if len(l) == 0:     # peer closed its tx
                wlink.rx_eof.close()
                break

            # <stream> ... \n
            # ._wrx is opened in binary mode -> split/strip with bytes literals.
            stream, msg = l.split(b' ', 1)
            stream = int(stream)
            msg = msg.rstrip(b'\n')

            if stream == 0: # control/fatal message from wcfs
                # XXX print -> receive somewhere?
                print('C: watch  : rx fatal: %r' % msg)
                wlink.fatalv.append(msg)
                continue

            reply = bool(stream % 2)
            if reply:
                with wlink._rxmu:
                    assert stream in wlink._rxtab   # XXX !test assert - recheck
                    rxq = wlink._rxtab.pop(stream)
                _, _rx = select(
                    ctx.done().recv,    # 0
                    (rxq.send, msg),    # 1
                )
                if _ == 0:
                    raise ctx.err()
            else:
                with wlink._rxmu:
                    assert stream not in wlink._accepted   # XXX !test assert - recheck
                    wlink._accepted.add(stream)
                _, _rx = select(
                    ctx.done().recv,                        # 0
                    (wlink._acceptq.send, (stream, msg)),   # 1
                )
                if _ == 0:
                    raise ctx.err()


    # _send sends raw message via specified stream.
    #
    # msg must be bytes without '\n'.
    #
    # multiple _send can be called in parallel - _send serializes writes.
    # XXX +ctx?
    def _send(wlink, stream, msg):
        assert b'\n' not in msg
        pkt = b"%d %s\n" % (stream, msg)
        wlink._write(pkt)

    def _write(wlink, pkt):
        with wlink._txmu:
            #print('C: watch : tx: %r' % pkt)
            wlink._wtx.write(pkt)
            wlink._wtx.flush()

    # sendReq sends client -> server request and returns server reply.
    #
    # only 1 sendReq must be used at a time.    # XXX relax?
    def sendReq(wlink, ctx, req):   # -> reply | None when EOF
        rxq = wlink._sendReq(ctx, req)

        _, _rx = select(
            ctx.done().recv,    # 0
            rxq.recv,           # 1
        )
        if _ == 0:
            raise ctx.err()
        return _rx

    def _sendReq(wlink, ctx, req):  # -> rxq
        stream = 1 # XXX -> dynamic

        rxq = chan()
        with wlink._rxmu:
            assert stream not in wlink._rxtab   # XXX !test assert - recheck
            wlink._rxtab[stream] = rxq

        wlink._send(stream, req)
        return rxq

    # recvReq receives client <- server request.
    #
    # multiple recvReq could be used at a time.
    def recvReq(wlink, ctx):    # -> SrvReq | None when EOF
        _, _rx = select(
            ctx.done().recv,        # 0
            wlink._acceptq.recv,    # 1
        )
        if _ == 0:
            raise ctx.err()

        rx = _rx
        if rx is None:
            return rx

        stream, msg = rx
        return SrvReq(wlink, stream, msg)


# SrvReq represents 1 server-initiated wcfs request received over /head/watch link.
# XXX struct place -> ^^^ (nearby WatchLink) ?
class SrvReq(object):

    def __init__(req, wlink, stream, msg):
        req.wlink  = wlink
        req.stream = stream
        req.msg    = msg

    def reply(req, answer):
        #print('C: reply %s <- %r ...' % (req, answer))
        wlink = req.wlink
        with wlink._rxmu:
            assert req.stream in wlink._accepted

        wlink._send(req.stream, answer)

        with wlink._rxmu:
            assert req.stream in wlink._accepted
            wlink._accepted.remove(req.stream)

        # XXX also track as answered? (and don't accept with the same ID ?)

    # _parse parses req.msg (bytes) as pin request.
    #
    # raises RuntimeError if msg is not a valid pin request.
    def _parse(req): # -> (foid, blk, at|None)
        # pin <foid> #<blk> @(<at>|head)
        m = re.match(b"pin (?P<foid>[0-9a-f]{16}) #(?P<blk>[0-9]+) @(?P<at>[^ ]+)$", req.msg)
        if m is None:
            raise RuntimeError("message is not valid pin request: %s" % qq(req.msg))
        foid = fromhex(m.group('foid'))
        blk  = int(m.group('blk'))
        at   = m.group('at')
        # m.group returns bytes -> compare with bytes literal
        if at == b"head":
            at = None
        else:
            at = fromhex(at)

        return foid, blk, at

    @property
    def foid(req):  return req._parse()[0]
    @property
    def blk(req):   return req._parse()[1]
    @property
    def at(req):    return req._parse()[2]


# ---- WCFS raw file access ----

# _path returns path for object on wcfs.
# - str:        wcfs root + obj;
# - Persistent: wcfs root + (head|@<at>)/bigfile/obj
@func(WCFS)
def _path(wc, obj, at=None):
    if isinstance(obj, Persistent):
        #assert type(obj) is ZBigFile   XXX import cycle
        objtypestr = type(obj).__module__ + "." + type(obj).__name__
        assert objtypestr == "wendelin.bigfile.file_zodb.ZBigFile", objtypestr
        # no trailing "/" in head: the format below supplies the separator,
        # so we don't produce "head//bigfile/...".
        head = "head" if at is None else ("@%s" % ashex(at))
        obj  = "%s/bigfile/%s" % (head, ashex(obj._p_oid))
        at   = None
    assert isinstance(obj, str)
    assert at is None   # must not be used with str
    return os.path.join(wc.mountpoint, obj)

# _read reads file corresponding to obj on wcfs.
@func(WCFS)
def _read(wc, obj, at=None):
    path = wc._path(obj, at=at)
    with open(path, 'rb') as f: # XXX -> readfile
        return f.read()

# _stat stats file corresponding to obj on wcfs.
@func(WCFS)
def _stat(wc, obj, at=None):
    path = wc._path(obj, at=at)
    return os.stat(path)

# _open opens file corresponding to obj on wcfs.
@func(WCFS)
def _open(wc, obj, mode='rb', at=None):
    path = wc._path(obj, at=at)
    return open(path, mode, 0)  # unbuffered


# NOTE: the string below is a disabled snippet kept for reference; it is never executed.
"""
# open creates wcfs file handle, which can be mmaped to give data of ZBigFile.
#
# XXX more text
#
# All mmapings of one FileH share changes.
# There can be several different FileH for the same (at, oid), and those
# FileH do not share changes.
def open(self, zfile): # -> FileH
    #assert isinstance(zfile, ZBigFile)  # XXX import cycle
    zconn = zfile._p_jar
    # XXX ._start is probably ZODB5 only -> check ZODB4 and ZODB3
    zat = p64(u64(zconn._storage._start)-1)  # before -> at
    # XXX pinned to @revX/... for now -> TODO /head/bigfile/...
    path = '%s/@%s/bigfile/%s' % (self.mountpoint, ashex(zat), ashex(zfile._p_oid))
    fd = os.open(path, os.O_RDONLY)
    return FileH(fd)


# FileH is handle to opened bigfile/X.
#
# XXX it mimics BigFileH and should be integrated into virtmem (see fileh_open_overlay)
#
# XXX it should implement wcfs invalidation protocol and remmap head/... parts
# to pinned as requested.
import mmap
from bigarray import pagesize           # XXX hack
from bigarray.array_ram import _VMA     # XXX hack
class FileH(object):
    # .fd

    def __init__(self, fd):
        self.fd = fd

    def __del__(self):
        os.close(self.fd)

    def mmap(self, pgoffset, pglen):
        return _VMA(self.fd, pgoffset, pglen, pagesize, mmap.PROT_READ)
"""


# ---- join/run wcfs ----

_wcmu = threading.Lock()
_wcregistry = {} # mntpt -> WCFS

@func(WCFS)
def __init__(wc, mountpoint, fwcfs, proc):
    wc.mountpoint = mountpoint
    wc._fwcfs     = fwcfs
    wc._njoin     = 1
    wc._proc      = proc

# close must be called to release joined connection after it is no longer needed.
@func(WCFS)
def close(wc):
    with _wcmu:
        wc._njoin -= 1
        if wc._njoin == 0:
            # XXX unmount wcfs as well?
            wc._fwcfs.close()
            del _wcregistry[wc.mountpoint]

# _default_autostart returns default autostart setting for join.
#
# Out-of-the-box we want wcfs to be automatically started, to ease developer
# experience when wendelin.core is standalone installed. However in environments
# like SlapOS, it is more preferable to start and monitor wcfs service explicitly.
# SlapOS & co. should thus set $WENDELIN_CORE_WCFS_AUTOSTART=no.
#
# Values other than yes/no (case-insensitive) raise KeyError.
def _default_autostart():
    autostart = os.environ.get("WENDELIN_CORE_WCFS_AUTOSTART", "yes")
    autostart = autostart.lower()
    return {"yes": True, "no": False}[autostart]

# join connects to wcfs server for ZODB @ zurl.
#
# If wcfs for that zurl was already started, join connects to it.
# Otherwise it starts wcfs for zurl if autostart is True.
#
# autostart=None (default) means to consult $WENDELIN_CORE_WCFS_AUTOSTART via
# _default_autostart at the time of the call (not once at import time).
#
# For the same zurl join returns the same WCFS object.
def join(zurl, autostart=None): # -> WCFS
    mntpt = _mntpt_4zurl(zurl)
    with _wcmu:
        # check if we already have connection to wcfs server from this process
        wc = _wcregistry.get(mntpt)
        if wc is not None:
            wc._njoin += 1
            return wc

        # no. try opening .wcfs - if we succeed - wcfs is already mounted.
        try:
            f = open(mntpt + "/.wcfs/zurl")
        except IOError as e:
            if e.errno != ENOENT:
                raise
        else:
            # already have it
            wc = WCFS(mntpt, f, None)
            _wcregistry[mntpt] = wc
            return wc

        if autostart is None:
            autostart = _default_autostart()
        if not autostart:
            raise RuntimeError("wcfs: join %s: server not started" % zurl)

        # start wcfs with telling it to automatically exit when there is no client activity.
        optv_extra = os.environ.get("WENDELIN_CORE_WCFS_OPTIONS", "").split()
        return _start(zurl, "-autoexit", *optv_extra)


# _start starts wcfs server for ZODB @ zurl.
#
# optv can be optionally given to pass flags to wcfs.
# called under _wcmu
def _start(zurl, *optv): # -> WCFS
    mntpt = _mntpt_4zurl(zurl)
    log.info("wcfs: starting for %s ...", zurl)

    # XXX errctx "wcfs: start"

    # spawn wcfs and wait till filesystem-level access to it is ready
    wc = WCFS(mntpt, None, None)
    wg = sync.WorkGroup(context.background())
    fsready = chan()
    def _(ctx):
        # XXX errctx "spawn"
        argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt]
        proc = subprocess.Popen(argv, close_fds=True)
        while 1:
            ret = proc.poll()
            if ret is not None:
                raise RuntimeError("exited with %s" % ret)

            _, _rx = select(
                ctx.done().recv,    # 0
                fsready.recv,       # 1
                default,            # 2
            )
            if _ == 0:
                proc.terminate()
                raise ctx.err()
            if _ == 1:
                # startup was ok - don't monitor spawned wcfs any longer
                wc._proc = proc
                return

            time.sleep(0.1)
    wg.go(_)

    def _(ctx):
        # XXX errctx "waitmount"
        while 1:
            try:
                f = open("%s/.wcfs/zurl" % mntpt)
            except IOError as e:
                if e.errno != ENOENT:
                    raise
            else:
                dotwcfs = f.read()
                if dotwcfs != zurl:
                    f.close()   # don't leak the handle on mismatch
                    raise RuntimeError(".wcfs/zurl != zurl (%s != %s)" % (qq(dotwcfs), qq(zurl)))

                wc._fwcfs = f
                fsready.close()
                return

            _, _rx = select(
                ctx.done().recv,    # 0
                default,            # 1
            )
            if _ == 0:
                raise ctx.err()

            time.sleep(0.1)
    wg.go(_)

    wg.wait()
    assert mntpt not in _wcregistry
    _wcregistry[mntpt] = wc
    return wc


# ---- misc ----

# _wcfs_exe returns path to wcfs executable.
def _wcfs_exe():
    return '%s/wcfs' % dirname(__file__)

# _mntpt_4zurl returns wcfs should-be mountpoint for ZODB @ zurl.
# # it also makes sure the mountpoint exists. def _mntpt_4zurl(zurl): # XXX what if zurl is zconfig://... ? -> then we have to look inside? # -> zstor_2zurl extracts zurl in canonical form and zconfig:// is not possible there. m = hashlib.sha1() m.update(zurl) mntpt = "%s/wcfs/%s" % (tempfile.gettempdir(), m.hexdigest()) _mkdir_p(mntpt) return mntpt # zstor_2zurl converts a ZODB storage to URL to access it. # XXX -> unexport? def zstor_2zurl(zstor): # There is, sadly, no unified way to do it, as even if storages are created via # zodburi, after creation its uri is lost. And storages could be created not # only through URI but e.g. via ZConfig and manually. We want to support all # those cases... # # For this reason extract URL with important for wcfs use-case parameters in # ad-hoc way. if isinstance(zstor, FileStorage): return "file://%s" % (zstor._file_name,) # TODO ZEO + NEO support raise NotImplementedError("don't know how to extract zurl from %r" % zstor) # mkdir -p. def _mkdir_p(path): try: os.makedirs(path) except OSError as e: if e.errno != EEXIST: raise # serve starts and runs wcfs server for ZODB @ zurl. # # it mounts wcfs at a location that is with 1-1 correspondence with zurl. # it then waits for wcfs to exit (either due to unmount or an error). # # it is an error if wcfs was already started. # # XXX optv # if exec_ is True, wcfs is not spawned, but executed into. # # serve(zurl, exec_=False). def serve(zurl, optv, exec_=False): mntpt = _mntpt_4zurl(zurl) # try opening .wcfs - it is an error if we can do it. # XXX -> option to wcfs itself? 
try: f = open(mntpt + "/.wcfs/zurl") except IOError as e: if e.errno != ENOENT: raise else: f.close() raise RuntimeError("wcfs: start %s: already started" % zurl) # seems to be ok to start # XXX race window if something starts after ^^^ check argv = [_wcfs_exe()] + list(optv) + [zurl, mntpt] if not exec_: subprocess.check_call(argv, close_fds=True) else: os.execv(argv[0], argv) # if called as main just -> serve() def main(): argv = sys.argv[1:] # XXX usage zurl = argv[-1] # -a -b zurl -> zurl optv = argv[:-1] # -a -b zurl -> -a -b serve(zurl, optv, exec_=True)