// Copyright (C) 2018-2019  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// wcfs_virtmem provides wcfs client integrated with user-spce virtmem layer.
//
// XXX provides isolated view.

#include <golang/libgolang.h>
using namespace golang;

// XXX hack: C++ does not have __builtin_types_compatible_p, but CCAN configure
// think it does because CCAN is configired via C, not C++
#include <config.h>
#undef  HAVE_BUILTIN_TYPES_COMPATIBLE_P
#define HAVE_BUILTIN_TYPES_COMPATIBLE_P 0

#include <wendelin/bigfile/virtmem.h>
#include <wendelin/bigfile/ram.h>
#include <wendelin/bug.h>

#include <unordered_map>
#include <vector>
#include <stdint.h>


#include "wcfs_misc.h"


template<typename Key, typename Value>
using dict = std::unordered_map<Key, Value>;

using std::vector;

typedef uint64_t Tid;   // XXX ok?
typedef uint64_t Oid;   // XXX ok?

// TidHead is invalid Tid which is larged Tid value and means @head
const Tid TidHead = -1ULL;

// XXX ok?
struct IContext {
    virtual chan<structZ> done() = 0;
};

struct Conn;
struct _File;
struct _Mapping;
struct WatchLink;
struct SrvReq;

// WCFS represents filesystem-level connection to wcfs server.
// XXX doc
struct WCFS {

    Conn *connect(Tid at);
};

// Conn represents logical connection that provides view of data on wcfs
// filesystem as of particular database state.
//
// XXX doc
struct Conn {
    WCFS        *_wc;
    Tid         at;
    WatchLink   *_wlink;

    sync::Mutex        _filemu;
    dict<Oid, _File*>  _filetab; // {} foid -> _file


private:
    void _pinner(IContext *ctx);
    void _pin1(SrvReq *req);
};

// _File represent isolated file view under Conn.
//
// XXX doc
struct _File {
    Conn    *wconn;
    Oid     foid;       // hex of ZBigFile root object ID
    size_t  blksize;    // block size of this file
    // .headf        file object of head/file
    // .headfsize    head/file size is known to be at least headfsize (size ↑=)
    dict<int64_t, Tid> pinned;  // {} blk -> rev   that wcfs already sent us for this file
    vector<_Mapping*> mmaps;    // []_Mapping ↑blk_start    mappings of this file
};

// _Mapping represents one mapping of _File.
struct _Mapping {
    _File   *file;
    int     blk_start;  // offset of this mapping in file

    BigFileH *fileh;        // mmapped under this file handle
    uint8_t  *mem_start;    // mmapped memory [mem_start, mem_stop)
    uint8_t  *mem_stop;

    int64_t blk_stop() const {
        ASSERT((mem_stop - mem_start) % file->blksize == 0);
        return blk_start + (mem_stop - mem_start) / file->blksize;
    }

    void _remmapblk(int64_t blk, Tid at);
};


// XXX struct WatchLink
struct WatchLink {
    // XXX

    SrvReq *recvReq(IContext *ctx);
};

// SrvReq represents 1 server-initiated wcfs request received over /head/watch link.
// XXX -> PinReq?
struct SrvReq {
    // XXX
    Oid     foid;   // request is about this file
    int64_t blk;    // ----//---- about this block
    Tid     at;     // pin to this at       XXX ffff = None = unpin
};


// connect creates new Conn viewing WCFS state as of @at.
Conn *WCFS::connect(Tid at) {
    WCFS *wc = this;

    Conn *wconn = new Conn();
    // TODO support !isolated mode
    wconn->_wc      = wc;
    wconn->at       = at;
//  wconn._wlink    = WatchLink(wc)     XXX

    // XXX reenable
#if 0
    pinCtx, wconn._pinCancel = context.with_cancel(context.background())
    wconn._pinWG = sync.WorkGroup(pinCtx)
    wconn._pinWG.go(wconn._pinner)
#endif

    return wconn;
}

// XXX Conn::close

// _pinner receives pin messages from wcfs and adjusts wconn mappings.
void Conn::_pinner(IContext *ctx) {
    Conn *wconn = this;

    // XXX panic/exc -> log.CRITICAL

    while (1) {
        SrvReq *req = wconn->_wlink->recvReq(ctx);
        if (req == NULL)
            return  // XXX ok? (EOF - when wcfs closes wlink)

        // we received request to pin/unpin file block. handle it
        wconn->_pin1(req);

        // XXX free req?
    }
}

// pin1 handles one pin request received from wcfs.
void Conn::_pin1(SrvReq *req) {
    Conn *wconn = this;

    // XXX defer: reply either ack or nak on error

    wconn->_filemu.lock();
    auto _ = wconn->_filetab.find(req->foid);
    if (_ == wconn->_filetab.end()) {
        // XXX unlock
        // XXX err = we are not watching the file - why wcfs sent us this update?
    }

    _File *f = _->second;

    // XXX relock wconn -> f

    for (auto mmap : f->mmaps) {    // XXX use ↑blk_start for binary search
        if (!(mmap->blk_start <= req->blk && req->blk < mmap->blk_stop()))
            continue;   // blk ∉ mmap

        // XXX reenable
        //trace("\tremmapblk %d @%s" % (req->blk, (h(req.at) if req.at else "head")))
        // check if virtmem did not dirtied page corresponding to this block already
        virt_lock();
        TODO (mmap->file->blksize != mmap->fileh->ramh->ram->pagesize);
        if (!__fileh_page_isdirty(mmap->fileh, req->blk))
            mmap->_remmapblk(req->blk, req->at);
        virt_unlock();
        //trace("\t-> remmaped");   XXX
    }

    // update f.pinned
    // XXX do it before ^^^ remmapblk (so that e.g. concurrent
    // discard/writeout see correct f.pinned) ?
#if 0
    if req.at is None:
        f.pinned.pop(req.blk, None)     # = delete(f.pinned, req.blk)  -- unpin to @head
    else:
        f.pinned[req.blk] = req.at
    }
#endif

    wconn->_filemu.unlock();
}

// _remmapblk remmaps mapping memory for file[blk] to be viewing database as of @at state.
//
// at=None means unpin to head/ .   XXX -> C
// NOTE this does not check wrt virtmem already mapped blk as RW.
void _Mapping::_remmapblk(int64_t blk, Tid at) {
    // XXX err context?
    _Mapping *mmap = this;

    ASSERT(mmap->blk_start <= blk && blk < mmap->blk_stop());
    _File *f = mmap->file;

    uint8_t *blkmem = mmap->mem_start + (blk - mmap->blk_start)*f->blksize;
    osfile fsfile;
    if (at == TidHead) {
        fsfile = f->headf;
    }
    else {
        // TODO share @rev fd until wconn is resynced?
        fsfile = f->wconn->_wc._open("@%s/bigfile/%s" % (h(at), h(f->foid)), "rb")
        defer(fsfile.close)
    }

    struct stat st;
    err = fsfile.stat(&st);
    if (err != nil)
        return err;
    ASSERT(st.st_blksize == f->blksize);  // FIXME assert

    // block is beyond file size - mmap with zeros   (assumes head/f size ↑=)
    if ((blk+1)*f->blksize > st.st_size) {
        mm.map_zero_into_ro(blkmem);
    }
    // block is inside file - mmap file data
    else {
        mm.map_into_ro(blkmem, fsfile.fileno(), blk*f->blksize);
    }
}