// Copyright (C) 2018-2020  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// Package wcfs provides WCFS client integrated with user-space virtual memory manager.
// See wcfs.h for package overview.


// Wcfs client organization
//
// Wcfs client provides to its users isolated bigfile views backed by data on
// WCFS filesystem. In the absence of Isolation property, wcfs client would
// reduce to just directly using OS-level file wcfs/head/f for a bigfile f. On
// the other hand there is a simple, but inefficient, way to support isolation:
// for @at database view of bigfile f - directly use OS-level file wcfs/@at/f.
// The latter works, but is very inefficient because OS-cache for f data is not
// shared in between two connections with @at1 and @at2 views. The cache is
// also lost when connection view of the database is resynced on transaction
// boundary. To support isolation efficiently, wcfs client uses wcfs/head/f
// most of the time, but injects wcfs/@revX/f parts into mappings to maintain
// f@at view driven by pin messages that wcfs server sends to client in
// accordance to WCFS isolation protocol(*).
//
// Wcfs server sends pin messages synchronously triggered by access to mmapped
// memory. That means that a client thread, that is accessing wcfs/head/f mmap,
// is completely blocked while wcfs server sends pins and waits to receive acks
// from all clients. In other words on-client handling of pins has to be done
// in separate thread, because wcfs server can also send pins to client that
// triggered the access.
//
// Wcfs client implements pins handling in so-called "pinner" thread(+). The
// pinner thread receives pin requests from wcfs server via watchlink handle
// opened through wcfs/head/watch. For every pin request the pinner finds
// corresponding Mappings and injects wcfs/@revX/f parts via Mapping._remmapblk
// appropriately.
//
// The same watchlink handle is used to send client-originated requests to wcfs
// server. The requests are sent to tell wcfs that client wants to observe a
// particular bigfile as of particular revision, or to stop watching for it.
// Such requests originate from regular client threads - not pinner - via entry
// points like Conn.open, Conn.resync and FileH.close.
//
// Every FileH maintains fileh._pinned {} with currently pinned blk -> rev. This
// dict is updated by pinner driven by pin messages, and is used when either
// new fileh Mapping is created (FileH.mmap) or refreshed due to request from
// virtmem (Mapping.remmap_blk, see below).
//
// In wendelin.core a bigfile has semantic that it is infinite in size and
// reads as all zeros beyond region initialized with data. Memory-mapping of
// OS-level files can also go beyond file size, however accessing memory
// corresponding to file region after file.size triggers SIGBUS. To preserve
// wendelin.core semantic wcfs client mmaps-in zeros for Mapping regions after
// wcfs/head/f.size. For simplicity it is assumed that bigfiles only grow and
// never shrink. It is indeed currently so, but will have to be revisited
// if/when wendelin.core adds bigfile truncation. Wcfs client restats
// wcfs/head/f at every transaction boundary (Conn.resync) and remembers f.size
// in FileH._headfsize for use during one transaction.
//
//
// Integration with wendelin.core virtmem layer
//
// Wcfs client integrates with virtmem layer to support virtmem handle
// dirtying pages of read-only base-layer that wcfs client provides via
// isolated Mapping. For wcfs-backed bigfiles every virtmem VMA is interlinked
// with Mapping:
//
//       VMA     -> BigFileH -> ZBigFile -----> Z
//        ↑↓                                    O
//      Mapping  -> FileH    -> wcfs server --> DB
//
// When a page is write-accessed, virtmem mmaps in a page of RAM in place of
// accessed virtual memory, copies base-layer content provided by Mapping into
// there, and marks that page as read-write.
//
// Upon receiving pin message, the pinner consults virtmem, whether
// corresponding page was already dirtied in virtmem's BigFileH (call to
// __fileh_page_isdirty), and if it was, the pinner does not remmap Mapping
// part to wcfs/@revX/f and just leaves dirty page in its place, remembering
// pin information in fileh._pinned.
//
// Once dirty pages are no longer needed (either after discard/abort or
// writeout/commit), virtmem asks wcfs client to remmap corresponding regions
// of Mapping in its place again via calls to Mapping.remmap_blk for previously
// dirtied blocks.
//
// The scheme outlined above does not need to split Mapping upon dirtying an
// inner page.
//
// See bigfile_ops interface (wendelin/bigfile/file.h) that explains base-layer
// and overlaying from virtmem point of view. For wcfs this interface is
// provided by small wcfs client wrapper in bigfile/file_zodb.cpp.
//
// --------
//
// (*) see wcfs.go documentation for WCFS isolation protocol overview and details.
// (+) currently, for simplicity, there is one pinner thread for each connection.
//     In the future, for efficiency, it might be reworked to be one pinner thread
//     that serves all connections simultaneously.


