// Copyright (C) 2018-2019 Nexedi SA and Contributors. // Kirill Smelkov <kirr@nexedi.com> // // This program is free software: you can Use, Study, Modify and Redistribute // it under the terms of the GNU General Public License version 3, or (at your // option) any later version, as published by the Free Software Foundation. // // You can also Link and Combine this program with other software covered by // the terms of any of the Free Software licenses or any of the Open Source // Initiative approved licenses and Convey the resulting work. Corresponding // source of such a combination shall include the source code for all other // software used. // // This program is distributed WITHOUT ANY WARRANTY; without even the implied // warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // // See COPYING file for full licensing terms. // See https://www.nexedi.com/licensing for rationale and options. // wcfs_virtmem provides wcfs client integrated with user-spce virtmem layer. // // XXX provides isolated view. #include <golang/libgolang.h> using namespace golang; // XXX hack: C++ does not have __builtin_types_compatible_p, but CCAN configure // think it does because CCAN is configired via C, not C++ #include <config.h> #undef HAVE_BUILTIN_TYPES_COMPATIBLE_P #define HAVE_BUILTIN_TYPES_COMPATIBLE_P 0 #include <wendelin/bigfile/virtmem.h> #include <wendelin/bigfile/ram.h> #include <wendelin/bug.h> #include <unordered_map> #include <vector> #include <stdint.h> #include "wcfs_misc.h" template<typename Key, typename Value> using dict = std::unordered_map<Key, Value>; using std::vector; typedef uint64_t Tid; // XXX ok? typedef uint64_t Oid; // XXX ok? // TidHead is invalid Tid which is larged Tid value and means @head const Tid TidHead = -1ULL; // XXX ok? struct IContext { virtual chan<structZ> done() = 0; }; struct Conn; struct _File; struct _Mapping; struct WatchLink; struct SrvReq; // WCFS represents filesystem-level connection to wcfs server. // XXX doc struct WCFS { Conn *connect(Tid at); }; // Conn represents logical connection that provides view of data on wcfs // filesystem as of particular database state. // // XXX doc struct Conn { WCFS *_wc; Tid at; WatchLink *_wlink; sync::Mutex _filemu; dict<Oid, _File*> _filetab; // {} foid -> _file private: void _pinner(IContext *ctx); void _pin1(SrvReq *req); }; // _File represent isolated file view under Conn. // // XXX doc struct _File { Conn *wconn; Oid foid; // hex of ZBigFile root object ID size_t blksize; // block size of this file // .headf file object of head/file // .headfsize head/file size is known to be at least headfsize (size ↑=) dict<int64_t, Tid> pinned; // {} blk -> rev that wcfs already sent us for this file vector<_Mapping*> mmaps; // []_Mapping ↑blk_start mappings of this file }; // _Mapping represents one mapping of _File. struct _Mapping { _File *file; int blk_start; // offset of this mapping in file BigFileH *fileh; // mmapped under this file handle uint8_t *mem_start; // mmapped memory [mem_start, mem_stop) uint8_t *mem_stop; int64_t blk_stop() const { ASSERT((mem_stop - mem_start) % file->blksize == 0); return blk_start + (mem_stop - mem_start) / file->blksize; } void _remmapblk(int64_t blk, Tid at); }; // XXX struct WatchLink struct WatchLink { // XXX SrvReq *recvReq(IContext *ctx); }; // SrvReq represents 1 server-initiated wcfs request received over /head/watch link. // XXX -> PinReq? struct SrvReq { // XXX Oid foid; // request is about this file int64_t blk; // ----//---- about this block Tid at; // pin to this at XXX ffff = None = unpin }; // connect creates new Conn viewing WCFS state as of @at. Conn *WCFS::connect(Tid at) { WCFS *wc = this; Conn *wconn = new Conn(); // TODO support !isolated mode wconn->_wc = wc; wconn->at = at; // wconn._wlink = WatchLink(wc) XXX // XXX reenable #if 0 pinCtx, wconn._pinCancel = context.with_cancel(context.background()) wconn._pinWG = sync.WorkGroup(pinCtx) wconn._pinWG.go(wconn._pinner) #endif return wconn; } // XXX Conn::close // _pinner receives pin messages from wcfs and adjusts wconn mappings. void Conn::_pinner(IContext *ctx) { Conn *wconn = this; // XXX panic/exc -> log.CRITICAL while (1) { SrvReq *req = wconn->_wlink->recvReq(ctx); if (req == NULL) return // XXX ok? (EOF - when wcfs closes wlink) // we received request to pin/unpin file block. handle it wconn->_pin1(req); // XXX free req? } } // pin1 handles one pin request received from wcfs. void Conn::_pin1(SrvReq *req) { Conn *wconn = this; // XXX defer: reply either ack or nak on error wconn->_filemu.lock(); auto _ = wconn->_filetab.find(req->foid); if (_ == wconn->_filetab.end()) { // XXX unlock // XXX err = we are not watching the file - why wcfs sent us this update? } _File *f = _->second; // XXX relock wconn -> f for (auto mmap : f->mmaps) { // XXX use ↑blk_start for binary search if (!(mmap->blk_start <= req->blk && req->blk < mmap->blk_stop())) continue; // blk ∉ mmap // XXX reenable //trace("\tremmapblk %d @%s" % (req->blk, (h(req.at) if req.at else "head"))) // check if virtmem did not dirtied page corresponding to this block already virt_lock(); TODO (mmap->file->blksize != mmap->fileh->ramh->ram->pagesize); if (!__fileh_page_isdirty(mmap->fileh, req->blk)) mmap->_remmapblk(req->blk, req->at); virt_unlock(); //trace("\t-> remmaped"); XXX } // update f.pinned // XXX do it before ^^^ remmapblk (so that e.g. concurrent // discard/writeout see correct f.pinned) ? #if 0 if req.at is None: f.pinned.pop(req.blk, None) # = delete(f.pinned, req.blk) -- unpin to @head else: f.pinned[req.blk] = req.at } #endif wconn->_filemu.unlock(); } // _remmapblk remmaps mapping memory for file[blk] to be viewing database as of @at state. // // at=None means unpin to head/ . XXX -> C // NOTE this does not check wrt virtmem already mapped blk as RW. void _Mapping::_remmapblk(int64_t blk, Tid at) { // XXX err context? _Mapping *mmap = this; ASSERT(mmap->blk_start <= blk && blk < mmap->blk_stop()); _File *f = mmap->file; uint8_t *blkmem = mmap->mem_start + (blk - mmap->blk_start)*f->blksize; osfile fsfile; if (at == TidHead) { fsfile = f->headf; } else { // TODO share @rev fd until wconn is resynced? fsfile = f->wconn->_wc._open("@%s/bigfile/%s" % (h(at), h(f->foid)), "rb") defer(fsfile.close) } struct stat st; err = fsfile.stat(&st); if (err != nil) return err; ASSERT(st.st_blksize == f->blksize); // FIXME assert // block is beyond file size - mmap with zeros (assumes head/f size ↑=) if ((blk+1)*f->blksize > st.st_size) { mm.map_zero_into_ro(blkmem); } // block is inside file - mmap file data else { mm.map_into_ro(blkmem, fsfile.fileno(), blk*f->blksize); } }