// Copyright (C) 2018-2020  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// Package wcfs provides WCFS client integrated with user-space virtual memory manager.
// See wcfs.h for package overview.


// Wcfs client organization
//
// Wcfs client provides to its users isolated bigfile views backed by data on
// WCFS filesystem. In the absence of the Isolation property, wcfs client would
// reduce to just directly using OS-level file wcfs/head/f for a bigfile f. On
// the other hand there is a simple, but inefficient, way to support isolation:
// for @at database view of bigfile f - directly use OS-level file wcfs/@at/f.
// The latter works, but is very inefficient because the OS cache for f data is
// not shared in between two connections with @at1 and @at2 views. The cache is
// also lost when the connection view of the database is resynced on a
// transaction boundary. To support isolation efficiently, wcfs client uses
// wcfs/head/f most of the time, but injects wcfs/@revX/f parts into mappings
// to maintain the f@at view driven by pin messages that wcfs server sends to
// the client in accordance with the WCFS isolation protocol(*).
//
// Wcfs server sends pin messages synchronously triggered by access to mmaped
// memory. That means that a client thread that is accessing a wcfs/head/f mmap
// is completely blocked while wcfs server sends pins and waits to receive acks
// from all clients. In other words on-client handling of pins has to be done
// in a separate thread, because wcfs server can also send pins to the client
// that triggered the access.
//
// Wcfs client implements pin handling in a so-called "pinner" thread(+). The
// pinner thread receives pin requests from wcfs server via the watchlink handle
// opened through wcfs/head/watch. For every pin request the pinner finds
// corresponding Mappings and injects wcfs/@revX/f parts via Mapping._remmapblk
// appropriately.
//
// The same watchlink handle is used to send client-originated requests to wcfs
// server. The requests are sent to tell wcfs that the client wants to observe a
// particular bigfile as of a particular revision, or to stop watching it.
// Such requests originate from regular client threads - not the pinner - via
// entry points like Conn.open, Conn.resync and FileH.close.
//
// Every FileH maintains fileh._pinned {} with currently pinned blk -> rev. This
// dict is updated by the pinner driven by pin messages, and is used when either
// a new fileh Mapping is created (FileH.mmap) or refreshed due to a request
// from virtmem (Mapping.remmap_blk, see below).
//
// In wendelin.core a bigfile has the semantic that it is infinite in size and
// reads as all zeros beyond the region initialized with data. Memory-mapping of
// OS-level files can also go beyond file size, however accessing memory
// corresponding to a file region after file.size triggers SIGBUS. To preserve
// the wendelin.core semantic, wcfs client mmaps-in zeros for Mapping regions
// after wcfs/head/f.size. For simplicity it is assumed that bigfiles only grow
// and never shrink. It is indeed currently so, but will have to be revisited
// if/when wendelin.core adds bigfile truncation. Wcfs client restats
// wcfs/head/f at every transaction boundary (Conn.resync) and remembers f.size
// in FileH._headfsize for use during one transaction.
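//
// For illustration only, the wcfs paths that this client works with are
// summarized below (listing just the files referenced in this file; the
// authoritative filesystem layout is defined by the wcfs server):
//
//      wcfs/head/at                    ; current head database revision
//      wcfs/head/watch                 ; watchlink used by the pinner
//      wcfs/head/bigfile/<foid>        ; head/f  - current data of bigfile foid
//      wcfs/@<rev>/bigfile/<foid>      ; @rev/f  - data injected to pin mapping parts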
//
//
// Integration with wendelin.core virtmem layer
//
// Wcfs client integrates with the virtmem layer to support virtmem handle
// dirtying pages of the read-only base layer that wcfs client provides via
// isolated Mapping. For wcfs-backed bigfiles every virtmem VMA is interlinked
// with a Mapping:
//
//      VMA     -> BigFileH -> ZBigFile -----> Z
//       ↑↓                                    O
//      Mapping -> FileH    -> wcfs server --> DB
//
// When a page is write-accessed, virtmem mmaps in a page of RAM in place of the
// accessed virtual memory, copies the base-layer content provided by Mapping
// into there, and marks that page as read-write.
//
// Upon receiving a pin message, the pinner consults virtmem, whether the
// corresponding page was already dirtied in virtmem's BigFileH (call to
// __fileh_page_isdirty), and if it was, the pinner does not remmap the Mapping
// part to wcfs/@revX/f and just leaves the dirty page in its place, remembering
// the pin information in fileh._pinned.
//
// Once dirty pages are no longer needed (either after discard/abort or
// writeout/commit), virtmem asks wcfs client to remmap corresponding regions
// of Mapping in its place again via calls to Mapping.remmap_blk for previously
// dirtied blocks.
//
// The scheme outlined above does not need to split a Mapping upon dirtying an
// inner page.
//
// See the bigfile_ops interface (wendelin/bigfile/file.h) that explains the
// base layer and overlaying from the virtmem point of view. For wcfs this
// interface is provided by a small wcfs client wrapper in bigfile/file_zodb.cpp.
//
// --------
//
// (*) see wcfs.go documentation for WCFS isolation protocol overview and details.
// (+) currently, for simplicity, there is one pinner thread for each connection.
//     In the future, for efficiency, it might be reworked to be one pinner
//     thread that serves all connections simultaneously.


// Wcfs client locking organization
//
// XXX locking -> explain atMu + slaves and refer to "Locking" in wcfs.go
//
//      Conn.atMu > Conn.mu > FileH.mu
//
// Several locks are RWMutex instead of just Mutex not only to allow more
// concurrency, but, in the first place, for correctness: the pinner thread,
// being the core element in handling the WCFS isolation protocol, is
// effectively invoked synchronously from other threads via messages coming
// through wcfs server. For example Conn.resync sends a watch request to wcfs
// and waits for the answer. Wcfs server, in turn, might send corresponding pin
// messages to the pinner and _wait_ for the answer before answering to resync:
//
//        - - - - - -
//       |   .···|·····.            ----> = request
//  pinner <------.↓                <···· = response
//       |        |       wcfs
//  resync -------^↓
//       |   `····|·····
//        - - - - - -
//       client process
//
// This creates the necessity to use RWMutex for locks that the pinner and other
// parts of the code could be using at the same time in synchronous scenarios
// similar to the above. These locks are:
//
//      - Conn.atMu
//      - Conn.mu
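//
// As an illustrative sketch only (the precise locking of each operation is
// documented at the corresponding function), a thread that needs several of
// the above locks acquires them following the documented
// Conn.atMu > Conn.mu > FileH.mu order, e.g.:
//
//      wconn._atMu.RLock();        // 1) Conn.atMu
//      wconn._mu.Lock();           // 2) Conn.mu
//      f->_mu.lock();              // 3) FileH.mu
//      ...                         //    work with wconn.at, wconn._filehTab, f._pinned, ...
//      f->_mu.unlock();
//      wconn._mu.Unlock();
//      wconn._atMu.RUnlock();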
//
// XXX pinner takes the following locks   (XXX recheck)
//
//      - wconn.mu.W
//      - wconn.mu.R
//
//      - wconn.atMu.R
//      - wconn.mu.R
//      - fileh.mu      (R:.mmaps  W:.pinned)
//
//      - virt_lock

#include "wcfs_misc.h"
#include "wcfs.h"
#include "wcfs_watchlink.h"

#include <wendelin/bigfile/virtmem.h>
#include <wendelin/bigfile/ram.h>

#include <golang/errors.h>
#include <golang/fmt.h>
#include <golang/io.h>

#include <algorithm>
#include <string>
#include <vector>

#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

using std::min;
using std::max;
using std::vector;


// wcfs::
namespace wcfs {

static error mmap_zero_into_ro(void *addr, size_t size);
static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size);
static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset);


// connect creates new Conn viewing WCFS state as of @at.
pair<Conn, error> WCFS::connect(zodb::Tid at) {
    WCFS *wc = this;
    xerr::Contextf E("wcfs %s: connect @%s", v(wc->mountpoint), v(at));

    // TODO support !isolated mode

    WatchLink wlink;
    error     err;
    tie(wlink, err) = wc->_openwatch();
    if (err != nil)
        return make_pair(nil, E(err));

    Conn wconn = adoptref(new _Conn());
    wconn->_wc    = wc;
    wconn->at     = at;
    wconn->_wlink = wlink;

    context::Context pinCtx;
    tie(pinCtx, wconn->_pinCancel) = context::with_cancel(context::background());
    wconn->_pinWG = sync::NewWorkGroup(pinCtx);
    wconn->_pinWG->go([wconn](context::Context ctx) -> error {
        return wconn->_pinner(ctx);
    });

    // need to wait till `wcfs/head/at ≥ at` because e.g. Conn.open stats
    // head/f to get f.headfsize.
    // XXX atMu.RLock ?
    err = wconn->_headWait(at);
    if (err != nil) {
        // XXX bring conn down - stop pinner
        return make_pair(nil, E(err));
    }

    return make_pair(wconn, nil);
}

// _headWait waits till wcfs/head/at becomes ≥ at.
//
// XXX locks condition?
// XXX place?
error _Conn::_headWait(zodb::Tid at) {
    // XXX +ctx ?
    // FIXME implement
    return nil;
}

static global<error> errConnClosed = errors::New("connection closed");

// close releases resources associated with wconn.
//
// opened fileh and mappings become invalid to use except close and unmap.
error _Conn::close() {
    _Conn& wconn = *this;
    wconn._atMu.RLock();
    defer([&]() {
        wconn._atMu.RUnlock();
    });
    xerr::Contextf E("%s: close", v(wconn));

    error err, eret;
    auto reterr1 = [&eret](error err) {
        if (eret == nil && err != nil)
            eret = err;
    };

    bool alreadyClosed = false;
    wconn._mu.Lock();
    alreadyClosed  = (wconn._downErr == errConnClosed);
    wconn._downErr = errConnClosed;
    wconn._mu.Unlock();
    if (alreadyClosed)
        return nil;

    // close wlink and signal to pinner to stop outside of wconn.mu
    err = wconn._wlink->close();    // XXX ok under atMu?
    if (err != nil)
        reterr1(err);
    wconn._pinCancel();
    err = wconn._pinWG->wait();     // XXX ok under atMu?
    if (!errors::Is(err, context::canceled))    // canceled - ok
        reterr1(err);

    // pinner is stopped - now close all files - both that have no mappings and
    // that still have opened mappings.
    //
    // NOTE after a file is closed its mappings could continue to survive, but we
    // can no longer maintain a consistent view. For this reason we change the
    // mappings to something that gives EFAULT on access. XXX implement
    wconn._mu.Lock();
    defer([&]() {
        wconn._mu.Unlock();
    });

    // XXX f locking
    for (auto _ : wconn._filehTab) {
        auto f = _.second;

        err = f->_headf->close();   // XXX mark fileh as down so that fileh.close does not say "bad fd"
        if (err != nil)
            reterr1(err);
        f->_headf = nil;

        // XXX stop watching f      XXX ok under mu?
    }
    wconn._filehTab.clear();

    return E(eret);
}

// _pinner receives pin messages from wcfs and adjusts wconn file mappings.
error _Conn::_pinner(context::Context ctx) {
    _Conn& wconn = *this;

    error err = wconn.__pinner(ctx);

    // if the pinner fails, wcfs will kill us.
    // log the pinner error so that the error is not hidden.
    // print to stderr as well, as by default log does not print there.
    // XXX also catch panic/exc ?
    if (!(err == nil || errors::Is(err, context::canceled))) {  // canceled = .close asks pinner to stop
        log::Fatalf("CRITICAL: %s", v(err));
        log::Fatalf("CRITICAL: wcfs server will likely kill us soon.");
        fprintf(stderr, "CRITICAL: %s\n", v(err));
        fprintf(stderr, "CRITICAL: wcfs server will likely kill us soon.\n");
    }

    // mark the connection non-operational if the pinner fails
    // XXX deadlock wrt resync? (who read-locks wconn.mu)
    // XXX -> mu -> downMu ?
    wconn._mu.Lock();   // XXX locking ok? -> merge into below where lock is held?
    if (wconn._downErr == nil) {
        wconn._downErr = fmt::errorf("no longer operational due to: %w",
                                     err != nil ? err : fmt::errorf("pinner exit"));
        // XXX make all fileh and mapping invalid.
    }
    wconn._mu.Unlock();

    return err;
}

error _Conn::__pinner(context::Context ctx) {
    _Conn& wconn = *this;
    xerr::Contextf E("pinner");     // NOTE pinner error goes to Conn::close who has its own context

    PinReq req;
    error  err;

    while (1) {
        err = wconn._wlink->recvReq(ctx, &req);
        if (err != nil) {
            // it is ok if we receive EOF due to us (client) closing the connection
            if (err == io::EOF_) {
                wconn._mu.RLock();
                err = (wconn._downErr == errConnClosed) ? nil : io::ErrUnexpectedEOF;
                wconn._mu.RUnlock();
            }
            return E(err);
        }

        // we received a request to pin/unpin a file block. handle it
        err = wconn._pin1(&req);
        if (err != nil) {
            return E(err);
        }
    }
}

// pin1 handles one pin request received from wcfs.
error _Conn::_pin1(PinReq *req) {
    _Conn& wconn = *this;
    xerr::Contextf E("pin f<%s> #%ld @%s", v(req->foid), req->blk, v(req->at));

    error err = wconn.__pin1(req);

    // reply either ack or nak on error
    string ack = "ack";
    if (err != nil)
        ack = fmt::sprintf("nak: %s", v(err));
    // NOTE ctx=bg to always send reply even if we are canceled
    error err2 = wconn._wlink->replyReq(context::background(), req, ack);
    if (err == nil)
        err = err2;

    return E(err);
}

error _Conn::__pin1(PinReq *req) {
    _Conn& wconn = *this;

    FileH f;
    bool  ok;

    wconn._atMu.RLock();
    defer([&]() {
        wconn._atMu.RUnlock();
    });

    // XXX deadlock wrt Conn.resync which locks wconn.mu and does "watch" ?
    wconn._mu.RLock();
    // XXX +incref f, so that simultaneous close does not remove f from wconn.filehTab ?
    // XXX or just make FileH.close lock f too to synchronize with pinner?
    tie(f, ok) = wconn._filehTab.get_(req->foid);
    if (!ok) {
        wconn._mu.RUnlock();
        // why did wcfs send us this update?
        return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid));
    }
    // XXX <- f._openReady ?
    wconn._mu.RUnlock();

    // XXX maybe `f.mu.lock() -> wconn.mu.unlock()` to avoid race with FileH close?
    f->_mu.lock();
    defer([&]() {
        f->_mu.unlock();
    });

    for (auto mmap : f->_mmaps) {   // TODO use ↑blk_start for binary search
        if (!(mmap->blk_start <= req->blk && req->blk < mmap->blk_stop()))
            continue;   // blk ∉ mmap

        //trace("\tremmapblk %d @%s" % (req->blk, (v(req.at) if req.at else "head")))

        // pin only if virtmem did not already dirty the page corresponding to this block.
        // if virtmem dirtied the page - it will ask us to remmap it again after commit or abort.
        bool do_pin = true;
        error err;
        if (mmap->vma != nil) {
            mmap->_assertVMAOk();
            // XXX recheck wrt Mapping.unmap: there it locks:
            // 1. virt_lock
            // 2. f.mu
            //
            // -> if here we do
            // 1. f.mu
            // 2. virt_lock
            //
            // -> deadlock
            virt_lock();
            BigFileH *virt_fileh = mmap->vma->fileh;
            TODO (mmap->fileh->blksize != virt_fileh->ramh->ram->pagesize);
            do_pin = !__fileh_page_isdirty(virt_fileh, req->blk);
        }

        if (do_pin)
            err = mmap->_remmapblk(req->blk, req->at);

        if (mmap->vma != nil)
            virt_unlock();

        // on error there is no need to continue with other mappings - all fileh and
        // all mappings become marked invalid on pinner failure.
        // XXX all call wconn._down from here under wconn._mu lock?
        if (err != nil)
            return err;

        //trace("\t-> remmaped");   XXX
    }

    // update f._pinned
    if (req->at == TidHead) {
        f->_pinned.erase(req->blk);     // unpin to @head
    } else {
        f->_pinned[req->blk] = req->at;
    }

    return nil;
}

// resync resyncs connection and its file mappings onto different database view.
//
// bigfile/_file_zodb.pyx arranges to call Conn.resync at transaction boundaries
// to keep Conn view in sync with the updated zconn database view.
error _Conn::resync(zodb::Tid at) {
    _Conn& wconn = *this;
    error err;

    wconn._atMu.RLock();
    xerr::Contextf E("%s: resync -> @%s", v(wconn), v(at));
    wconn._atMu.RUnlock();

    // XXX downErr -> E
    // XXX at ^ (increases)

    // first wait for wcfs/head to be ≥ at.
    // we need this e.g. to be sure that head/f.size is at least as big as it will be at the @at state.
    err = wconn._headWait(at);
    if (err != nil)
        return err;     // XXX -> wconn down on err ?

    // lock wconn._atMu.W . This excludes everything else, and in
    // particular _pinner_, from running and mutating files and mappings.
    //
    // NOTE we'll relock atMu as R in the second part of resync, so we prelock
    // wconn._mu.R as well while under atMu.W, to be sure that the set of opened
    // files stays the same during the whole resync.
    bool atMuWLocked = true;
    wconn._atMu.Lock();
    wconn._mu.RLock();
    defer([&]() {
        wconn._mu.RUnlock();
        if (atMuWLocked)
            wconn._atMu.Unlock();
        else
            wconn._atMu.RUnlock();
    });

    err = wconn._downErr;
    if (err != nil)
        return E(err);

    bool retok = false;
    defer([&]() {
        if (!retok)
            //panic("TODO: bring wconn + fileh + mmaps down on error");  // XXX
            fprintf(stderr, "\n\nTODO: bring wconn + fileh + mmaps down on error\n\n\n");
    });

    // set new wconn.at early, so that e.g. Conn.open running simultaneously
    // to the second part of resync (see below) uses the new at.
    // XXX no need since wconn._mu is locked? -> no - it is *needed* after wconn.mu became RWMutex
    wconn.at = at;

    // go through all files opened under wconn and pre-adjust their mappings
    // for viewing data as of the new @at state.
    //
    // We are still holding atMu.W, so we are the only mutators of mappings,
    // because, in particular, the pinner is not running.
    //
    // Don't send watch updates for opened files to wcfs yet - without a running
    // pinner those updates would get stuck.
    for (auto fit : wconn._filehTab) {
        //zodb::Oid  foid = fit.first;
        FileH        f    = fit.second;

        // TODO if file has no mappings and was not used during whole prev
        // cycle - forget and stop watching it

        // XXX not yet ready f

        // update f._headfsize and remmap to head/f zero regions that are now covered by head/f
        struct stat st;
        err = f->_headf->stat(&st);
        if (err != nil)
            return E(err);

        if ((size_t)st.st_blksize != f->blksize)    // blksize must not change
            return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize));
        auto headfsize = st.st_size;
        if (!(f->_headfsize <= headfsize))          // head/file size ↑=
            return E(fmt::errorf("wcfs bug: head/file size not ↑="));
        if (!(headfsize % f->blksize == 0))
            return E(fmt::errorf("wcfs bug: head/file size %% blksize != 0"));

        // replace zero regions in f mappings in accordance with the adjusted f._headfsize.
        // NOTE it is ok to access f._mmaps without locking f._mu because we hold wconn.atMu.W
        for (auto mmap : f->_mmaps) {
            //printf("  resync -> %s: unzero [%lu:%lu)", v(at), f->_headfsize/f->blksize, headfsize/f->blksize);
            uint8_t *mem_unzero_start = min(mmap->mem_stop,
                        mmap->mem_start + (f->_headfsize - mmap->blk_start*f->blksize));
            uint8_t *mem_unzero_stop  = min(mmap->mem_stop,
                        mmap->mem_start + (  headfsize   - mmap->blk_start*f->blksize));

            if (mem_unzero_stop - mem_unzero_start > 0) {
                err = mmap_into_ro(mem_unzero_start, mem_unzero_stop-mem_unzero_start, f->_headf, f->_headfsize);
                if (err != nil)
                    return E(err);
            }
        }

        f->_headfsize = headfsize;
    }

    // atomically downgrade atMu.W to atMu.R before issuing watch updates to wcfs.
    // - we need atMu to be not Wlocked, because under atMu.W the pinner cannot run simultaneously to us.
    // - we need to hold atMu.R to avoid a race wrt e.g. another resync which changes at.
    // - we cannot just do regular `atMu.Unlock + atMu.RLock()` because then
    //   there is e.g. a race window in between Unlock and RLock where wconn.at can be changed.
    //   XXX also deadlock, because it will become wconn._mu.lock + wconn._atMu lock
    //
    // Now other calls, e.g. Conn.open, can be running simultaneously to us,
    // but since we already set wconn.at to the new value it is ok. For example
    // Conn.open, for a not-yet-opened file, will use the new at to send "watch".
    // XXX ^^^ not possible since wconn._mu is locked ?
    //     -> no, possible, wconn._mu became RWMutex
    //
    // XXX we are still holding wconn._mu.R, so wconn._filehTab is the
    // same as in the previous pass above.
    wconn._atMu.UnlockToRLock();
    atMuWLocked = false;

    // send watch updates to wcfs.
    // the pinner is now running and will be able to serve pin requests triggered by our watch.
    for (auto fit : wconn._filehTab) {
        zodb::Oid  foid = fit.first;
        //FileH      f    = fit.second;

        // XXX need to lock f.mu because wconn.atMu is only R now.
        // XXX need to coordinate with e.g. FileH.close -> "if f.state != CLOSING" ?
        // XXX need to coordinate with e.g. Conn.open   -> "if f.state != OPENING" ?

        string ack;
        // XXX f._watchMu.lock() + unlock()
        // XXX + recheck status before sending the watch?
        tie(ack, err) = wconn._wlink->sendReq(context::background(),
                            fmt::sprintf("watch %s @%s", v(foid), v(at)));
        if (err != nil)
            return E(err);
        if (ack != "ok") {
            return E(fmt::errorf("%s", v(ack)));
        }
    }

    // wconn.at = at;
    retok = true;
    return nil;
}
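
// A minimal illustration (hypothetical caller code, not the actual
// bigfile/_file_zodb.pyx logic) of how resync is meant to be used on a
// transaction boundary:
//
//      zodb::Tid at_new = ...;             // tid of the view the zconn was resynced to
//      error err = wconn->resync(at_new);  // adjust wconn + its mappings to @at_new
//      if (err != nil)
//          ...;                            // wconn is no longer suitable for the new view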
// open opens FileH corresponding to ZBigFile foid.
pair<FileH, error> _Conn::open(zodb::Oid foid) {
    _Conn& wconn = *this;
    error err;

    wconn._atMu.RLock();
    defer([&]() {
        wconn._atMu.RUnlock();
    });
    xerr::Contextf E("%s: open f<%s>", v(wconn), v(foid));

    wconn._mu.Lock();
    if (wconn._downErr != nil) {
        err = wconn._downErr;
        wconn._mu.Unlock();
        return make_pair(nil, E(err));
    }

    FileH f;
    bool  ok;
    tie(f, ok) = wconn._filehTab.get_(foid);
    if (ok) {
        f->_nopen++;    // XXX lock by f.mu ?
        wconn._mu.Unlock();
        // wait for: "opening" -> opened
        f->_openReady.recv();
        if (f->_openErr != nil) {
            // don't care about f->_nopen-- since f is not returned anywhere
            return make_pair(nil, E(f->_openErr));
        }

        return make_pair(f, nil);
    }

    // create "opening" FileH entry and perform open with wconn._mu released
    // NOTE wconn._atMu.R is still held because FileH._open relies on wconn.at being stable.
    f = adoptref(new _FileH());
    f->wconn      = newref(&wconn);
    f->foid       = foid;
    f->_openReady = makechan<structZ>();
    f->_openErr   = nil;
    f->_headf     = nil;
    f->blksize    = 0;
    f->_headfsize = 0;
    f->_nopen     = 1;
    f->_closed    = false;

    bool retok = false;
    wconn._filehTab[foid] = f;
    defer([&]() {
        if (!retok) {
            wconn._mu.Lock();
            // don't care about f->_nopen-- since f is not returned anywhere
            if (wconn._filehTab.get(foid) != f) {
                wconn._mu.Unlock();
                panic("BUG: wconn.open: wconn.filehTab[foid] mutated while file open was in progress");
            }
            wconn._filehTab.erase(foid);
            wconn._mu.Unlock();
        }
        f->_openReady.close();
    });
    wconn._mu.Unlock();

    f->_openErr = f->_open();
    if (f->_openErr != nil)
        return make_pair(nil, E(f->_openErr));

    retok = true;
    return make_pair(f, nil);
}

// _open performs actual open of FileH marked as "in-flight-open" in wconn.filehTab.
//
// Called with:
// - wconn.atMu held
// - wconn.mu   not locked
// - f.mu       not locked
error _FileH::_open() {
    _FileH* f = this;
    Conn    wconn = f->wconn;
    error   err;

    tie(f->_headf, err) = wconn->_wc->_open(fmt::sprintf("head/bigfile/%s", v(foid)));
    if (err != nil)
        return err;

    bool retok = false;
    defer([&]() {
        if (!retok)
            f->_headf->close();
    });

    struct stat st;
    err = f->_headf->stat(&st);
    if (err != nil)
        return err;
    f->blksize    = st.st_blksize;
    f->_headfsize = st.st_size;
    if (!(f->_headfsize % f->blksize == 0))
        return fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0",
                           v(f->_headf->name()), f->_headfsize, f->blksize);

    // start watching f
    // NOTE we are _not_ holding wconn.mu nor f.mu - only wconn.atMu to rely on wconn.at being stable.
    // NOTE wcfs will reply "ok" only after wcfs/head/at ≥ wconn.at
    string ack;
    // XXX f._watchMu.lock() + unlock ?
    // XXX + recheck status before sending the watch?
    tie(ack, err) = wconn->_wlink->sendReq(context::background(),
                        fmt::sprintf("watch %s @%s", v(foid), v(wconn->at)));
    if (err != nil)
        return err;
    if (ack != "ok")
        return fmt::errorf("watch: %s", v(ack));

    retok = true;
    return nil;
}

// close releases resources associated with FileH.
//
// Left fileh mappings become invalid to use except unmap.
error _FileH::close() {
    _FileH& fileh = *this;
    Conn    wconn = fileh.wconn;

    // XXX locking ok?
    wconn->_atMu.RLock();
    fileh._mu.lock();   // XXX for fileh ._nopen, ._closed  -> better use wconn._mu for those too?
    defer([&]() {
        fileh._mu.unlock();
        wconn->_atMu.RUnlock();
    });

    // fileh.close can be called several times. just return nil for second close.
    if (fileh._closed)
        return nil;

    // decref open count; do real close only when last open goes away.
    if (fileh._nopen <= 0)
        panic("BUG: fileh.close: fileh._nopen <= 0");
    fileh._nopen--;
    if (fileh._nopen > 0)
        return nil;

    // last open went away - real close.
xerr::Contextf E("%s: close f<%s>", v(wconn), v(fileh.foid)); error err, eret; auto reterr1 = [&eret](error err) { if (eret == nil && err != nil) eret = err; }; // XXX change all fileh.mmaps to cause EFAULT on any access after fileh.close // stop watching f XXX ok under f.mu ? string ack; // XXX f._watchMu.lock() + unlock // XXX + recheck status before sending the watch? tie(ack, err) = wconn->_wlink->sendReq(context::background(), fmt::sprintf("watch %s -", v(foid))); if (err != nil) reterr1(err); else if (ack != "ok") reterr1(fmt::errorf("unwatch: %s", v(ack))); // remove fileh from wconn._filehTab wconn->_mu.Lock(); // FIXME lock order vs fileh._mu if (wconn->_filehTab.get(fileh.foid)._ptr() != &fileh) panic("BUG: fileh.close: wconn.filehTab[fileh.foid] != fileh"); wconn->_filehTab.erase(fileh.foid); wconn->_mu.Unlock(); reterr1(fileh._headf->close()); fileh._closed = true; return E(eret); } // mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state. // // If vma != nil, created mapping is associated with that vma of user-space virtual memory manager: // virtmem calls FileH::mmap under virtmem lock when virtmem fileh is mmapped into vma. pair<Mapping, error> _FileH::mmap(int64_t blk_start, int64_t blk_len, VMA *vma) { _FileH& f = *this; // NOTE virtmem lock is held by virtmem caller // XXX locking ok? f.wconn->_atMu.RLock(); // e.g. f._headfsize f._mu.lock(); // f._pinned, f._mmaps defer([&]() { f._mu.unlock(); f.wconn->_atMu.RUnlock(); }); xerr::Contextf E("%s: mmap f<%s> [blk%ld +blk%ld)", v(f.wconn), v(f.foid), blk_start, blk_len); if (f._closed) // XXX locking return make_pair(nil, E(os::ErrClosed)); error err; if (blk_start < 0) panic("blk_start < 0"); if (blk_len < 0) panic("blk_len < 0"); int64_t blk_stop; // = blk_start + blk_len if (__builtin_add_overflow(blk_start, blk_len, &blk_stop)) panic("blk_start + blk_len overflow int64"); int64_t stop;// = blk_stop *f.blksize; if (__builtin_mul_overflow(blk_stop, f.blksize, &stop)) panic("(blk_start + blk_len)*f.blksize overflow int64"); int64_t start = blk_start*f.blksize; // create memory with head/f mapping and applied pins // mmap-in zeros after f.size (else access to memory after file.size will raise SIGBUS) uint8_t *mem_start, *mem_stop; tie(mem_start, err) = mmap_ro(f._headf, start, blk_len*f.blksize); if (err != nil) return make_pair(nil, E(err)); mem_stop = mem_start + blk_len*f.blksize; bool retok = false; defer([&]() { if (!retok) mm::unmap(mem_start, mem_stop - mem_start); // ignore error }); if (stop > f._headfsize) { uint8_t *zmem_start = mem_start + (max(f._headfsize/*XXX -1 ?*/, start) - start); err = mmap_zero_into_ro(zmem_start, mem_stop - zmem_start); if (err != nil) return make_pair(nil, E(err)); } Mapping mmap = adoptref(new _Mapping()); mmap->fileh = newref(&f); mmap->blk_start = blk_start; mmap->mem_start = mem_start; mmap->mem_stop = mem_stop; mmap->vma = vma; for (auto _ : f._pinned) { // TODO keep f._pinned ↑blk and use binary search int64_t blk = _.first; zodb::Tid rev = _.second; if (!(blk_start <= blk && blk < blk_stop)) continue; // blk ∉ this mapping err = mmap->_remmapblk(blk, rev); if (err != nil) return make_pair(nil, E(err)); } if (vma != nil) { if (vma->mmap_overlay_server != nil) panic("vma is already associated with overlay server"); if (!(vma->addr_start == 0 && vma->addr_stop == 0)) panic("vma already covers !nil virtual memory area"); mmap->incref(); // vma->mmap_overlay_server is keeping ref to mmap vma->mmap_overlay_server = mmap._ptr(); 
        vma->addr_start = (uintptr_t)mmap->mem_start;
        vma->addr_stop  = (uintptr_t)mmap->mem_stop;
        mmap->_assertVMAOk();   // just in case
    }

    f._mmaps.push_back(mmap);   // TODO keep f._mmaps ↑blk_start

    retok = true;
    return make_pair(mmap, nil);
}

// unmap releases mapping memory from address space.
//
// After call to unmap the mapping must no longer be used.
// The association in between mapping and linked virtmem VMA is reset.
//
// Virtmem calls Mapping.unmap under virtmem lock when VMA is unmapped.
error _Mapping::unmap() {
    Mapping mmap = newref(this);    // newref for std::remove
    FileH f = mmap->fileh;

    // NOTE virtmem lock is held by virtmem caller

    // XXX locking ok?
    f->wconn->_atMu.RLock();
    f->_mu.lock();  // f._mmaps
    defer([&]() {
        f->_mu.unlock();
        f->wconn->_atMu.RUnlock();
    });

    xerr::Contextf E("%s: f<%s>: unmap", v(f->wconn), v(f->foid));

    if (mmap->vma != nil) {
        mmap->_assertVMAOk();
        VMA *vma = mmap->vma;
        vma->mmap_overlay_server = nil;
        mmap->decref();     // vma->mmap_overlay_server was holding a ref to mmap
        vma->addr_start = 0;
        vma->addr_stop  = 0;
        mmap->vma = nil;
    }

    error err = mm::unmap(mmap->mem_start, mmap->mem_stop - mmap->mem_start);
    mmap->mem_start = nil;
    mmap->mem_stop  = nil;
    // XXX clear other fields?

    // XXX do it first? (to avoid pinner going through f.mmaps and hitting unmapped memory)
    //     -> no need: both pinner and unmap lock on f.mu
    //f->_mmaps.remove(mmap);
    f->_mmaps.erase(
        std::remove(f->_mmaps.begin(), f->_mmaps.end(), mmap),
        f->_mmaps.end());

    return E(err);
}

// _remmapblk remmaps mapping memory for file[blk] to be viewing database as of @at state.
//
// at=TidHead means unpin to head/ .
// NOTE this does not check whether virtmem already mapped blk as RW.
//
// The following locks must be held by caller:
// - f.wconn.atMu
// - f._mu
error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) {
    _Mapping *mmap = this;
    FileH f = mmap->fileh;
    xerr::Contextf E("%s: f<%s>: remmapblk #%ld @%s", v(f->wconn), v(f->foid), blk, v(at));

    ASSERT(mmap->blk_start <= blk && blk < mmap->blk_stop());
    error err;

    uint8_t *blkmem = mmap->mem_start + (blk - mmap->blk_start)*f->blksize;
    os::File fsfile;
    bool fclose = false;
    if (at == TidHead) {
        fsfile = f->_headf;
    }
    else {
        // TODO share @rev fd until wconn is resynced?
        tie(fsfile, err) = f->wconn->_wc->_open(
                fmt::sprintf("@%s/bigfile/%s", v(at), v(f->foid)));
        if (err != nil)
            return E(err);
        fclose = true;
    }
    defer([&]() {
        if (fclose)
            fsfile->close();
    });

    struct stat st;
    err = fsfile->stat(&st);
    if (err != nil)
        return E(err);
    if ((size_t)st.st_blksize != f->blksize)
        return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize));

    // block is beyond file size - mmap with zeros - else access to memory
    // after file.size will raise SIGBUS. (assumes head/f size ↑=)
    if ((blk+1)*f->blksize > (size_t)st.st_size) {
        err = mmap_zero_into_ro(blkmem, 1*f->blksize);
        if (err != nil)
            return E(err);
    }
    // block is inside file - mmap in file data
    else {
        err = mmap_into_ro(blkmem, 1*f->blksize, fsfile, blk*f->blksize);
        if (err != nil)
            return E(err);
    }

    return nil;
}

// remmap_blk remmaps file[blk] in its place again.
//
// Virtmem calls Mapping.remmap_blk under virtmem lock to remmap a block after
// RW dirty page was e.g. discarded.
error _Mapping::remmap_blk(int64_t blk) {
    _Mapping& mmap = *this;
    FileH f = mmap.fileh;

    // NOTE virtmem lock is held by virtmem caller
    // XXX locking ok?
    f->wconn->_atMu.RLock();
    f->_mu.lock();
    defer([&]() {
        f->_mu.unlock();
        f->wconn->_atMu.RUnlock();
    });

    if (!(mmap.blk_start <= blk && blk < mmap.blk_stop()))
        panic("remmap_blk: blk out of Mapping range");

    // blkrev = rev | @head
    zodb::Tid blkrev;
    bool ok;
    tie(blkrev, ok) = f->_pinned.get_(blk);
    if (!ok)
        blkrev = TidHead;

    error err = mmap._remmapblk(blk, blkrev);
    if (err != nil)
        return err;     // errctx is good in _remmapblk

    return nil;
}


// ---- WCFS raw file access ----

// _path returns path for object on wcfs.
// - str:       wcfs root + obj;
string WCFS::_path(const string &obj) {
    WCFS *wc = this;
    return wc->mountpoint + "/" + obj;
}

tuple<os::File, error> WCFS::_open(const string &path, int flags) {
    WCFS *wc = this;
    string path_ = wc->_path(path);
    return os::open(path_, flags);
}


// ---- misc ----

// mmap_zero_into_ro mmaps read-only zeros into [addr +size) so that region is all zeros.
// created mapping, even after it is accessed, does not consume memory.
static error mmap_zero_into_ro(void *addr, size_t size) {
    xerr::Contextf E("mmap zero");

    // mmap /dev/zero with MAP_NORESERVE and MAP_SHARED
    // this way the mapping will be able to be read, but no memory will be allocated to keep it.
    os::File z;
    error err;
    tie(z, err) = os::open("/dev/zero");
    if (err != nil)
        return E(err);
    defer([&]() {
        z->close();
    });
    err = mm::map_into(addr, size, PROT_READ, MAP_SHARED | MAP_NORESERVE, z, 0);
    if (err != nil)
        return E(err);
    return nil;
}

// mmap_ro mmaps read-only fd[offset +size).
// The mapping is created with MAP_SHARED.
static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size) {
    return mm::map(PROT_READ, MAP_SHARED, f, offset, size);
}

// mmap_into_ro mmaps read-only fd[offset +size) into [addr +size).
// The mapping is created with MAP_SHARED.
static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset) {
    return mm::map_into(addr, size, PROT_READ, MAP_SHARED, f, offset);
}


// _assertVMAOk() verifies that mmap and vma are related to each other and cover
// exactly the same virtual memory range.
//
// It panics if mmap and vma do not exactly relate to each other or cover
// different virtual memory range.
void _Mapping::_assertVMAOk() {
    _Mapping* mmap = this;
    VMA *vma = mmap->vma;

    if (!(vma->mmap_overlay_server == static_cast<void*>(mmap)))
        panic("BUG: mmap and vma do not link to each other");
    if (!(vma->addr_start == uintptr_t(mmap->mem_start) &&
          vma->addr_stop  == uintptr_t(mmap->mem_stop)))
        panic("BUG: mmap and vma cover different virtual memory ranges");

    // verified ok
}


string WCFS::String() const {
    const WCFS& wc = *this;
    return fmt::sprintf("wcfs %s", v(wc.mountpoint));
}

// NOTE String must be called with Conn.atMu locked.
string _Conn::String() const {
    const _Conn& wconn = *this;
    // XXX don't include wcfs as prefix here?
    //     (e.g. to use Conn.String in tracing without wcfs prefix)
    //     (if yes -> go and correct all xerr::Contextf calls)
    return fmt::sprintf("%s: conn%d @%s", v(wconn._wc), wconn._wlink->fd(), v(wconn.at));
}

_Conn::_Conn()  {}
_Conn::~_Conn() {}
void _Conn::decref() {
    if (__decref())
        delete this;
}

_FileH::_FileH()  {}
_FileH::~_FileH() {}
void _FileH::decref() {
    if (__decref())
        delete this;
}

_Mapping::_Mapping()  {}
_Mapping::~_Mapping() {}
void _Mapping::decref() {
    if (__decref())
        delete this;
}

dict<int64_t, zodb::Tid> _tfileh_pinned(FileH fileh) {
    return fileh->_pinned;
}

}   // wcfs::
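
// Appendix: illustrative usage sketch (not part of the build; mountpoint,
// at0/at1, foid and vma are hypothetical placeholders). It shows the typical
// call sequence a user of this client goes through, based on the API above:
//
//      wcfs::WCFS wc;                                       // wc.mountpoint set to the WCFS mountpoint
//
//      wcfs::Conn wconn; error err;
//      tie(wconn, err) = wc.connect(at0);                   // Conn viewing the database @at0
//
//      wcfs::FileH f;
//      tie(f, err) = wconn->open(foid);                     // FileH for ZBigFile foid
//
//      wcfs::Mapping m;
//      tie(m, err) = f->mmap(blk_start, blk_len, vma);      // memory for f[blk_start +blk_len)
//
//      ...                                                  // virtmem accesses/dirties the memory
//
//      err = wconn->resync(at1);                            // on transaction boundary
//
//      err = m->unmap();
//      err = f->close();
//      err = wconn->close();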