// Wcfs client locking organization
//
// XXX locking -> explain atMu + slaves and refer to "Locking" in wcfs.go
//
// Conn.atMu > Conn.mu > FileH.mu
//
// Several locks are RWMutex instead of just Mutex not only to allow more
// concurrency, but, in the first place for correctness: pinner thread being
// core element in handling WCFS isolation protocol, is effectively invoked
// synchronously from other threads via messages coming through wcfs server.
// For example Conn.resync sends watch request to wcfs and waits for the
// answer. Wcfs server, in turn, might send corresponding pin messages to the
// pinner and _wait_ for the answer before answering to resync:
//
//        - - - - - -
//       |       .···|·····.        ---->   = request
//          pinner <------.↓        <····   = response
//       |           |   wcfs
//          resync -------^↓
//       |      `····|·····
//        - - - - - -
//       client process
//
// This creates the necessity to use RWMutex for locks that pinner and other
// parts of the code could be using at the same time in synchronous scenarios
// similar to the above. These locks are:
//
//      - Conn.atMu
//      - Conn.mu
//
// XXX pinner takes the following locks (XXX recheck)
//
//      - wconn.mu.W
//      - wconn.mu.R
//
//      - wconn.atMu.R
//      - wconn.mu.R
//      - fileh.mu (R:.mmaps W:.pinned)
//
//      - virt_lock
//
//


#include "wcfs_misc.h"
#include "wcfs.h"
#include "wcfs_watchlink.h"

#include <wendelin/bigfile/virtmem.h>
#include <wendelin/bigfile/ram.h>

#include <golang/errors.h>
#include <golang/fmt.h>
#include <golang/io.h>

#include <algorithm>
#include <string>
#include <vector>

#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

using std::min;
using std::max;
using std::vector;

// wcfs::
namespace wcfs {

static error mmap_zero_into_ro(void *addr, size_t size);
static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size);
static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset);

// connect creates new Conn viewing WCFS state as of @at.
// connect creates new Conn viewing WCFS state as of @at.
pair<Conn, error> WCFS::connect(zodb::Tid at) {
    WCFS *wc = this;
    xerr::Contextf E("wcfs %s: connect @%s", v(wc->mountpoint), v(at));

    // TODO support !isolated mode

    // establish watch link - wcfs server will send us pin messages through it.
    WatchLink wl;
    error     err;
    tie(wl, err) = wc->_openwatch();
    if (err != nil)
        return make_pair(nil, E(err));

    // create the connection object itself.
    Conn conn = adoptref(new _Conn());
    conn->_wc    = wc;
    conn->at     = at;
    conn->_wlink = wl;

    // spawn pinner thread - it will be serving pin requests from wcfs.
    context::Context pinctx;
    tie(pinctx, conn->_pinCancel) = context::with_cancel(context::background());
    conn->_pinWG = sync::NewWorkGroup(pinctx);
    conn->_pinWG->go([conn](context::Context ctx) -> error {
        return conn->_pinner(ctx);
    });

    // need to wait till `wcfs/head/at ≥ at` because e.g. Conn.open stats
    // head/f to get f.headfsize.
    // XXX atMu.RLock ?
    err = conn->_headWait(at);
    if (err != nil) {
        // XXX bring conn down - stop pinner
        return make_pair(nil, E(err));
    }

    return make_pair(conn, nil);
}


// _headWait waits till wcfs/head/at becomes ≥ at.
//
// XXX locks condition?
// XXX place?
// _headWait waits till wcfs/head/at becomes ≥ at.
//
// XXX locks condition?
// XXX place?
error _Conn::_headWait(zodb::Tid at) {  // XXX +ctx ?
    (void)at;   // FIXME stub - should poll wcfs/head/at until it reaches `at`
    return nil;
}

// errConnClosed is set as Conn._downErr when the connection is closed by the client.
static global<error> errConnClosed = errors::New("connection closed");

// close releases resources associated with wconn.
//
// opened fileh and mappings become invalid to use except close and unmap.
error _Conn::close() {
    _Conn& wconn = *this;

    // hold atMu.R so that wconn.at stays stable during the whole close.
    wconn._atMu.RLock();
    defer([&]() {
        wconn._atMu.RUnlock();
    });

    xerr::Contextf E("%s: close", v(wconn));

    // eret remembers the first error seen; later errors are dropped.
    error err, eret;
    auto reterr1 = [&eret](error err) {
        if (eret == nil && err != nil)
            eret = err;
    };

    // mark wconn as down; if it was already closed - close is a no-op.
    bool alreadyClosed = false;
    wconn._mu.Lock();
    alreadyClosed = (wconn._downErr == errConnClosed);
    wconn._downErr = errConnClosed;
    wconn._mu.Unlock();
    if (alreadyClosed)
        return nil;

    // close wlink and signal to pinner to stop outside of wconn.mu
    err = wconn._wlink->close();    // XXX ok under atMu?
    if (err != nil)
        reterr1(err);
    wconn._pinCancel();
    err = wconn._pinWG->wait();     // XXX ok under atMu?
    if (!errors::Is(err, context::canceled)) // canceled - ok
        reterr1(err);

    // pinner is stopped - now close all files - both that have no mappings and
    // that still have opened mappings.
    //
    // NOTE after file is closed mappings could continue to survive, but we can no
    // longer maintain consistent view. For this reason we change mappings to
    // something that gives EFAULT on access. XXX implement
    wconn._mu.Lock();
    defer([&]() {
        wconn._mu.Unlock();
    });

    // XXX f locking
    for (auto _ : wconn._filehTab) {
        auto f = _.second;
        err = f->_headf->close(); // XXX mark fileh as down so that fileh.close does not say "bad fd"
        if (err != nil)
            reterr1(err);
        f->_headf = nil;

        // XXX stop watching f      XXX ok under mu?
    }

    wconn._filehTab.clear();

    return E(eret);
}

// _pinner receives pin messages from wcfs and adjusts wconn file mappings.
error _Conn::_pinner(context::Context ctx) {
    _Conn& wconn = *this;
    error err = wconn.__pinner(ctx);

    // if pinner fails, wcfs will kill us.
    // log pinner error so that the error is not hidden.
    // print to stderr as well as by default log does not print to there.
    // XXX also catch panic/exc ?
    if (!(err == nil || errors::Is(err, context::canceled))) { // canceled = .close asks pinner to stop
        log::Fatalf("CRITICAL: %s", v(err));
        log::Fatalf("CRITICAL: wcfs server will likely kill us soon.");
        fprintf(stderr, "CRITICAL: %s\n", v(err));
        fprintf(stderr, "CRITICAL: wcfs server will likely kill us soon.\n");
    }

    // mark the connection non-operational if the pinner fails
    // XXX deadlock wrt resync? (who read-locks wconn.mu)
    // XXX -> mu -> downMu ?
    wconn._mu.Lock();   // XXX locking ok? -> merge into below where lock is held?
    if (wconn._downErr == nil) {
        wconn._downErr = fmt::errorf("no longer operational due to: %w",
                                        err != nil ? err : fmt::errorf("pinner exit"));
        // XXX make all fileh and mapping invalid.
    }
    wconn._mu.Unlock();

    return err;
}

// __pinner runs the pinner receive loop: receive pin request, handle it, repeat.
error _Conn::__pinner(context::Context ctx) {
    _Conn& conn = *this;
    xerr::Contextf E("pinner"); // NOTE pinner error goes to Conn::close who has its own context

    PinReq req;

    for (;;) {
        error err = conn._wlink->recvReq(ctx, &req);
        if (err != nil) {
            // EOF is expected if it is us (client) who closed the connection
            if (err == io::EOF_) {
                conn._mu.RLock();
                bool closedByUs = (conn._downErr == errConnClosed);
                conn._mu.RUnlock();
                err = closedByUs ? nil : io::ErrUnexpectedEOF;
            }
            return E(err);
        }

        // we received request to pin/unpin file block. handle it
        err = conn._pin1(&req);
        if (err != nil)
            return E(err);
    }
}

// pin1 handles one pin request received from wcfs.
// pin1 handles one pin request received from wcfs.
error _Conn::_pin1(PinReq *req) {
    _Conn& conn = *this;
    xerr::Contextf E("pin f<%s> #%ld @%s", v(req->foid), req->blk, v(req->at));

    error err = conn.__pin1(req);

    // tell wcfs the result: "ack" on success, "nak: <err>" on failure.
    string reply = (err == nil) ? string("ack") : fmt::sprintf("nak: %s", v(err));
    // NOTE ctx=bg to always send reply even if we are canceled
    error err2 = conn._wlink->replyReq(context::background(), req, reply);
    if (err == nil)
        err = err2;

    return E(err);
}

// __pin1 serves one pin request from wcfs: it remmaps file[blk] to @rev (or
// back to head/) in every mapping of the requested file, and records the pin
// in f._pinned for use by future FileH.mmap / Mapping.remmap_blk.
//
// Locks taken here: wconn.atMu.R, wconn.mu.R (released before f.mu), f.mu,
// and virt_lock while consulting virtmem about dirty pages.
error _Conn::__pin1(PinReq *req) {
    _Conn& wconn = *this;

    FileH f;
    bool  ok;

    wconn._atMu.RLock();
    defer([&]() {
        wconn._atMu.RUnlock();
    });

    // XXX deadlock wrt Conn.resync which locks wconn.mu and does "watch" ?
    wconn._mu.RLock();
    // XXX +incref f, so that simultaneous close does not remove f from wconn.filehTab ?
    // XXX or just make FileH.close lock f too to synchronize with pinner?
    tie(f, ok) = wconn._filehTab.get_(req->foid);
    if (!ok) {
        wconn._mu.RUnlock();
        // why wcfs sent us this update?
        return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid));
    }

    // XXX <- f._openReady ?

    wconn._mu.RUnlock(); // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close?
    f->_mu.lock();
    defer([&]() {
        f->_mu.unlock();
    });

    // remmap the block in every mapping that covers it.
    for (auto mmap : f->_mmaps) {   // TODO use ↑blk_start for binary search
        if (!(mmap->blk_start <= req->blk && req->blk < mmap->blk_stop()))
            continue;   // blk ∉ mmap

        //trace("\tremmapblk %d @%s" % (req->blk, (v(req.at) if req.at else "head")))

        // pin only if virtmem did not dirty page corresponding to this block already
        // if virtmem dirtied the page - it will ask us to remmap it again after commit or abort.
        bool do_pin= true;
        error err;
        if (mmap->vma != nil) {
            mmap->_assertVMAOk();
            // XXX recheck wrt Mapping.unmap: there it locks:
            // 1. virt_lock
            // 2. f.mu
            //
            // -> if here we do
            // 1. f.mu
            // 2. virt_lock
            //
            // -> deadlock
            virt_lock();
            BigFileH *virt_fileh = mmap->vma->fileh;
            TODO (mmap->fileh->blksize != virt_fileh->ramh->ram->pagesize);
            do_pin = !__fileh_page_isdirty(virt_fileh, req->blk);
        }

        if (do_pin)
            err = mmap->_remmapblk(req->blk, req->at);

        if (mmap->vma != nil)
            virt_unlock();

        // on error don't need to continue with other mappings - all fileh and
        // all mappings become marked invalid on pinner failure.
        // XXX all call wconn._down from here under wconn._mu lock?
        if (err != nil)
            return err;

        //trace("\t-> remmaped");   XXX
    }

    // update f._pinned; @head means the block is no longer pinned at all.
    if (req->at == TidHead) {
        f->_pinned.erase(req->blk);      // unpin to @head
    }
    else {
        f->_pinned[req->blk] = req->at;
    }

    return nil;
}

// resync resyncs connection and its file mappings onto different database view.
//
// bigfile/_file_zodb.pyx arranges to call Conn.resync at transaction boundaries
// to keep Conn view in sync with updated zconn database view.
error _Conn::resync(zodb::Tid at) {
    _Conn& wconn = *this;
    error err;

    // take atMu.R only briefly - to build the error context with stable wconn.at.
    wconn._atMu.RLock();
    xerr::Contextf E("%s: resync -> @%s", v(wconn), v(at));
    wconn._atMu.RUnlock();

    // XXX downErr -> E
    // XXX at ^ (increases)

    // first wait for wcfs/head to be >= at.
    // we need this e.g. to be sure that head/f.size is at least as big as it will be @at state.
    err = wconn._headWait(at);
    if (err != nil)
        return err; // XXX -> wconn down on err ?


    // lock wconn._atMu.W . This excludes everything else, and in
    // particular _pinner_, from running and mutating files and mappings.
    //
    // NOTE we'll relock atMu as R in the second part of resync, so we prelock
    // wconn._mu.R as well while under atMu.W, to be sure that set of opened
    // files stays the same during whole resync.
    bool atMuWLocked = true;
    wconn._atMu.Lock();
    wconn._mu.RLock();
    defer([&]() {
        wconn._mu.RUnlock();
        if (atMuWLocked)
            wconn._atMu.Unlock();
        else
            wconn._atMu.RUnlock();
    });

    err = wconn._downErr;
    if (err != nil)
        return E(err);

    bool retok = false;
    defer([&]() {
        if (!retok)
            //panic("TODO: bring wconn + fileh + mmaps down on error");  // XXX
            fprintf(stderr, "\n\nTODO: bring wconn + fileh + mmaps down on error\n\n\n");
    });

    // set new wconn.at early, so that e.g. Conn.open running simultaneously
    // to second part of resync (see below) uses new at.
    // XXX no need since wconn._mu is locked? -> no - it is *needed* after wconn.mu became RWMutex
    wconn.at = at;

    // go through all files opened under wconn and pre-adjust their mappings
    // for viewing data as of new @at state.
    //
    // We are still holding atMu.W, so we are the only mutators of mappings,
    // because, in particular, pinner is not running.
    //
    // Don't send watch updates for opened files to wcfs yet - without running
    // pinner those updates will get stuck.
    for (auto fit : wconn._filehTab) {
        //zodb::Oid  foid = fit.first;
        FileH        f    = fit.second;

        // TODO if file has no mappings and was not used during whole prev
        // cycle - forget and stop watching it

        // XXX not yet ready f

        // update f._headfsize and remmap to head/f zero regions that are now covered by head/f
        struct stat st;
        err = f->_headf->stat(&st);
        if (err != nil)
            return E(err);

        if ((size_t)st.st_blksize != f->blksize)    // blksize must not change
            return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize));
        auto headfsize = st.st_size;
        if (!(f->_headfsize <= headfsize))          // head/file size ↑=
            return E(fmt::errorf("wcfs bug: head/file size not ↑="));
        if (!(headfsize % f->blksize == 0))
            return E(fmt::errorf("wcfs bug: head/file size %% blksize != 0"));

        // replace zero regions in f mappings in accordance to adjusted f._headfsize.
        // NOTE it is ok to access f._mmaps without locking f._mu because we hold wconn.atMu.W
        for (auto mmap : f->_mmaps) {
            //printf("  resync -> %s: unzero [%lu:%lu)", v(at), f->_headfsize/f->blksize, headfsize/f->blksize);
            // [mem_unzero_start, mem_unzero_stop) is the part of mmap that was
            // zero-mapped before (beyond old f.size) and is file data now.
            uint8_t *mem_unzero_start = min(mmap->mem_stop,
                            mmap->mem_start + (f->_headfsize - mmap->blk_start*f->blksize));
            uint8_t *mem_unzero_stop  = min(mmap->mem_stop,
                            mmap->mem_start + (   headfsize  - mmap->blk_start*f->blksize));

            if (mem_unzero_stop - mem_unzero_start > 0) {
                err = mmap_into_ro(mem_unzero_start, mem_unzero_stop-mem_unzero_start, f->_headf, f->_headfsize);
                if (err != nil)
                    return E(err);
            }
        }

        f->_headfsize = headfsize;
    }

    // atomically downgrade atMu.W to atMu.R before issuing watch updates to wcfs.
    // - we need atMu to be not Wlocked, because under atMu.W pinner cannot run simultaneously to us.
    // - we need to hold atMu.R to avoid race wrt e.g. other resync which changes at.
    // - we cannot just do regular `atMu.Unlock + atMu.RLock()` because then
    //   there is e.g. a race window in between Unlock and RLock where wconn.at can be changed.
    //   XXX also deadlock, because it will become wconn._mu.lock + wconn._atMu lock
    //
    // Now other calls, e.g. Conn.open, can be running simultaneously to us,
    // but since we already set wconn.at to new value it is ok. For example
    // Conn.open, for not-yet-opened file, will use new at to send "watch".
    // XXX ^^^ not possible since wconn._mu is locked ?
    //     -> no, possible, wconn._mu became RWMutex
    //
    // XXX we are still holding wconn._mu.R, so wconn._filehTab is the
    // same as in previous pass above.
    wconn._atMu.UnlockToRLock();
    atMuWLocked = false;

    // send watch updates to wcfs.
    // the pinner is now running and will be able to serve pin requests triggered by our watch.
    for (auto fit : wconn._filehTab) {
        zodb::Oid  foid = fit.first;
        //FileH    f    = fit.second;

        // XXX need to lock f.mu because wconn.atMu is only R now.
        // XXX need to coordinate with e.g. FileH.close -> "if f.state != CLOSING" ?
        // XXX need to coordinate with e.g. Conn.open   -> "if f.state != OPENING" ?
        string ack;
        // XXX f._watchMu.lock() + unlock()
        // XXX + recheck status before sending the watch?
        tie(ack, err) = wconn._wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(at)));
        if (err != nil)
            return E(err);
        if (ack != "ok") {
            return E(fmt::errorf("%s", v(ack)));
        }
    }

//  wconn.at = at;
    retok = true;
    return nil;
}

// open opens FileH corresponding to ZBigFile foid.
// open opens FileH corresponding to ZBigFile foid.
pair<FileH, error> _Conn::open(zodb::Oid foid) {
    _Conn& wconn = *this;
    error err;

    // hold atMu.R for the whole open - FileH._open relies on wconn.at being stable.
    wconn._atMu.RLock();
    defer([&]() {
        wconn._atMu.RUnlock();
    });

    xerr::Contextf E("%s: open f<%s>", v(wconn), v(foid));

    wconn._mu.Lock();

    // refuse to open files on a connection that is already down.
    if (wconn._downErr != nil) {
        err = wconn._downErr;
        wconn._mu.Unlock();
        return make_pair(nil, E(err));
    }

    // if foid is already in filehTab - the file is either opened or being opened.
    FileH f; bool ok;
    tie(f, ok) = wconn._filehTab.get_(foid);
    if (ok) {
        f->_nopen++; // XXX lock by f.mu ?
        wconn._mu.Unlock();

        // wait for: "opening" -> opened
        f->_openReady.recv();
        if (f->_openErr != nil) {
            // don't care about f->_nopen-- since f is not returned anywhere
            return make_pair(nil, E(f->_openErr));
        }

        return make_pair(f, nil);
    }

    // create "opening" FileH entry and perform open with wconn._mu released
    // NOTE wconn._atMu.R is still held because FileH._open relies on wconn.at being stable.
    f = adoptref(new _FileH());
    f->wconn      = newref(&wconn);
    f->foid       = foid;
    f->_openReady = makechan<structZ>();    // closed when "opening" -> opened/failed
    f->_openErr   = nil;
    f->_headf     = nil;
    f->blksize    = 0;
    f->_headfsize = 0;
    f->_nopen     = 1;
    f->_closed    = false;

    bool retok = false;
    wconn._filehTab[foid] = f;
    defer([&]() {
        // on failed open: remove the in-flight entry so foid can be reopened later.
        if (!retok) {
            wconn._mu.Lock();
            // don't care about f->_nopen-- since f is not returned anywhere
            if (wconn._filehTab.get(foid) != f) {
                wconn._mu.Unlock();
                panic("BUG: wconn.open: wconn.filehTab[foid] mutated while file open was in progress");
            }
            wconn._filehTab.erase(foid);
            wconn._mu.Unlock();
        }
        f->_openReady.close();  // wake up other opens waiting on this entry
    });
    wconn._mu.Unlock();

    f->_openErr = f->_open();
    if (f->_openErr != nil)
        return make_pair(nil, E(f->_openErr));

    retok = true;
    return make_pair(f, nil);
}

// _open performs actual open of FileH marked as "in-flight-open" in wconn.filehTab.
//
// Called with:
// - wconn.atMu held
// - wconn.mu   not locked
// - f.mu       not locked
// _open performs actual open of FileH marked as "in-flight-open" in wconn.filehTab.
//
// Called with:
// - wconn.atMu held
// - wconn.mu   not locked
// - f.mu       not locked
error _FileH::_open() {
    _FileH* f = this;
    Conn    wconn = f->wconn;
    error   err;

    // open OS-level file that backs f data as of head state.
    tie(f->_headf, err)
                = wconn->_wc->_open(fmt::sprintf("head/bigfile/%s", v(foid)));
    if (err != nil)
        return err;

    bool ok = false;
    defer([&]() {
        if (!ok)
            f->_headf->close(); // undo open on error path
    });

    // learn f blocksize and current head file size.
    struct stat st;
    err = f->_headf->stat(&st);
    if (err != nil)
        return err;
    f->blksize    = st.st_blksize;
    f->_headfsize = st.st_size;
    if (f->_headfsize % f->blksize != 0)
        return fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0",
                        v(f->_headf->name()), f->_headfsize, f->blksize);

    // start watching f
    // NOTE we are _not_ holding wconn.mu nor f.mu - only wconn.atMu to rely on wconn.at being stable.
    // NOTE wcfs will reply "ok" only after wcfs/head/at ≥ wconn.at
    string ack;
    // XXX f._watchMu.lock() + unlock ?
    // XXX + recheck status before sending the watch?
    tie(ack, err) = wconn->_wlink->sendReq(context::background(), fmt::sprintf("watch %s @%s", v(foid), v(wconn->at)));
    if (err != nil)
        return err;
    if (ack != "ok")
        return fmt::errorf("watch: %s", v(ack));

    ok = true;
    return nil;
}

// close releases resources associated with FileH.
//
// Left fileh mappings become invalid to use except unmap.
// close releases resources associated with FileH.
//
// Left fileh mappings become invalid to use except unmap.
error _FileH::close() {
    _FileH& fileh = *this;
    Conn    wconn = fileh.wconn;

    // XXX locking ok?
    wconn->_atMu.RLock();
    fileh._mu.lock();       // XXX for fileh ._nopen, ._closed  ->better use wconn._mu for those too?
    defer([&]() {
        fileh._mu.unlock();
        wconn->_atMu.RUnlock();
    });

    // fileh.close can be called several times. just return nil for second close.
    if (fileh._closed)
        return nil;

    // decref open count; do real close only when last open goes away.
    if (fileh._nopen <= 0)
        panic("BUG: fileh.close: fileh._nopen <= 0");
    fileh._nopen--;
    if (fileh._nopen > 0)
        return nil;

    // last open went away - real close.
    xerr::Contextf E("%s: close f<%s>", v(wconn), v(fileh.foid));

    // eret remembers the first error seen; later errors are dropped.
    error err, eret;
    auto reterr1 = [&eret](error err) {
        if (eret == nil && err != nil)
            eret = err;
    };

    // XXX change all fileh.mmaps to cause EFAULT on any access after fileh.close

    // stop watching f          XXX ok under f.mu ?
    string ack;
    // XXX f._watchMu.lock() + unlock
    // XXX + recheck status before sending the watch?
    tie(ack, err) = wconn->_wlink->sendReq(context::background(), fmt::sprintf("watch %s -", v(foid)));
    if (err != nil)
        reterr1(err);
    else if (ack != "ok")
        reterr1(fmt::errorf("unwatch: %s", v(ack)));

    // remove fileh from wconn._filehTab
    wconn->_mu.Lock();  // FIXME lock order vs fileh._mu
    if (wconn->_filehTab.get(fileh.foid)._ptr() != &fileh)
        panic("BUG: fileh.close: wconn.filehTab[fileh.foid] != fileh");
    wconn->_filehTab.erase(fileh.foid);
    wconn->_mu.Unlock();

    reterr1(fileh._headf->close());

    fileh._closed = true;
    return E(eret);
}

// mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state.
//
// If vma != nil, created mapping is associated with that vma of user-space virtual memory manager:
// virtmem calls FileH::mmap under virtmem lock when virtmem fileh is mmapped into vma.
// mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state.
//
// If vma != nil, created mapping is associated with that vma of user-space virtual memory manager:
// virtmem calls FileH::mmap under virtmem lock when virtmem fileh is mmapped into vma.
pair<Mapping, error> _FileH::mmap(int64_t blk_start, int64_t blk_len, VMA *vma) {
    _FileH& f = *this;

    // NOTE virtmem lock is held by virtmem caller
    // XXX locking ok?
    f.wconn->_atMu.RLock(); // e.g. f._headfsize
    f._mu.lock();           // f._pinned, f._mmaps
    defer([&]() {
        f._mu.unlock();
        f.wconn->_atMu.RUnlock();
    });

    xerr::Contextf E("%s: mmap f<%s> [blk%ld +blk%ld)", v(f.wconn), v(f.foid), blk_start, blk_len);

    if (f._closed)  // XXX locking
        return make_pair(nil, E(os::ErrClosed));

    error err;

    if (blk_start < 0)
        panic("blk_start < 0");
    if (blk_len < 0)
        panic("blk_len < 0");

    // compute byte range [start, stop) with explicit overflow checks.
    int64_t blk_stop; // = blk_start + blk_len
    if (__builtin_add_overflow(blk_start, blk_len, &blk_stop))
        panic("blk_start + blk_len overflow int64");

    int64_t stop;//  = blk_stop *f.blksize;
    if (__builtin_mul_overflow(blk_stop, f.blksize, &stop))
        panic("(blk_start + blk_len)*f.blksize overflow int64");
    int64_t start    = blk_start*f.blksize;


    // create memory with head/f mapping and applied pins
    // mmap-in zeros after f.size (else access to memory after file.size will raise SIGBUS)
    uint8_t *mem_start, *mem_stop;
    tie(mem_start, err) = mmap_ro(f._headf, start, blk_len*f.blksize);
    if (err != nil)
        return make_pair(nil, E(err));
    mem_stop = mem_start + blk_len*f.blksize;

    bool retok = false;
    defer([&]() {
        if (!retok)
            mm::unmap(mem_start, mem_stop - mem_start); // ignore error
    });

    // replace the tail beyond head/f.size with zero mapping (wendelin.core
    // semantic: bigfile reads as zeros beyond initialized region).
    if (stop > f._headfsize) {
        uint8_t *zmem_start = mem_start + (max(f._headfsize/*XXX -1 ?*/, start) - start);
        err = mmap_zero_into_ro(zmem_start, mem_stop - zmem_start);
        if (err != nil)
            return make_pair(nil, E(err));
    }

    Mapping mmap = adoptref(new _Mapping());
    mmap->fileh     = newref(&f);
    mmap->blk_start = blk_start;
    mmap->mem_start = mem_start;
    mmap->mem_stop  = mem_stop;
    mmap->vma       = vma;

    // replay currently-pinned blocks into the new mapping so it shows f@at view.
    for (auto _ : f._pinned) {  // TODO keep f._pinned ↑blk and use binary search
        int64_t    blk = _.first;
        zodb::Tid  rev = _.second;
        if (!(blk_start <= blk && blk < blk_stop))
            continue;   // blk ∉ this mapping
        err = mmap->_remmapblk(blk, rev);
        if (err != nil)
            return make_pair(nil, E(err));
    }

    // interlink the mapping with virtmem VMA, if requested.
    if (vma != nil) {
        if (vma->mmap_overlay_server != nil)
            panic("vma is already associated with overlay server");
        if (!(vma->addr_start == 0 && vma->addr_stop == 0))
            panic("vma already covers !nil virtual memory area");
        mmap->incref(); // vma->mmap_overlay_server is keeping ref to mmap
        vma->mmap_overlay_server = mmap._ptr();
        vma->addr_start = (uintptr_t)mmap->mem_start;
        vma->addr_stop  = (uintptr_t)mmap->mem_stop;
        mmap->_assertVMAOk(); // just in case
    }

    f._mmaps.push_back(mmap);   // TODO keep f._mmaps ↑blk_start

    retok = true;
    return make_pair(mmap, nil);
}

// unmap releases mapping memory from address space.
//
// After call to unmap the mapping must no longer be used.
// The association in between mapping and linked virtmem VMA is reset.
//
// Virtmem calls Mapping.unmap under virtmem lock when VMA is unmapped.
error _Mapping::unmap() {
    Mapping mmap = newref(this); // keep this mapping alive: f->_mmaps (removed below) may hold the last reference
    FileH f = mmap->fileh;

    // NOTE virtmem lock is held by virtmem caller
    // XXX locking ok?
    f->wconn->_atMu.RLock();    // sync with wconn resync (atMu is wconn-level lock)
    f->_mu.lock();              // f._mmaps; also mutually excludes pinner (see note below)
    defer([&]() {
        f->_mu.unlock();
        f->wconn->_atMu.RUnlock();
    });

    xerr::Contextf E("%s: f<%s>: unmap", v(f->wconn), v(f->foid));

    // break mmap <-> vma link, if the mapping is associated with a virtmem VMA
    if (mmap->vma != nil) {
        mmap->_assertVMAOk();
        VMA *vma = mmap->vma;
        vma->mmap_overlay_server = nil;
        mmap->decref(); // vma->mmap_overlay_server was holding a ref to mmap
        vma->addr_start = 0;
        vma->addr_stop  = 0;
        mmap->vma = nil;
    }

    // release the mapped memory itself; mem_start/mem_stop are reset even on error
    error err = mm::unmap(mmap->mem_start, mmap->mem_stop - mmap->mem_start);
    mmap->mem_start = nil;
    mmap->mem_stop  = nil;
    // XXX clear other fields?

    // XXX do it first? (to avoid pinner going through f.mmaps and hitting unmapped memory)
    //     -> no need: both pinner and unmap lock on f.mu
    //f->_mmaps.remove(mmap);
    // erase-remove idiom: f._mmaps is a vector and has no .remove
    f->_mmaps.erase(
        std::remove(f->_mmaps.begin(), f->_mmaps.end(), mmap),
        f->_mmaps.end());

    return E(err);
}

// _remmapblk remmaps mapping memory for file[blk] to be viewing database as of @at state.
//
// at=TidHead means unpin to head/ .
// NOTE this does not check whether virtmem already mapped blk as RW.
//
// The following locks must be held by caller:
// - f.wconn.atMu
// - f._mu
error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) {
    FileH f = this->fileh;
    xerr::Contextf E("%s: f<%s>: remmapblk #%ld @%s", v(f->wconn), v(f->foid), blk, v(at));

    ASSERT(this->blk_start <= blk && blk < this->blk_stop());

    // address inside this mapping where the view of blk starts
    uint8_t *blkaddr = this->mem_start + (blk - this->blk_start)*f->blksize;

    // pick the file to take blk data from: head/ for TidHead, @rev/ otherwise
    error    err;
    os::File src;
    bool     srcopened = false;
    if (at == TidHead) {
        src = f->_headf;
    } else {
        // TODO share @rev fd until wconn is resynced?
        tie(src, err) = f->wconn->_wc->_open(
                fmt::sprintf("@%s/bigfile/%s", v(at), v(f->foid)));
        if (err != nil)
            return E(err);
        srcopened = true;
    }
    defer([&]() {
        if (srcopened)
            src->close();
    });

    struct stat st;
    err = src->stat(&st);
    if (err != nil)
        return E(err);
    if ((size_t)st.st_blksize != f->blksize)
        return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize));

    if ((blk+1)*f->blksize <= (size_t)st.st_size) {
        // block is inside file - mmap in file data
        err = mmap_into_ro(blkaddr, 1*f->blksize, src, blk*f->blksize);
    } else {
        // block is beyond file size - mmap with zeros - else access to memory
        // after file.size will raise SIGBUS. (assumes head/f size ↑=)
        err = mmap_zero_into_ro(blkaddr, 1*f->blksize);
    }
    if (err != nil)
        return E(err);

    return nil;
}

// remmap_blk remmaps file[blk] in its place again.
//
// Virtmem calls Mapping.remmap_blk under virtmem lock to remmap a block after
// RW dirty page was e.g. discarded.
error _Mapping::remmap_blk(int64_t blk) {
    FileH f = this->fileh;

    // NOTE virtmem lock is held by virtmem caller
    // XXX locking ok?
    f->wconn->_atMu.RLock();
    f->_mu.lock();
    defer([&]() {
        f->_mu.unlock();
        f->wconn->_atMu.RUnlock();
    });

    if (!(this->blk_start <= blk && blk < this->blk_stop()))
        panic("remmap_blk: blk out of Mapping range");

    // view the block @rev if it is pinned, and @head otherwise
    zodb::Tid blkrev = TidHead;
    zodb::Tid pinrev;
    bool      pinned;
    tie(pinrev, pinned) = f->_pinned.get_(blk);
    if (pinned)
        blkrev = pinrev;

    return this->_remmapblk(blk, blkrev); // errctx is good in _remmapblk
}

// ---- WCFS raw file access ----

// _path returns the filesystem path for obj on wcfs:
// wcfs root (mountpoint) + "/" + obj.
string WCFS::_path(const string &obj) {
    string p = this->mountpoint;
    p += "/";
    p += obj;
    return p;
}

// _open opens the file corresponding to path on wcfs with given flags.
tuple<os::File, error> WCFS::_open(const string &path, int flags) {
    return os::open(this->_path(path), flags);
}


// ---- misc ----

// mmap_zero_into_ro mmaps read-only zeros into [addr +size) so that region is all zeros.
// The created mapping, even after it is accessed, does not consume memory.
static error mmap_zero_into_ro(void *addr, size_t size) {
    xerr::Contextf E("mmap zero");

    // map /dev/zero with MAP_SHARED|MAP_NORESERVE: the region reads as zeros,
    // but no memory is reserved to back it.
    os::File devzero;
    error    err;
    tie(devzero, err) = os::open("/dev/zero");
    if (err != nil)
        return E(err);
    defer([&]() {
        devzero->close();   // read-only fd; close error is not significant
    });

    err = mm::map_into(addr, size, PROT_READ, MAP_SHARED | MAP_NORESERVE, devzero, 0);
    if (err != nil)
        return E(err);
    return nil;
}

// mmap_ro mmaps read-only fd[offset +size).
// The mapping is created with MAP_SHARED.
// Returns pointer to the start of the created mapping, or error.
static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size) {
    return mm::map(PROT_READ, MAP_SHARED, f, offset, size);
}

// mmap_into_ro mmaps read-only fd[offset +size) into [addr +size),
// replacing whatever was previously mapped at that address range.
// The mapping is created with MAP_SHARED.
static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset) {
    return mm::map_into(addr, size, PROT_READ, MAP_SHARED, f, offset);
}


// _assertVMAOk() verifies that mmap and vma are related to each other and cover
// exactly the same virtual memory range.
//
// It panics if mmap and vma do not exactly relate to each other or cover
// different virtual memory range.
void _Mapping::_assertVMAOk() {
    VMA *vma = this->vma;

    // vma must point back to this mapping
    if (vma->mmap_overlay_server != static_cast<void*>(this))
        panic("BUG: mmap and vma do not link to each other");

    // both must describe the same [start, stop) address range
    bool samerange = (vma->addr_start == uintptr_t(this->mem_start)) &&
                     (vma->addr_stop  == uintptr_t(this->mem_stop));
    if (!samerange)
        panic("BUG: mmap and vma cover different virtual memory ranges");
}


// String returns a human-readable representation of the WCFS handle.
string WCFS::String() const {
    return fmt::sprintf("wcfs %s", v(this->mountpoint));
}

// String returns a human-readable representation of the connection.
//
// NOTE String must be called with Conn.atMu locked.
string _Conn::String() const {
    // XXX don't include wcfs as prefix here?
    // (e.g. to use Conn.String in tracing without wcfs prefix)
    // (if yes -> go and correct all xerr::Contextf calls)
    const _Conn& c = *this;
    return fmt::sprintf("%s: conn%d @%s", v(c._wc), c._wlink->fd(), v(c.at));
}

_Conn::_Conn()  {}
_Conn::~_Conn() {}
// decref releases one reference to the connection and deletes it when
// __decref indicates the last reference is gone.
void _Conn::decref() {
    if (__decref())
        delete this;
}

_FileH::_FileH()  {}
_FileH::~_FileH() {}
// decref releases one reference to the file handle and deletes it when
// __decref indicates the last reference is gone.
void _FileH::decref() {
    if (__decref())
        delete this;
}

_Mapping::_Mapping()  {}
_Mapping::~_Mapping() {}
// decref releases one reference to the mapping and deletes it when
// __decref indicates the last reference is gone.
void _Mapping::decref() {
    if (__decref())
        delete this;
}

// _tfileh_pinned exposes fileh->_pinned (blk -> rev of currently pinned blocks).
// It returns the dict by value, i.e. a snapshot of the pinned set at call time.
// NOTE(review): the _t prefix suggests this is a test-only accessor — confirm with callers.
dict<int64_t, zodb::Tid> _tfileh_pinned(FileH fileh) {
    return fileh->_pinned;
}

}   // wcfs::