Commit 10f7153a authored by Kirill Smelkov

wcfs: client: Provide client package to care about isolation protocol details

This patch follows up on the previous patch, which added the server-side
part of isolation protocol handling, and adds a client package that takes
care of WCFS isolation protocol details and provides clients a simple
interface to isolated views of bigfile data on WCFS, similar to regular
files: given a particular revision of database @at, it provides synthetic
read-only bigfile memory mappings with data corresponding to @at state,
while using /head/bigfile/* most of the time to build and maintain the
mappings.

The patch is organized as follows:

- wcfs.h and wcfs.cpp bring in usage documentation, an internal overview and
  the main part of the implementation.

- wcfs/client/client_test.py provides the tests.

- The rest of the changes in wcfs/client/ support the implementation and tests.

Quoting package documentation for reference:

---- 8< ----

Package wcfs provides WCFS client.

This client package takes care of WCFS isolation protocol details and
provides clients a simple interface to isolated views of bigfile data on
WCFS, similar to regular files: given a particular revision of database @at,
it provides synthetic read-only bigfile memory mappings with data
corresponding to @at state, while using /head/bigfile/* most of the time to
build and maintain the mappings.

For its data, a mapping to bigfile X mostly reuses the kernel cache for
/head/bigfile/X, with the amount of data not associated with the kernel
cache for /head/bigfile/X being proportional to δ(bigfile/X, at..head). In
the usual case, where many client workers simultaneously serve requests,
their database views are slightly outdated but close to head, which means
that in practice the kernel cache for /head/bigfile/* is used almost 100%
of the time.

A mapping for bigfile X@at is built from OS-level memory mappings of
on-WCFS files as follows:

                                          ___        /@revA/bigfile/X
        __                                           /@revB/bigfile/X
               _                                     /@revC/bigfile/X
                           +                         ...
     ───  ───── ──────────────────────────   ─────   /head/bigfile/X

where @revR mmaps are dynamically added/removed by this client package to
maintain the X@at data view according to the WCFS isolation protocol(*).

API overview

 - `WCFS` represents a filesystem-level connection to the wcfs server.
 - `Conn` represents a logical connection that provides a view of data on
   the wcfs filesystem as of a particular database state.
 - `FileH` represents an isolated file view under Conn.
 - `Mapping` represents one memory mapping of a FileH.

A path from WCFS to Mapping is as follows:

 WCFS.connect(at)                    -> Conn
 Conn.open(foid)                     -> FileH
 FileH.mmap([blk_start +blk_len))    -> Mapping

A connection can be resynced to another database view via Conn.resync(at').

Documentation for the classes provides a more thorough overview and API
details.

--------

(*) see wcfs.go documentation for WCFS isolation protocol overview and details.

.
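For orientation, here is a minimal usage sketch of that path in C++ (a
sketch only: error checks are elided, and `at`, `at2` and `foid` stand for
caller-provided values):

     wcfs::WCFS wc;                        // filesystem-level connection
     Conn wconn; FileH fh; Mapping m; error err;
     tie(wconn, err) = wc.connect(at);     // logical connection viewing db @at
     tie(fh, err)    = wconn->open(foid);  // isolated view of bigfile foid
     tie(m, err)     = fh->mmap(0, 4);     // mmap file[0 +4) blocks
     // data as of @at is now readable in [m->mem_start, m->mem_stop)
     err = wconn->resync(at2);             // switch the view to @at2
     err = m->unmap(); err = fh->close(); err = wconn->close();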

Wcfs client organization
~~~~~~~~~~~~~~~~~~~~~~~~

Wcfs client provides its users isolated bigfile views backed by data on
the WCFS filesystem. In the absence of the Isolation property, wcfs client
would reduce to just directly using the OS-level file wcfs/head/f for a
bigfile f. On the other hand there is a simple, but inefficient, way to
support isolation: for the @at database view of bigfile f, directly use the
OS-level file wcfs/@at/f. The latter works, but is very inefficient because
the OS cache for f's data is not shared between two connections with @at1
and @at2 views. The cache is also lost when the connection's view of the
database is resynced on a transaction boundary. To support isolation
efficiently, wcfs client uses wcfs/head/f most of the time, but injects
wcfs/@revX/f parts into mappings to maintain the f@at view, driven by pin
messages that the wcfs server sends to the client in accordance with the
WCFS isolation protocol(*).
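The injection itself boils down to remapping one block of an already
established mapping from a wcfs/@revX/f file over the same address range.
Roughly, in terms of the underlying POSIX call (the patch performs this in
Mapping._remmapblk; fd_rev, mem_start, blk and blksize are placeholders
here, and errno_as_error is a hypothetical error conversion):

     // remap block blk to its state as of @revX, while the rest of the
     // mapping continues to read from wcfs/head/f. MAP_FIXED replaces
     // pages in [addr, addr+blksize) atomically.
     uint8_t *addr = mem_start + blk*blksize;
     void *p = mmap(addr, blksize, PROT_READ, MAP_SHARED | MAP_FIXED,
                    fd_rev, blk*blksize);
     if (p == MAP_FAILED)
         return errno_as_error();    // hypothetical helper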

The wcfs server sends pin messages synchronously, triggered by access to
mmaped memory. That means a client thread that is accessing a wcfs/head/f
mmap is completely blocked while the wcfs server sends pins and waits to
receive acks from all clients. In other words, on-client handling of pins
has to be done in a separate thread, because the wcfs server can also send
pins to the client that triggered the access.

Wcfs client implements pin handling in a so-called "pinner" thread(+). The
pinner thread receives pin requests from the wcfs server via a watchlink
handle opened through wcfs/head/watch. For every pin request the pinner
finds the corresponding Mappings and injects wcfs/@revX/f parts via
Mapping._remmapblk appropriately.

The same watchlink handle is used to send client-originated requests to the
wcfs server. The requests are sent to tell wcfs that the client wants to
observe a particular bigfile as of a particular revision, or to stop
watching it. Such requests originate from regular client threads - not the
pinner - via entry points like Conn.open, Conn.resync and FileH.close.
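Schematically the watchlink exchange looks like the following (message
texts as used in this patch; C = client, S = wcfs server; see wcfs.go for
the exact framing):

     C: watch <foid> @<at>          ; Conn.open/Conn.resync - (re)setup watch
     S: pin <foid> #<blk> @<rev>    ; wcfs asks to pin blk (handled by pinner)
     C: ack                         ; pinner reply; "nak: <err>" on error
     S: ok                          ; watch request accepted
     C: watch <foid> -              ; FileH.close - stop watching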

Every FileH maintains fileh._pinned {} with the currently pinned blk -> rev.
This dict is updated by the pinner, driven by pin messages, and is used when
a new fileh Mapping is created (FileH.mmap).
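For example, when a new Mapping is created, blocks that are already pinned
must be immediately mapped to their pinned revisions instead of head. A
sketch of that step (assuming _remmapblk as described above; error handling
elided):

     // FileH.mmap: after mapping file[blk_start +blk_len) from head/f,
     // remap the blocks that are currently pinned to their @rev data.
     for (auto& it : f->_pinned) {
         int64_t blk = it.first;  zodb::Tid rev = it.second;
         if (blk_start <= blk && blk < blk_stop)
             mmap->_remmapblk(blk, rev);
     }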

In wendelin.core a bigfile has the semantic that it is infinite in size and
reads as all zeros beyond the region initialized with data. Memory-mapping
of OS-level files can also go beyond file size; however, accessing memory
corresponding to a file region after file.size triggers SIGBUS. To preserve
the wendelin.core semantic, wcfs client mmaps-in zeros for Mapping regions
after wcfs/head/f.size. For simplicity it is assumed that bigfiles only grow
and never shrink. This is indeed currently so, but will have to be revisited
if/when wendelin.core adds bigfile truncation. Wcfs client restats
wcfs/head/f at every transaction boundary (Conn.resync) and remembers f.size
in FileH._headfsize for use during one transaction(%).
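Mapping-in zeros past head/f.size amounts to overlaying the tail of the
mapping with anonymous read-only pages. A sketch of the underlying call
(the patch wraps this into a helper; addr/size denote the part of the
Mapping past _headfsize):

     // anonymous pages read as all zeros; MAP_FIXED replaces whatever
     // was mapped at [addr, addr+size) before.
     void *p = mmap(addr, size, PROT_READ,
                    MAP_PRIVATE | MAP_ANONYMOUS | MAP_FIXED, -1, 0);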

--------

(*) see wcfs.go documentation for WCFS isolation protocol overview and details.
(+) currently, for simplicity, there is one pinner thread for each connection.
    In the future, for efficiency, it might be reworked to be one pinner thread
    that serves all connections simultaneously.
(%) see _headWait comments on how this has to be reworked.

Wcfs client locking organization

Wcfs client needs to synchronize regular user threads vs each other and vs
the pinner. A major lock, Conn.atMu, protects changes to Conn's view of the
database. Whenever atMu.W is taken, Conn.at is changing (Conn.resync);
conversely, whenever atMu.R is taken, Conn.at is stable (roughly speaking,
Conn.resync is not running).

Similarly to wcfs.go(*), several locks that protect internal data structures
are minor to Conn.atMu - they need to be taken only under atMu.R (to
synchronize e.g. multiple fileh opens running simultaneously), but do not
need to be taken at all if atMu.W is taken. In data structures such locks
are noted as follows:

     sync::Mutex xMu;    // atMu.W  |  atMu.R + xMu

After atMu, Conn.filehMu protects the registry of opened file handles
(Conn._filehTab), and FileH.mmapMu protects the registry of created Mappings
(FileH.mmaps) and FileH.pinned.

Several locks are RWMutex instead of just Mutex not only to allow more
concurrency but, in the first place, for correctness: the pinner thread,
being the core element in handling the WCFS isolation protocol, is
effectively invoked synchronously from other threads via messages coming
through the wcfs server. For example, Conn.resync sends a watch request to
the wcfs server and waits for the answer. The wcfs server, in turn, might
send corresponding pin messages to the pinner and _wait_ for the answer
before answering to resync:

       - - - - - -
      |       .···|·····.        ---->   = request
         pinner <------.↓        <····   = response
      |           |   wcfs
         resync -------^↓
      |      `····|·····
       - - - - - -
      client process

This creates the necessity to use RWMutex for locks that the pinner and
other parts of the code could be using at the same time in synchronous
scenarios similar to the above. These locks are:

     - Conn.atMu
     - Conn.filehMu

Note that FileH.mmapMu is a regular - not RW - mutex, since nothing in wcfs
client calls into the wcfs server via watchlink with mmapMu held.

The ordering of locks is:

     Conn.atMu > Conn.filehMu > FileH.mmapMu

The pinner takes the following locks:

     - wconn.atMu.R
     - wconn.filehMu.R
     - fileh.mmapMu (to read .mmaps  +  write .pinned)

(*) see "Wcfs locking organization" in wcfs.go

Handling of fork

When a process calls fork, the OS copies its memory and creates a child
process with only 1 thread. That child inherits file descriptors and memory
mappings from the parent. To correctly continue using Conn, FileH and
Mappings, the child would have to recreate the pinner thread and reconnect
to wcfs via a reopened watchlink. The reason is that without reconnection -
by using the watchlink file descriptor inherited from the parent - the child
would interfere in the parent-wcfs exchange and neither parent nor child
could continue normal protocol communication with WCFS.

For simplicity, since fork is seldom used for things besides followup exec,
wcfs client currently takes the straightforward approach of disabling
mappings and detaching from the WCFS server in the child right after fork.
This ensures that there is no interference in the parent-wcfs exchange
should the child decide not to exec and to continue running. Without this
protection the interference might come even automatically, via e.g.
Python GC -> PyFileH.__del__ -> FileH.close -> message to WCFS.
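The patch registers the connection for this via os::RegisterAfterFork
(libgolang). A sketch of the same idea in terms of the POSIX pthread_atfork
hook (afterForkInChild is a hypothetical name):

     #include <pthread.h>

     static void afterForkInChild() {
         // in child: close fileh abruptly (no "bye" to wcfs) and
         // turn all Mappings into EFAULT-ing memory.
     }

     // the child-side hook runs in the child right after every fork.
     pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr,
                    /*child=*/afterForkInChild);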

----------------------------------------

Some preliminary history:

kirr/wendelin.core@a8fa9178    X wcfs: move client tests into client/
kirr/wendelin.core@990afac1    X wcfs/client: Package overview (draft)
kirr/wendelin.core@3f83469c    X wcfs: client: Handle fork
kirr/wendelin.core@0ed6b8b6    fixup! X wcfs: client: Handle fork
kirr/wendelin.core@24378c46    X wcfs: client: Provide Conn.at()
parent 6f0cdaff
@@ -28,6 +28,27 @@ represents filesystem-level connection to joined wcfs server. If wcfs server
for zurl is not yet running, it will be automatically started if join is given
`autostart=True` option.
The rest of wcfs.py merely wraps C++ wcfs client package:
- `WCFS` represents a filesystem-level connection to the wcfs server.
- `Conn` represents a logical connection that provides a view of data on the wcfs
filesystem as of a particular database state.
- `FileH` represents an isolated file view under Conn.
- `Mapping` represents one memory mapping of a FileH.
A path from WCFS to Mapping is as follows:
WCFS.connect(at) -> Conn
Conn.open(foid) -> FileH
FileH.mmap([blk_start +blk_len)) -> Mapping
Classes in wcfs.py logically mirror classes in ZODB:
wcfs.WCFS <-> ZODB.DB
wcfs.Conn <-> ZODB.Connection
Please see wcfs/client/wcfs.h for more thorough overview and further details.
Environment variables
---------------------
@@ -80,6 +101,10 @@ class Server:
#
# Use join to create it.
#
# The primary way to access wcfs is to open a logical connection viewing on-wcfs
# data as of a particular database state, and use that logical connection to create
# base-layer mappings. See .connect and Conn in the C++ API for details.
#
# Raw files on wcfs can be accessed with ._path/._read/._stat/._open .
#
# WCFS logically mirrors ZODB.DB .
@@ -23,12 +23,13 @@
# Package _wcfs provides Python-wrappers for C++ wcfs client package.
#
# It wraps WCFS and WatchLink.
# It wraps WCFS/Conn/FileH/Mapping and WatchLink to help client_test.py unit-test
# WCFS base-layer mmap functionality.
from golang cimport chan, structZ, string, error, refptr
from golang cimport context
from golang cimport context, cxx
from libc.stdint cimport int64_t, uint64_t
from libc.stdint cimport int64_t, uint64_t, uint8_t
from libcpp.utility cimport pair
from libcpp.vector cimport vector
@@ -78,6 +79,53 @@ cdef extern from "wcfs/client/wcfs.h" namespace "wcfs" nogil:
string mountpoint
pair[WatchLink, error] _openwatch()
pair[Conn, error] connect(Tid at)
cppclass _Conn:
Tid at()
pair[FileH, error] open(Oid foid)
error close()
error resync(Tid at)
cppclass Conn (refptr[_Conn]):
# Conn.X = Conn->X in C++
Tid at "_ptr()->at" ()
pair[FileH, error] open "_ptr()->open" (Oid foid)
error close "_ptr()->close" ()
error resync "_ptr()->resync" (Tid at)
cppclass _FileH:
size_t blksize
error close()
pair[Mapping, error] mmap(int64_t blk_start, int64_t blk_len) # `VMA *vma=nil` not exposed
cppclass FileH (refptr[_FileH]):
# FileH.X = FileH->X in C++
size_t blksize "_ptr()->blksize"
error close "_ptr()->close" ()
pair[Mapping, error] mmap "_ptr()->mmap" (int64_t blk_start, int64_t blk_len)
cppclass _Mapping:
FileH fileh
int64_t blk_start
int64_t blk_stop() const
uint8_t *mem_start
uint8_t *mem_stop
error unmap()
cppclass Mapping (refptr[_Mapping]):
# Mapping.X = Mapping->X in C++
FileH fileh "_ptr()->fileh"
int64_t blk_start "_ptr()->blk_start"
int64_t blk_stop "_ptr()->blk_stop" () const
uint8_t *mem_start "_ptr()->mem_start"
uint8_t *mem_stop "_ptr()->mem_stop"
error unmap "_ptr()->unmap" ()
cxx.dict[int64_t, Tid] _tfileh_pinned(FileH wfileh)
# ---- python bits ----
@@ -85,6 +133,17 @@ cdef extern from "wcfs/client/wcfs.h" namespace "wcfs" nogil:
cdef class PyWCFS:
cdef WCFS wc
cdef class PyConn:
cdef Conn wconn
cdef readonly PyWCFS wc # PyWCFS that was used to create this PyConn
cdef class PyFileH:
cdef FileH wfileh
cdef class PyMapping:
cdef Mapping wmmap
cdef readonly PyFileH fileh
cdef class PyWatchLink:
cdef WatchLink wlink
@@ -28,7 +28,13 @@
from golang cimport pychan, pyerror, nil
from golang cimport io
from ZODB.utils import p64
cdef extern from *:
ctypedef bint cbool "bool"
from ZODB.utils import p64, u64
from cpython cimport PyBuffer_FillInfo
from libcpp.unordered_map cimport unordered_map
cdef class PyWCFS:
@@ -38,6 +44,130 @@ cdef class PyWCFS:
def __set__(PyWCFS pywc, string v):
pywc.wc.mountpoint = v
def connect(PyWCFS pywc, pyat): # -> PyConn
cdef Tid at = u64(pyat)
with nogil:
_ = wcfs_connect_pyexc(&pywc.wc, at)
wconn = _.first
err = _.second
if err != nil:
raise pyerr(err)
cdef PyConn pywconn = PyConn.__new__(PyConn)
pywconn.wconn = wconn
pywconn.wc = pywc
return pywconn
cdef class PyConn:
def __dealloc__(PyConn pywconn):
pywconn.wconn = nil
def at(PyConn pywconn):
with nogil:
at = wconn_at_pyexc(pywconn.wconn)
return p64(at)
def close(PyConn pywconn):
with nogil:
err = wconn_close_pyexc(pywconn.wconn)
if err != nil:
raise pyerr(err)
def open(PyConn pywconn, pyfoid): # -> FileH
cdef Oid foid = u64(pyfoid)
with nogil:
_ = wconn_open_pyexc(pywconn.wconn, foid)
wfileh = _.first
err = _.second
if err != nil:
raise pyerr(err)
cdef PyFileH pywfileh = PyFileH.__new__(PyFileH)
pywfileh.wfileh = wfileh
return pywfileh
def resync(PyConn pywconn, pyat):
cdef Tid at = u64(pyat)
with nogil:
err = wconn_resync_pyexc(pywconn.wconn, at)
if err != nil:
raise pyerr(err)
cdef class PyFileH:
def __dealloc__(PyFileH pywfileh):
pywfileh.wfileh = nil
def close(PyFileH pywfileh):
with nogil:
err = wfileh_close_pyexc(pywfileh.wfileh)
if err != nil:
raise pyerr(err)
def mmap(PyFileH pywfileh, int64_t blk_start, int64_t blk_len):
with nogil:
_ = wfileh_mmap_pyexc(pywfileh.wfileh, blk_start, blk_len)
wmmap = _.first
err = _.second
if err != nil:
raise pyerr(err)
assert wmmap.fileh .eq (pywfileh.wfileh)
cdef PyMapping pywmmap = PyMapping.__new__(PyMapping)
pywmmap.wmmap = wmmap
pywmmap.fileh = pywfileh
return pywmmap
property blksize:
def __get__(PyFileH pywfileh):
return pywfileh.wfileh.blksize
# XXX for tests
property pinned:
def __get__(PyFileH pywfileh):
# XXX cast: needed for cython to automatically convert to py dict
cdef dict p = <unordered_map[int64_t, Tid]> _tfileh_pinned(pywfileh.wfileh)
for blk in p:
p[blk] = p64(p[blk]) # rev(int64) -> rev(bytes)
return p
cdef class PyMapping:
def __dealloc__(PyMapping pywmmap):
# unmap just in case (double unmap is ok)
with nogil:
err = wmmap_unmap_pyexc(pywmmap.wmmap)
pywmmap.wmmap = nil
if err != nil:
raise pyerr(err)
property blk_start:
def __get__(PyMapping pywmmap):
return pywmmap.wmmap.blk_start
property blk_stop:
def __get__(PyMapping pywmmap):
return pywmmap.wmmap.blk_stop()
def __getbuffer__(PyMapping pywmmap, Py_buffer *view, int flags):
PyBuffer_FillInfo(view, pywmmap, pywmmap.wmmap.mem_start,
pywmmap.wmmap.mem_stop - pywmmap.wmmap.mem_start, readonly=1, flags=flags)
property mem:
def __get__(PyMapping pywmmap) -> memoryview:
return memoryview(pywmmap)
def unmap(PyMapping pywmmap):
with nogil:
err = wmmap_unmap_pyexc(pywmmap.wmmap)
if err != nil:
raise pyerr(err)
# ----------------------------------------
cdef class PyWatchLink:
@@ -153,6 +283,30 @@ cdef nogil:
pair[WatchLink, error] wcfs_openwatch_pyexc(WCFS *wcfs) except +topyexc:
return wcfs._openwatch()
pair[Conn, error] wcfs_connect_pyexc(WCFS *wcfs, Tid at) except +topyexc:
return wcfs.connect(at)
Tid wconn_at_pyexc(Conn wconn) except +topyexc:
return wconn.at()
error wconn_close_pyexc(Conn wconn) except +topyexc:
return wconn.close()
pair[FileH, error] wconn_open_pyexc(Conn wconn, Oid foid) except +topyexc:
return wconn.open(foid)
error wconn_resync_pyexc(Conn wconn, Tid at) except +topyexc:
return wconn.resync(at)
error wfileh_close_pyexc(FileH wfileh) except +topyexc:
return wfileh.close()
pair[Mapping, error] wfileh_mmap_pyexc(FileH wfileh, int64_t blk_start, int64_t blk_len) except +topyexc:
return wfileh.mmap(blk_start, blk_len)
error wmmap_unmap_pyexc(Mapping wmmap) except +topyexc:
return wmmap.unmap()
error wlink_close_pyexc(WatchLink wlink) except +topyexc:
return wlink.close()
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""client_test.py unit-tests virtmem layer provided by wcfs client.
WCFS filesystem itself is unit-tested by wcfs/wcfs_test.py .
"""
from __future__ import print_function, absolute_import
from golang import func, defer, error, b
from wendelin.bigfile.file_zodb import ZBigFile
from wendelin.wcfs.wcfs_test import tDB, tAt, timeout, eprint
from wendelin.wcfs import _waitfor_ as waitfor_
from wendelin.wcfs import wcfs_test
from wendelin.wcfs.internal.wcfs_test import read_mustfault
from wendelin.wcfs.internal import mm
from pytest import raises
import os, multiprocessing, gc
# so that e.g. testdb is set up + ...
def setup_module(): wcfs_test.setup_module()
def teardown_module(): wcfs_test.teardown_module()
def setup_function(f): wcfs_test.setup_function(f)
def teardown_function(f): wcfs_test.teardown_function(f)
# tMapping provides testing environment for Mapping.
class tMapping(object):
def __init__(t, tdb, mmap):
t.tdb = tdb
t.mmap = mmap
# assertBlk asserts that mmap[·] with · corresponding to blk reads as dataok.
# pinnedOK: {} blk -> rev of t.mmap.fileh.pinned after access.
#
# see also: tFile.assertBlk .
# NOTE contrary to tFile, pinnedOK represents full fh.pinned state, not
# only pins that wcfs sent to client after tested access.
def assertBlk(t, blk, dataok, pinnedOK):
assert t.mmap.blk_start <= blk < t.mmap.blk_stop
blk_inmmap = blk - t.mmap.blk_start
dataok = b(dataok)
fh = t.mmap.fileh
assert len(dataok) <= fh.blksize
dataok += b'\0'*(fh.blksize - len(dataok)) # trailing zeros
blkview = t.mmap.mem[blk_inmmap*fh.blksize:][:fh.blksize]
# NOTE access to memory goes _with_ GIL: this verifies that wcfs pinner
# is implemented in fully nogil mode because if that was not the case,
# the pinner would deadlock trying to acquire GIL in its thread while
# user thread that triggered the access is already holding the GIL.
#
# - - - - - -
# | |
# pinner <------.
# | | wcfs
# client -------^
# | |
# - - - - - -
# client process
#
_ = blkview[0]
assert _ == dataok[0]
assert blkview.tobytes() == dataok
assert fhpinned(t.tdb, fh) == pinnedOK
# assertBlkFaults asserts that mmap[·] with · corresponding to blk raises
# SIGSEGV on read access.
def assertBlkFaults(t, blk):
assert t.mmap.blk_start <= blk < t.mmap.blk_stop
blk_inmmap = blk - t.mmap.blk_start
fh = t.mmap.fileh
blkview = t.mmap.mem[blk_inmmap*fh.blksize:][:fh.blksize]
for i in range(0, len(blkview), mm.PAGE_SIZE):
read_mustfault(blkview[i:][:1])
# fhpinned(fh) returns fh.pinned with rev wrapped into tAt.
# XXX better wrap FileH into tFileH and do this automatically in .pinned ?
def fhpinned(t, fh):
p = fh.pinned.copy()
for blk in p:
p[blk] = tAt(t, p[blk])
return p
# test_wcfs_client unit-tests virtmem layer of wcfs client.
@func
def test_wcfs_client():
t = tDB(); zf = t.zfile; at0=t.at0
defer(t.close)
pinned = lambda fh: fhpinned(t, fh)
at1 = t.commit(zf, {2:'c1', 3:'d1'})
at2 = t.commit(zf, {2:'c2'})
wconn = t.wc.connect(at1)
defer(wconn.close)
fh = wconn.open(zf._p_oid)
defer(fh.close)
# create mmap with 1 block beyond file size
m1 = fh.mmap(2, 3)
defer(m1.unmap)
assert m1.blk_start == 2
assert m1.blk_stop == 5
assert len(m1.mem) == 3*zf.blksize
tm1 = tMapping(t, m1)
assert pinned(fh) == {}
# verify initial data reads
tm1.assertBlk(2, 'c1', {2:at1})
tm1.assertBlk(3, 'd1', {2:at1})
tm1.assertBlk(4, '', {2:at1})
# commit with growing file size -> verify data read as the same, #3 pinned.
# (#4 is not yet pinned because it was not accessed)
at3 = t.commit(zf, {3:'d3', 4:'e3'})
assert pinned(fh) == {2:at1}
tm1.assertBlk(2, 'c1', {2:at1})
tm1.assertBlk(3, 'd1', {2:at1, 3:at1})
tm1.assertBlk(4, '', {2:at1, 3:at1})
# resync at1 -> at2: #2 must unpin to @head; #4 must stay as zero
wconn.resync(at2)
assert pinned(fh) == {3:at1}
tm1.assertBlk(2, 'c2', { 3:at1})
tm1.assertBlk(3, 'd1', { 3:at1})
tm1.assertBlk(4, '', { 3:at1, 4:at0}) # XXX at0->ø ?
# resync at2 -> at3: #3 must unpin to @head; #4 - start to read with data
wconn.resync(at3)
assert pinned(fh) == {}
tm1.assertBlk(2, 'c2', {})
tm1.assertBlk(3, 'd3', {})
tm1.assertBlk(4, 'e3', {})
# mmap after .size completely (start > size)
m2 = fh.mmap(5, 2); defer(m2.unmap); tm2 = tMapping(t, m2)
tm2.assertBlk(5, '', {})
tm2.assertBlk(6, '', {})
# open same fh twice, close once - fh2 continues to work ok
fh2 = wconn.open(zf._p_oid)
defer(fh2.close)
mfh2 = fh2.mmap(2, 3); defer(mfh2.unmap); tmfh2 = tMapping(t, mfh2)
tm1.assertBlk(2, 'c2', {}); tmfh2.assertBlk(2, 'c2', {})
tm1.assertBlk(3, 'd3', {}); tmfh2.assertBlk(3, 'd3', {})
tm1.assertBlk(4, 'e3', {}); tmfh2.assertBlk(4, 'e3', {})
fh2.close()
tm1.assertBlk(2, 'c2', {}); tmfh2.assertBlk(2, 'c2', {})
tm1.assertBlk(3, 'd3', {}); tmfh2.assertBlk(3, 'd3', {})
tm1.assertBlk(4, 'e3', {}); tmfh2.assertBlk(4, 'e3', {})
m3 = fh.mmap(2, 1); defer(m3.unmap); tm3 = tMapping(t, m3)
tm3.assertBlk(2, 'c2', {})
# resync ↓ -> "forbidden" (reject is from server) -> wconn is down.
with raises(error, match=": going back in history is forbidden"): wconn.resync(at2)
with raises(error, match=".*: connection closed"): wconn.open(zf._p_oid)
# verify that on Conn/FileH down/closed -> Mappings switch to EFAULT on access.
@func
def test_wcfs_client_down_efault():
t = tDB(); zf1 = t.zfile; at0=t.at0
defer(t.close)
at1 = t.commit(zf1, {2:'c1', 3:'d1'})
zf2 = t.root['zfile2'] = ZBigFile(zf1.blksize)
at2 = t.commit()
at3 = t.commit(zf2, {1:'β3', 2:'γ3'})
wconn = t.wc.connect(at3)
defer(wconn.close)
fh1 = wconn.open(zf1._p_oid); defer(fh1.close)
fh2 = wconn.open(zf2._p_oid); defer(fh2.close)
m11 = fh1.mmap(1, 4); defer(m11.unmap); tm11 = tMapping(t, m11)
m12 = fh1.mmap(3, 3); defer(m12.unmap); tm12 = tMapping(t, m12)
m21 = fh2.mmap(0, 4); defer(m21.unmap); tm21 = tMapping(t, m21)
m22 = fh2.mmap(2, 3); defer(m22.unmap); tm22 = tMapping(t, m22)
# initially fh1 and fh2 mmaps read ok.
tm11.assertBlk(1, '', {})
tm11.assertBlk(2, 'c1', {})
tm11.assertBlk(3, 'd1', {}); tm12.assertBlk(3, 'd1', {})
tm11.assertBlk(4, '', {}); tm12.assertBlk(4, '', {})
pass; tm12.assertBlk(5, '', {})
tm21.assertBlk(0, '', {})
tm21.assertBlk(1, 'β3', {})
tm21.assertBlk(2, 'γ3', {}); tm22.assertBlk(2, 'γ3', {})
tm21.assertBlk(3, '', {}); tm22.assertBlk(3, '', {})
pass; tm22.assertBlk(4, '', {})
# close fh1 -> all fh1 mmaps must turn into efaulting memory; fh2 mmaps continue to work ok.
fh1.close()
tm11.assertBlkFaults(1)
tm11.assertBlkFaults(2)
tm11.assertBlkFaults(3); tm12.assertBlkFaults(3)
tm11.assertBlkFaults(4); tm12.assertBlkFaults(4)
pass; tm12.assertBlkFaults(5)
tm21.assertBlk(0, '', {})
tm21.assertBlk(1, 'β3', {})
tm21.assertBlk(2, 'γ3', {}); tm22.assertBlk(2, 'γ3', {})
tm21.assertBlk(3, '', {}); tm22.assertBlk(3, '', {})
pass; tm22.assertBlk(4, '', {})
# open f1 again - mappings created via old fh1 continue to efault; new mappings work ok.
fh1_ = wconn.open(zf1._p_oid); defer(fh1_.close)
m11_ = fh1_.mmap(1, 4); defer(m11_.unmap); tm11_ = tMapping(t, m11_)
tm11.assertBlkFaults(1); tm11_.assertBlk(1, '', {})
tm11.assertBlkFaults(2); tm11_.assertBlk(2, 'c1', {})
tm11.assertBlkFaults(3); tm11_.assertBlk(3, 'd1', {}); tm12.assertBlkFaults(3)
tm11.assertBlkFaults(4); tm11_.assertBlk(4, '', {}); tm12.assertBlkFaults(4)
pass; tm12.assertBlkFaults(5)
tm21.assertBlk(0, '', {})
tm21.assertBlk(1, 'β3', {})
tm21.assertBlk(2, 'γ3', {}); tm22.assertBlk(2, 'γ3', {})
tm21.assertBlk(3, '', {}); tm22.assertBlk(3, '', {})
pass; tm22.assertBlk(4, '', {})
# close wconn -> fh2 and fh1_ mmaps must turn into efaulting too.
wconn.close()
tm11.assertBlkFaults(1); tm11_.assertBlkFaults(1)
tm11.assertBlkFaults(2); tm11_.assertBlkFaults(2)
tm11.assertBlkFaults(3); tm11_.assertBlkFaults(3); tm12.assertBlkFaults(3)
tm11.assertBlkFaults(4); tm11_.assertBlkFaults(4); tm12.assertBlkFaults(4)
pass; tm12.assertBlkFaults(5)
tm21.assertBlkFaults(0)
tm21.assertBlkFaults(1)
tm21.assertBlkFaults(2); tm22.assertBlkFaults(2)
tm21.assertBlkFaults(3); tm22.assertBlkFaults(3)
pass; tm22.assertBlkFaults(4)
# XXX vvv -> separate test?
# verify that after wconn.close()
# wconn.open(), wconn.resync(), fh.mmap() -> error
with raises(error, match=".*: connection closed"): wconn.open(zf1._p_oid)
with raises(error, match=".*: connection closed"): wconn.resync(at3)
with raises(error, match=".*: file already closed"): fh2.mmap(2, 3) # NOTE we did not close fh2 yet
# ----//---- after fileh.close
with raises(error, match=".*: file already closed"): fh1.mmap(2, 3) # fh1 was explicitly closed ^^^
# verify that on fork client turns all child's wcfs mappings to efault and
# detaches from wcfs. (else even automatic FileH.__del__ - caused by GC in child
# - can send message to wcfs server and this way break parent-wcfs exchange).
@func
def test_wcfs_client_afterfork():
t = tDB(); zf = t.zfile; at0=t.at0
defer(t.close)
# initial setup
at1 = t.commit(zf, {1:'b1', 3:'d1'})
wconn = t.wc.connect(at1)
defer(wconn.close)
fh = wconn.open(zf._p_oid); defer(fh.close)
m = fh.mmap(0, 4); tm = tMapping(t, m)
tm.assertBlk(0, '', {})
tm.assertBlk(1, 'b1', {})
tm.assertBlk(2, '', {})
tm.assertBlk(3, 'd1', {})
# fork child and verify that it does not interact with wcfs
def forkedchild():
tm.assertBlkFaults(0)
tm.assertBlkFaults(1)
tm.assertBlkFaults(2)
tm.assertBlkFaults(3)
fh.close() # must be noop in child
gc.collect()
os._exit(0) # NOTE not sys.exit, so as not to execute deferred cleanup prepared by parent
p = multiprocessing.Process(target=forkedchild)
p.start()
if not waitfor_(timeout(), lambda: p.exitcode is not None):
eprint("\nC: child stuck")
eprint("-> kill it (pid %s) ...\n" % p.pid)
p.terminate()
p.join()
assert p.exitcode == 0
# make sure that parent can continue using wcfs normally
at2 = t.commit(zf, {1:'b2'})
tm.assertBlk(0, '', {})
tm.assertBlk(1, 'b1', {1:at1}) # pinned @at1
tm.assertBlk(2, '', {1:at1})
tm.assertBlk(3, 'd1', {1:at1})
wconn.resync(at2) # unpins 1 to @head
tm.assertBlk(0, '', {})
tm.assertBlk(1, 'b2', {})
tm.assertBlk(2, '', {})
tm.assertBlk(3, 'd1', {})
# TODO try to unit test at wcfs client level wcfs.Mapping with dirty RW page -
# that it stays in sync with DB after dirty discard.
# verify that read_mustfault works as expected.
def test_read_mustfault():
mem = mm.map_zero_ro(mm.PAGE_SIZE)
with raises(AssertionError, match="not faulted"): read_mustfault(mem[:1])
mm.protect(mem, mm.PROT_NONE)
read_mustfault(mem[:1])
mm.protect(mem, mm.PROT_READ)
with raises(AssertionError, match="not faulted"): read_mustfault(mem[:1])
@@ -18,6 +18,147 @@
// See https://www.nexedi.com/licensing for rationale and options.
// Package wcfs provides WCFS client.
// See wcfs.h for package overview.
// Wcfs client organization
//
// Wcfs client provides its users isolated bigfile views backed by data on
// the WCFS filesystem. In the absence of the Isolation property, wcfs client
// would reduce to just directly using the OS-level file wcfs/head/f for a
// bigfile f. On the other hand there is a simple, but inefficient, way to
// support isolation: for the @at database view of bigfile f, directly use the
// OS-level file wcfs/@at/f. The latter works, but is very inefficient because
// the OS cache for f's data is not shared between two connections with @at1
// and @at2 views. The cache is also lost when the connection's view of the
// database is resynced on a transaction boundary. To support isolation
// efficiently, wcfs client uses wcfs/head/f most of the time, but injects
// wcfs/@revX/f parts into mappings to maintain the f@at view, driven by pin
// messages that the wcfs server sends to the client in accordance with the
// WCFS isolation protocol(*).
//
// The wcfs server sends pin messages synchronously, triggered by access to
// mmaped memory. That means a client thread that is accessing a wcfs/head/f
// mmap is completely blocked while the wcfs server sends pins and waits to
// receive acks from all clients. In other words, on-client handling of pins
// has to be done in a separate thread, because the wcfs server can also send
// pins to the client that triggered the access.
//
// Wcfs client implements pin handling in a so-called "pinner" thread(+). The
// pinner thread receives pin requests from the wcfs server via a watchlink
// handle opened through wcfs/head/watch. For every pin request the pinner
// finds the corresponding Mappings and injects wcfs/@revX/f parts via
// Mapping._remmapblk appropriately.
//
// The same watchlink handle is used to send client-originated requests to the
// wcfs server. The requests are sent to tell wcfs that the client wants to
// observe a particular bigfile as of a particular revision, or to stop
// watching it. Such requests originate from regular client threads - not the
// pinner - via entry points like Conn.open, Conn.resync and FileH.close.
//
// Every FileH maintains fileh._pinned {} with the currently pinned blk -> rev.
// This dict is updated by the pinner, driven by pin messages, and is used when
// a new fileh Mapping is created (FileH.mmap).
//
// In wendelin.core a bigfile has the semantic that it is infinite in size and
// reads as all zeros beyond the region initialized with data. Memory-mapping
// of OS-level files can also go beyond file size; however, accessing memory
// corresponding to a file region after file.size triggers SIGBUS. To preserve
// the wendelin.core semantic, wcfs client mmaps-in zeros for Mapping regions
// after wcfs/head/f.size. For simplicity it is assumed that bigfiles only grow
// and never shrink. This is indeed currently so, but will have to be revisited
// if/when wendelin.core adds bigfile truncation. Wcfs client restats
// wcfs/head/f at every transaction boundary (Conn.resync) and remembers f.size
// in FileH._headfsize for use during one transaction(%).
//
// --------
//
// (*) see wcfs.go documentation for WCFS isolation protocol overview and details.
// (+) currently, for simplicity, there is one pinner thread for each connection.
// In the future, for efficiency, it might be reworked to be one pinner thread
// that serves all connections simultaneously.
// (%) see _headWait comments on how this has to be reworked.
// Wcfs client locking organization
//
// Wcfs client needs to synchronize regular user threads vs each other and vs
// the pinner. A major lock, Conn.atMu, protects changes to Conn's view of the
// database. Whenever atMu.W is taken, Conn.at is changing (Conn.resync);
// conversely, whenever atMu.R is taken, Conn.at is stable (roughly speaking,
// Conn.resync is not running).
//
// Similarly to wcfs.go(*), several locks that protect internal data structures
// are minor to Conn.atMu - they need to be taken only under atMu.R (to
// synchronize e.g. multiple fileh opens running simultaneously), but do not
// need to be taken at all if atMu.W is taken. In data structures such locks
// are noted as follows:
//
// sync::Mutex xMu; // atMu.W | atMu.R + xMu
//
// After atMu, Conn.filehMu protects the registry of opened file handles
// (Conn._filehTab), and FileH.mmapMu protects the registry of created Mappings
// (FileH.mmaps) and FileH.pinned.
//
// Several locks are RWMutex instead of just Mutex not only to allow more
// concurrency but, in the first place, for correctness: the pinner thread,
// being the core element in handling the WCFS isolation protocol, is
// effectively invoked synchronously from other threads via messages coming
// through the wcfs server. For example, Conn.resync sends a watch request to
// the wcfs server and waits for the answer. The wcfs server, in turn, might
// send corresponding pin messages to the pinner and _wait_ for the answer
// before answering to resync:
//
// - - - - - -
// | .···|·····. ----> = request
// pinner <------.↓ <···· = response
// | | wcfs
// resync -------^↓
// | `····|·····
// - - - - - -
// client process
//
// This creates the necessity to use RWMutex for locks that the pinner and
// other parts of the code could be using at the same time in synchronous
// scenarios similar to the above. These locks are:
//
// - Conn.atMu
// - Conn.filehMu
//
// Note that FileH.mmapMu is a regular - not RW - mutex, since nothing in wcfs
// client calls into the wcfs server via watchlink with mmapMu held.
//
// The ordering of locks is:
//
// Conn.atMu > Conn.filehMu > FileH.mmapMu
//
// The pinner takes the following locks:
//
// - wconn.atMu.R
// - wconn.filehMu.R
// - fileh.mmapMu (to read .mmaps + write .pinned)
//
//
// (*) see "Wcfs locking organization" in wcfs.go
// Handling of fork
//
// When a process calls fork, the OS copies its memory and creates a child
// process with only 1 thread. That child inherits file descriptors and memory
// mappings from the parent. To correctly continue using Conn, FileH and
// Mappings, the child would have to recreate the pinner thread and reconnect
// to wcfs via a reopened watchlink. The reason is that without reconnection -
// by using the watchlink file descriptor inherited from the parent - the child
// would interfere in the parent-wcfs exchange and neither parent nor child
// could continue normal protocol communication with WCFS.
//
// For simplicity, since fork is seldom used for things besides followup exec,
// wcfs client currently takes the straightforward approach of disabling
// mappings and detaching from the WCFS server in the child right after fork.
// This ensures that there is no interference in the parent-wcfs exchange
// should the child decide not to exec and to continue running. Without this
// protection the interference might come even automatically, via e.g.
// Python GC -> PyFileH.__del__ -> FileH.close -> message to WCFS.
#include "wcfs_misc.h"
#include "wcfs.h"
@@ -25,11 +166,1071 @@
#include <golang/errors.h>
#include <golang/fmt.h>
#include <golang/io.h>
#include <golang/time.h>
#include <algorithm>
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
using std::min;
using std::max;
using std::vector;
namespace ioutil = io::ioutil;
#define TRACE 0
#if TRACE
# define trace(format, ...) log::Debugf(format, ##__VA_ARGS__)
#else
# define trace(format, ...) do {} while (0)
#endif
// trace with op prefix taken from E.
#define etrace(format, ...) trace("%s", v(E(fmt::errorf(format, ##__VA_ARGS__))))
#define ASSERT(expr) do { \
if (!(expr)) \
panic("assert failed: " #expr); \
} while(0)
// wcfs::
namespace wcfs {
static error mmap_zero_into_ro(void *addr, size_t size);
static error mmap_efault_into(void *addr, size_t size);
static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size);
static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset);
// _headWait waits till wcfs/head/at becomes ≥ at.
//
// _headWait is currently needed, because client stats wcfs/head/f to get f
// size assuming that f size only ↑. The assumption is not generally valid
// (e.g. f might be truncated = hole punched for block at tail), but holds true
// for now. However to get correct results wcfs/head/f has to be stat'ed
// _after_ wcfs view of the database becomes ≥ wconn.at.
//
// TODO extend isolation protocol to report f size as of @at database state at
// watch init/update(*). This way there won't be need for headWait as correct
// file size @at will be returned by wcfs itself, which will also work if
// wcfs/head/f size is changed arbitrarily.
//
// (*) equivalent might be to send something like "pin #<bsize>.. Z" (pin
// blocks bsize till ∞ to zeros).
error WCFS::_headWait(zodb::Tid at) {
WCFS& wc = *this;
xerr::Contextf E("%s: headWait @%s", v(wc), v(at));
etrace("");
zodb::Tid xat;
string xatStr;
error err;
// XXX dumb implementation, because _headWait should go away.
while (1) {
tie(xatStr, err) = ioutil::ReadFile(wc._path("head/at"));
if (err != nil)
return E(err);
tie(xat, err) = xstrconv::parseHex64(xatStr);
if (err != nil)
return E(fmt::errorf("head/at: %w", err));
if (xat >= at)
break;
time::sleep(1*time::millisecond);
}
return nil;
}
// connect creates new Conn viewing WCFS state as of @at.
pair<Conn, error> WCFS::connect(zodb::Tid at) {
WCFS *wc = this;
xerr::Contextf E("%s: connect @%s", v(wc), v(at));
etrace("");
error err;
// TODO support !isolated mode
// need to wait till `wcfs/head/at ≥ at` because e.g. Conn.open stats
// head/f to get f.headfsize.
err = wc->_headWait(at);
if (err != nil) {
return make_pair(nil, E(err));
}
WatchLink wlink;
tie(wlink, err) = wc->_openwatch();
if (err != nil)
return make_pair(nil, E(err));
Conn wconn = adoptref(new _Conn());
wconn->_wc = wc;
wconn->_at = at;
wconn->_wlink = wlink;
os::RegisterAfterFork(newref(
static_cast<os::_IAfterFork*>( wconn._ptr() )
));
context::Context pinCtx;
tie(pinCtx, wconn->_pinCancel) = context::with_cancel(context::background());
wconn->_pinWG = sync::NewWorkGroup(pinCtx);
wconn->_pinWG->go([wconn](context::Context ctx) -> error {
return wconn->_pinner(ctx);
});
return make_pair(wconn, nil);
}
static global<error> errConnClosed = errors::New("connection closed");
// close releases resources associated with wconn.
//
// opened fileh and mappings become invalid to use except close and unmap.
error _Conn::close() {
// NOTE keep in sync with Conn.afterFork
_Conn& wconn = *this;
wconn._atMu.RLock();
defer([&]() {
wconn._atMu.RUnlock();
});
xerr::Contextf E("%s: close", v(wconn));
etrace("");
error err, eret;
auto reterr1 = [&eret](error err) {
if (eret == nil && err != nil)
eret = err;
};
// mark wconn as closed, so that no new wconn.open might be spawned.
bool alreadyClosed = false;
wconn._filehMu.Lock();
alreadyClosed = (wconn._downErr == errConnClosed);
wconn._downErr = errConnClosed;
wconn._filehMu.Unlock();
if (alreadyClosed)
return nil;
// close all files - both that have no mappings and that still have opened
// mappings. We have to close files before shutting down pinner, because
// wcfs might send pin messages due to file access by other clients. So to
// avoid being killed we have to unwatch all files before stopping the
// pinner.
//
// NOTE after file is closed, its mappings could continue to survive, but
// we can no longer maintain consistent view. For this reason we change
// mappings to give EFAULT on access.
while (1) {
FileH f = nil;
bool opening;
// pick up any fileh
wconn._filehMu.Lock();
if (!wconn._filehTab.empty()) {
f = wconn._filehTab.begin()->second;
opening = (f->_state < _FileHOpened);
}
wconn._filehMu.Unlock();
if (f == nil)
break; // all closed
// if fileh was "opening" - wait for the open to complete before calling close.
if (opening) {
f->_openReady.recv();
if (f->_openErr != nil)
continue; // failed open; f should be removed from wconn._filehTab by Conn.open itself
}
// force fileh close.
// - wconn.atMu.R
// - wconn.filehMu unlocked
err = f->_closeLocked(/*force=*/true);
if (err != nil)
reterr1(err);
// wait for f close to complete, as it might be that f.close was called
// simultaneously to us or just before. f is removed from
// wconn.filehTab only after close is complete.
f->_closedq.recv();
}
// close wlink and signal to pinner to stop.
err = wconn._wlink->close();
if (err != nil)
reterr1(err);
wconn._pinCancel();
err = wconn._pinWG->wait();
if (!errors::Is(err, context::canceled)) // canceled - ok
reterr1(err);
os::UnregisterAfterFork(newref(
static_cast<os::_IAfterFork*>( &wconn )
));
return E(eret);
}
// afterFork detaches from wcfs in child process right after fork.
//
// opened fileh are closed abruptly without sending "bye", so as not to interfere
// with the parent-wcfs exchange. Existing mappings become invalid to use.
void _Conn::afterFork() {
// NOTE keep in sync with Conn.close
_Conn& wconn = *this;
// ↓↓↓ parallels Conn::close, but without locking and exchange with wcfs.
//
// After fork in child we are the only thread that exists/runs.
// -> no need to lock anything; trying to use locks could even deadlock,
// because lock state is snapshotted at fork time, when a lock could
// be already locked by some thread.
bool alreadyClosed = (wconn._downErr == errConnClosed);
if (alreadyClosed)
return;
// close all files and make mappings efault.
while (!wconn._filehTab.empty()) {
FileH f = wconn._filehTab.begin()->second;
// close f even if f->_state < _FileHOpened
// (in parent closure of opening-in-progress files is done by
// Conn::open, but in child we are the only one to release resources)
f->_afterFork();
}
// NOTE no need to wlink->close() - wlink handles afterFork by itself.
// NOTE no need to signal pinner, as fork does not clone the pinner into child.
}
// _pinner receives pin messages from wcfs and adjusts wconn file mappings.
error _Conn::_pinner(context::Context ctx) {
Conn wconn = newref(this); // newref for go
error err = wconn->__pinner(ctx);
// if pinner fails, wcfs will kill us.
// log pinner error so that the error is not hidden.
// print to stderr as well as by default log does not print to there.
// TODO also catch panic/exc ?
if (!(err == nil || errors::Is(err, context::canceled))) { // canceled = .close asks pinner to stop
log::Fatalf("CRITICAL: %s", v(err));
log::Fatalf("CRITICAL: wcfs server will likely kill us soon.");
fprintf(stderr, "CRITICAL: %s\n", v(err));
fprintf(stderr, "CRITICAL: wcfs server will likely kill us soon.\n");
// mark the connection non-operational if pinner fails.
//
// XXX go because wconn.close might deadlock wrt Conn.resync on
// wconn._filehMu, because Conn.resync sends "watch" updates under
// wconn._filehMu (however Conn.open and FileH.close send "watch"
// _without_ wconn._filehMu). If pinner fails - we already have serious
// problems... TODO try to resolve the deadlock.
go([wconn]() {
wconn->close();
});
}
return err;
}
error _Conn::__pinner(context::Context ctx) {
_Conn& wconn = *this;
xerr::Contextf E("pinner"); // NOTE pinner error goes to Conn::close who has its own context
etrace("");
PinReq req;
error err;
while (1) {
err = wconn._wlink->recvReq(ctx, &req);
if (err != nil) {
// it is ok if we receive EOF due to us (client) closing the connection
if (err == io::EOF_) {
wconn._filehMu.RLock();
err = (wconn._downErr == errConnClosed) ? nil : io::ErrUnexpectedEOF;
wconn._filehMu.RUnlock();
}
return E(err);
}
// we received request to pin/unpin file block. handle it
err = wconn._pin1(&req);
if (err != nil) {
return E(err);
}
}
}
// pin1 handles one pin request received from wcfs.
error _Conn::_pin1(PinReq *req) {
_Conn& wconn = *this;
xerr::Contextf E("pin f<%s> #%ld @%s", v(req->foid), req->blk, v(req->at));
etrace("");
error err = wconn.__pin1(req);
// reply either ack or nak on error
string ack = "ack";
if (err != nil)
ack = fmt::sprintf("nak: %s", v(err));
// NOTE ctx=bg to always send reply even if we are canceled
error err2 = wconn._wlink->replyReq(context::background(), req, ack);
if (err == nil)
err = err2;
return E(err);
}
error _Conn::__pin1(PinReq *req) {
_Conn& wconn = *this;
FileH f;
bool ok;
wconn._atMu.RLock();
defer([&]() {
wconn._atMu.RUnlock();
});
// lock wconn.filehMu.R to lookup fileh in wconn.filehTab.
//
// keep wconn.filehMu.R locked during whole __pin1 run to make sure that
// e.g. simultaneous FileH.close does not remove f from wconn.filehTab.
// TODO keeping filehMu.R during whole pin1 is not needed and locking can be made more granular.
//
// NOTE no deadlock wrt Conn.resync, Conn.open, FileH.close - they all send
// "watch" requests to wcfs server outside of wconn.filehMu.
wconn._filehMu.RLock();
defer([&]() {
wconn._filehMu.RUnlock();
});
tie(f, ok) = wconn._filehTab.get_(req->foid);
if (!ok) {
// why did wcfs send us this update?
return fmt::errorf("unexpected pin: f<%s> not watched", v(req->foid));
}
// NOTE no need to check f._state as we need to go only through f.mmaps, and
// wcfs server can send us pins at any state, including "opening" - to pin
// our view to requested @at, and "closing" - due to other clients
// accessing wcfs/head/f simultaneously.
f->_mmapMu.lock();
defer([&]() {
f->_mmapMu.unlock();
});
for (auto mmap : f->_mmaps) { // TODO use ↑blk_start for binary search
if (!(mmap->blk_start <= req->blk && req->blk < mmap->blk_stop()))
continue; // blk ∉ mmap
trace("\tremmapblk %d @%s", req->blk, (req->at == TidHead ? "head" : v(req->at)));
error err = mmap->_remmapblk(req->blk, req->at);
// on error don't need to continue with other mappings - all fileh and
// all mappings become marked invalid on pinner failure.
if (err != nil)
return err;
trace("\t-> remmaped");
}
// update f._pinned
if (req->at == TidHead) {
f->_pinned.erase(req->blk); // unpin to @head
}
else {
f->_pinned[req->blk] = req->at;
}
return nil;
}
// at returns database state corresponding to the connection.
zodb::Tid _Conn::at() {
_Conn& wconn = *this;
wconn._atMu.RLock();
defer([&]() {
wconn._atMu.RUnlock();
});
return wconn._at;
}
// resync resyncs connection and its file mappings onto different database view.
//
// bigfile/_file_zodb.pyx arranges to call Conn.resync at transaction boundaries
// to keep Conn view in sync with updated zconn database view.
error _Conn::resync(zodb::Tid at) {
_Conn& wconn = *this;
error err;
wconn._atMu.RLock();
xerr::Contextf E("%s: resync -> @%s", v(wconn), v(at));
etrace("");
wconn._filehMu.RLock();
err = wconn._downErr;
wconn._filehMu.RUnlock();
wconn._atMu.RUnlock();
if (err != nil)
return E(err);
// wait for wcfs/head to be >= at.
// we need this e.g. to be sure that head/f.size is at least as big as it will be @at state.
err = wconn._wc->_headWait(at);
if (err != nil)
return E(err);
// bring wconn + fileh + mmaps down on error
bool retok = false;
defer([&]() {
if (!retok)
wconn.close(); // ignore error
});
// lock wconn._atMu.W . This excludes everything else, and in
// particular _pinner_, from running and mutating files and mappings.
//
// NOTE we'll relock atMu as R in the second part of resync, so we prelock
// wconn._filehMu.R as well while under atMu.W, to be sure that set of opened
// files and their states stay the same during whole resync.
bool atMuWLocked = true;
wconn._atMu.Lock();
wconn._filehMu.RLock();
defer([&]() {
wconn._filehMu.RUnlock();
if (atMuWLocked)
wconn._atMu.Unlock();
else
wconn._atMu.RUnlock();
});
err = wconn._downErr;
if (err != nil)
return E(err);
// set new wconn.at early, so that e.g. Conn.open running simultaneously
// to second part of resync (see below) uses new at.
wconn._at = at;
// go through all files opened under wconn and pre-adjust their mappings
// for viewing data as of new @at state.
//
// We are still holding atMu.W, so we are the only mutators of mappings,
// because, in particular, pinner is not running.
//
// Don't send watch updates for opened files to wcfs yet - without running
// pinner those updates will get stuck.
for (auto fit : wconn._filehTab) {
//zodb::Oid foid = fit.first;
FileH f = fit.second;
// TODO if file has no mappings and was not used during whole prev
// cycle - forget and stop watching it?
// "opening" or "closing" fileh - their setup/teardown is currently
// handled by Conn.open and FileH.close correspondingly.
if (f->_state != _FileHOpened)
continue;
// update f._headfsize and remmap to head/f zero regions that are now covered by head/f
struct stat st;
err = f->_headf->stat(&st);
if (err != nil)
return E(err);
if ((size_t)st.st_blksize != f->blksize) // blksize must not change
return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize));
auto headfsize = st.st_size;
if (!(f->_headfsize <= headfsize)) // head/file size ↑=
return E(fmt::errorf("wcfs bug: head/file size not ↑="));
if (!(headfsize % f->blksize == 0))
return E(fmt::errorf("wcfs bug: head/file size %% blksize != 0"));
// replace zero regions in f mappings in accordance to adjusted f._headfsize.
// NOTE it is ok to access f._mmaps without locking f._mmapMu because we hold wconn.atMu.W
for (auto mmap : f->_mmaps) {
//trace(" resync -> %s: unzero [%lu:%lu)", v(at), f->_headfsize/f->blksize, headfsize/f->blksize);
uint8_t *mem_unzero_start = min(mmap->mem_stop,
mmap->mem_start + (f->_headfsize - mmap->blk_start*f->blksize));
uint8_t *mem_unzero_stop = min(mmap->mem_stop,
mmap->mem_start + ( headfsize - mmap->blk_start*f->blksize));
if (mem_unzero_stop - mem_unzero_start > 0) {
err = mmap_into_ro(mem_unzero_start, mem_unzero_stop-mem_unzero_start, f->_headf, f->_headfsize);
if (err != nil)
return E(err);
}
}
f->_headfsize = headfsize;
}
// atomically downgrade atMu.W to atMu.R before issuing watch updates to wcfs.
// - we need atMu to be not Wlocked, because under atMu.W pinner cannot run simultaneously to us.
// - we need to hold atMu.R to avoid race wrt e.g. other resync which changes at.
// - we cannot just do regular `atMu.Unlock + atMu.RLock()` because then
// there is e.g. a race window in between Unlock and RLock where wconn.at can be changed.
// Also if we Unlock and Rlock, it will produce deadlock, because locking
// order will change to reverse: wconn._filehMu.R + wconn._atMu.R
//
// Now other calls, e.g. Conn.open, can be running simultaneously to us,
// but since we already set wconn.at to new value it is ok. For example
// Conn.open, for not-yet-opened file, will use new at to send "watch".
//
// NOTE we are still holding wconn._filehMu.R, so wconn._filehTab and fileh
// states are the same as in previous pass above.
wconn._atMu.UnlockToRLock();
atMuWLocked = false;
// send watch updates to wcfs.
// the pinner is now running and will be able to serve pin requests triggered by our watch.
//
// update only fileh in "opened" state - for fileh in "opening" and
// "closing" states, watch setup/teardown is currently in-progress and
// performed by Conn.open and FileH.close correspondingly.
for (auto fit : wconn._filehTab) {
zodb::Oid foid = fit.first;
FileH f = fit.second;
if (f->_state != _FileHOpened)
continue;
string ack;
tie(ack, err) = wconn._wlink->sendReq(context::background(),
fmt::sprintf("watch %s @%s", v(foid), v(at)));
if (err != nil)
return E(err);
if (ack != "ok")
return E(fmt::errorf("%s", v(ack)));
}
retok = true;
return nil;
}
// open opens FileH corresponding to ZBigFile foid.
pair<FileH, error> _Conn::open(zodb::Oid foid) {
_Conn& wconn = *this;
error err;
wconn._atMu.RLock();
defer([&]() {
wconn._atMu.RUnlock();
});
xerr::Contextf E("%s: open f<%s>", v(wconn), v(foid));
etrace("");
retry:
wconn._filehMu.Lock();
if (wconn._downErr != nil) {
err = wconn._downErr;
wconn._filehMu.Unlock();
return make_pair(nil, E(err));
}
// TODO ensure f<foid>@ wconn.at exists - else we get pins to non-existing
// state from wcfs, pinner replies nak, wcfs sends SIGBUS.
// TODO -> better teach wcfs to reject "watch <foid> @at" for @at where f did not existed.
// (see test_wcfs_watch_before_create)
FileH f; bool ok;
tie(f, ok) = wconn._filehTab.get_(foid);
if (ok) {
bool closing;
if (f->_state <= _FileHOpened) {
f->_nopen++;
closing = false;
} else {
closing = true;
}
wconn._filehMu.Unlock();
// if the file was closing|closed, we should wait for the close to
// complete and retry the open.
if (closing) {
f->_closedq.recv();
goto retry;
}
// the file was opening|opened. wait for open to complete and return the result.
// we can be sure there won't be last close simultaneous to us as we did ._nopen++
f->_openReady.recv();
if (f->_openErr != nil) {
// don't care about f->_nopen-- since f is not returned anywhere
return make_pair(nil, E(f->_openErr));
}
return make_pair(f, nil);
}
// create "opening" FileH entry and perform open with wconn._filehMu released.
// NOTE wconn._atMu.R is still held because FileH._open relies on wconn.at being stable.
f = adoptref(new _FileH());
f->wconn = newref(&wconn);
f->foid = foid;
f->_openReady = makechan<structZ>();
f->_closedq = makechan<structZ>();
f->_openErr = nil;
f->_headf = nil;
f->blksize = 0;
f->_headfsize = 0;
f->_state = _FileHOpening;
f->_nopen = 1;
bool retok = false;
wconn._filehTab[foid] = f;
wconn._filehMu.Unlock();
defer([&]() {
wconn._filehMu.Lock();
if (wconn._filehTab.get(foid) != f) {
wconn._filehMu.Unlock();
panic("BUG: wconn.open: wconn.filehTab[foid] mutated while file open was in progress");
}
if (!retok) {
// don't care about f->_nopen-- since f is not returned anywhere
wconn._filehTab.erase(foid);
} else {
f->_state = _FileHOpened;
}
wconn._filehMu.Unlock();
f->_openReady.close();
});
// do the actual open.
// we hold only wconn.atMu.R, but neither wconn.filehMu, nor f.mmapMu .
f->_openErr = f->_open();
if (f->_openErr != nil)
return make_pair(nil, E(f->_openErr));
// NOTE no need to recheck that wconn was not closed while the open was in
// progress: we'll return "success" but Conn.close will close the fileh.
// However it is indistinguishable from the following scenario:
//
// T1 T2
//
// f = wconn.open()
// # completes ok
// wconn.close()
//
// # use f -> error
retok = true;
return make_pair(f, nil);
}
// _open performs actual open of FileH marked as "in-flight-open" in wconn.filehTab.
//
// Called with:
// - wconn.atMu held
// - wconn.filehMu not locked
// - f.mmapMu not locked
error _FileH::_open() {
_FileH& f = *this;
Conn wconn = f.wconn;
error err;
tie(f._headf, err)
= wconn->_wc->_open(fmt::sprintf("head/bigfile/%s", v(foid)));
if (err != nil)
return err;
bool retok = false;
defer([&]() {
if (!retok)
f._headf->close();
});
struct stat st;
err = f._headf->stat(&st);
if (err != nil)
return err;
f.blksize = st.st_blksize;
f._headfsize = st.st_size;
if (!(f._headfsize % f.blksize == 0))
return fmt::errorf("wcfs bug: %s size (%d) %% blksize (%d) != 0",
v(f._headf->name()), f._headfsize, f.blksize);
// start watching f
// NOTE we are _not_ holding wconn.filehMu nor f.mmapMu - only wconn.atMu to rely on wconn.at being stable.
// NOTE wcfs will reply "ok" only after wcfs/head/at ≥ wconn.at
string ack;
tie(ack, err) = wconn->_wlink->sendReq(context::background(),
fmt::sprintf("watch %s @%s", v(foid), v(wconn->_at)));
if (err != nil)
return err;
if (ack != "ok")
return fmt::errorf("watch: %s", v(ack));
retok = true;
return nil;
}
// close releases resources associated with FileH.
//
// Left fileh mappings become invalid to use except unmap.
error _FileH::close() {
_FileH& fileh = *this;
Conn wconn = fileh.wconn;
wconn->_atMu.RLock();
defer([&]() {
wconn->_atMu.RUnlock();
});
return fileh._closeLocked(/*force=*/false);
}
// _closeLocked serves FileH.close and Conn.close.
//
// Must be called with the following locks held by caller:
// - wconn.atMu
error _FileH::_closeLocked(bool force) {
// NOTE keep in sync with FileH._afterFork
_FileH& fileh = *this;
Conn wconn = fileh.wconn;
wconn->_filehMu.Lock();
defer([&]() {
wconn->_filehMu.Unlock();
});
// fileh.close can be called several times. just return nil for second close.
if (fileh._state >= _FileHClosing)
return nil;
// decref open count; do real close only when last open goes away.
if (fileh._nopen <= 0)
panic("BUG: fileh.close: fileh._nopen <= 0");
fileh._nopen--;
if (fileh._nopen > 0 && !force)
return nil;
// last open went away - real close.
xerr::Contextf E("%s: %s: close", v(wconn), v(fileh));
etrace("");
ASSERT(fileh._state == _FileHOpened); // there can be no open-in-progress, because
fileh._state = _FileHClosing; // .close() can be called only on "opened" fileh
// unlock wconn._filehMu to stop watching the file outside of this lock.
// we'll relock wconn._filehMu again before updating wconn.filehTab.
wconn->_filehMu.Unlock();
error err, eret;
auto reterr1 = [&eret](error err) {
if (eret == nil && err != nil)
eret = err;
};
// stop watching f
string ack;
tie(ack, err) = wconn->_wlink->sendReq(context::background(),
fmt::sprintf("watch %s -", v(foid)));
if (err != nil)
reterr1(err);
else if (ack != "ok")
reterr1(fmt::errorf("unwatch: %s", v(ack)));
// relock wconn._filehMu again and remove fileh from wconn._filehTab
wconn->_filehMu.Lock();
if (wconn->_filehTab.get(fileh.foid)._ptr() != &fileh)
panic("BUG: fileh.close: wconn.filehTab[fileh.foid] != fileh");
wconn->_filehTab.erase(fileh.foid);
reterr1(fileh._headf->close());
// change all fileh.mmaps to cause EFAULT on any access after fileh.close
fileh._mmapMu.lock();
defer([&]() {
fileh._mmapMu.unlock();
});
for (auto mmap : fileh._mmaps) {
err = mmap->__remmapAsEfault();
if (err != nil)
reterr1(err);
}
// fileh close complete
fileh._state = _FileHClosed;
fileh._closedq.close();
return E(eret);
}
// _afterFork is similar to _closeLocked and releases FileH resource and
// mappings right after fork.
void _FileH::_afterFork() {
// NOTE keep in sync with FileH._closeLocked
_FileH& fileh = *this;
Conn wconn = fileh.wconn;
// ↓↓↓ parallels FileH._closeLocked but without locking and wcfs exchange.
//
// There is no locking (see Conn::afterFork for why) and we shutdown file
// even if ._state == _FileHClosing, because that state was copied from
// parent and it is inside parent where it is another thread that is
// currently closing *parent's* FileH.
if (fileh._state == _FileHClosed) // NOTE _not_ >= _FileHClosing
return;
// don't send "stop watch f" to wcfs, so as not to disrupt the parent-wcfs exchange.
// just close the file.
if (wconn->_filehTab.get(fileh.foid)._ptr() != &fileh)
panic("BUG: fileh.closeAfterFork: wconn.filehTab[fileh.foid] != fileh");
wconn->_filehTab.erase(fileh.foid);
fileh._headf->close(); // ignore err
// change all fileh.mmaps to cause EFAULT on access
for (auto mmap : fileh._mmaps) {
mmap->__remmapAsEfault(); // ignore err
}
// done
fileh._state = _FileHClosed;
}
// mmap creates file mapping representing file[blk_start +blk_len) data as of wconn.at database state.
pair<Mapping, error> _FileH::mmap(int64_t blk_start, int64_t blk_len) {
_FileH& f = *this;
f.wconn->_atMu.RLock(); // e.g. f._headfsize
f.wconn->_filehMu.RLock(); // f._state TODO -> finer grained (currently too coarse)
f._mmapMu.lock(); // f._pinned, f._mmaps
defer([&]() {
f._mmapMu.unlock();
f.wconn->_filehMu.RUnlock();
f.wconn->_atMu.RUnlock();
});
xerr::Contextf E("%s: %s: mmap [#%ld +%ld)", v(f.wconn), v(f), blk_start, blk_len);
etrace("");
if (f._state >= _FileHClosing)
return make_pair(nil, E(os::ErrClosed));
error err;
if (blk_start < 0)
panic("blk_start < 0");
if (blk_len < 0)
panic("blk_len < 0");
int64_t blk_stop; // = blk_start + blk_len
if (__builtin_add_overflow(blk_start, blk_len, &blk_stop))
panic("blk_start + blk_len overflow int64");
int64_t stop; // = blk_stop * f.blksize
if (__builtin_mul_overflow(blk_stop, f.blksize, &stop))
panic("(blk_start + blk_len)*f.blksize overflow int64");
int64_t start = blk_start*f.blksize;
// create memory with head/f mapping and applied pins
// mmap-in zeros after f.size (else access to memory after file.size will raise SIGBUS)
uint8_t *mem_start, *mem_stop;
tie(mem_start, err) = mmap_ro(f._headf, start, blk_len*f.blksize);
if (err != nil)
return make_pair(nil, E(err));
mem_stop = mem_start + blk_len*f.blksize;
bool retok = false;
defer([&]() {
if (!retok)
mm::unmap(mem_start, mem_stop - mem_start); // ignore error
});
// part of mmapped region is beyond file size - mmap that with zeros - else
// access to memory after file.size will raise SIGBUS. (assumes head/f size ↑=)
if (stop > f._headfsize) {
uint8_t *zmem_start = mem_start + (max(f._headfsize, start) - start);
err = mmap_zero_into_ro(zmem_start, mem_stop - zmem_start);
if (err != nil)
return make_pair(nil, E(err));
}
Mapping mmap = adoptref(new _Mapping());
mmap->fileh = newref(&f);
mmap->blk_start = blk_start;
mmap->mem_start = mem_start;
mmap->mem_stop = mem_stop;
mmap->efaulted = false;
for (auto _ : f._pinned) { // TODO keep f._pinned ↑blk and use binary search
int64_t blk = _.first;
zodb::Tid rev = _.second;
if (!(blk_start <= blk && blk < blk_stop))
continue; // blk ∉ this mapping
err = mmap->_remmapblk(blk, rev);
if (err != nil)
return make_pair(nil, E(err));
}
f._mmaps.push_back(mmap); // TODO keep f._mmaps ↑blk_start
retok = true;
return make_pair(mmap, nil);
}
// __remmapAsEfault remmaps Mapping memory to cause SIGSEGV on access.
//
// It is used on FileH shutdown to turn all fileh mappings into incorrect ones,
// because after fileh is down, it is not possible to continue to provide
// correct f@at data view.
//
// Must be called with the following locks held by caller:
// - fileh.mmapMu
error _Mapping::__remmapAsEfault() {
_Mapping& mmap = *this;
FileH f = mmap.fileh;
// errctx: no need for wconn and f: __remmapAsEfault is called only from
// FileH._closeLocked, which adds them.
xerr::Contextf E("%s: remmap as efault", v(mmap));
etrace("");
error err = mmap_efault_into(mmap.mem_start, mmap.mem_stop - mmap.mem_start);
mmap.efaulted = true;
return E(err);
}
// __remmapBlkAsEfault is similar to __remmapAsEfault, but remmaps memory of only 1 block.
// blk must be in mapped range.
error _Mapping::__remmapBlkAsEfault(int64_t blk) {
_Mapping& mmap = *this;
FileH f = mmap.fileh;
xerr::Contextf E("%s: remmapblk #%ld as efault", v(mmap), blk);
etrace("");
ASSERT(mmap.blk_start <= blk && blk < mmap.blk_stop());
uint8_t *blkmem = mmap.mem_start + (blk - mmap.blk_start)*f->blksize;
error err = mmap_efault_into(blkmem, 1*f->blksize);
return E(err);
}
// unmap releases mapping memory from address space.
//
// After call to unmap the mapping must no longer be used.
error _Mapping::unmap() {
Mapping mmap = newref(this); // newref for std::remove
FileH f = mmap->fileh;
f->wconn->_atMu.RLock();
f->_mmapMu.lock();
defer([&]() {
f->_mmapMu.unlock();
f->wconn->_atMu.RUnlock();
});
xerr::Contextf E("%s: %s: %s: unmap", v(f->wconn), v(f), v(mmap));
etrace("");
// double unmap = ok
if (mmap->mem_start == nil)
return nil;
error err = mm::unmap(mmap->mem_start, mmap->mem_stop - mmap->mem_start);
mmap->mem_start = nil;
mmap->mem_stop = nil;
//f->_mmaps.remove(mmap);
f->_mmaps.erase(
std::remove(f->_mmaps.begin(), f->_mmaps.end(), mmap),
f->_mmaps.end());
return E(err);
}
// _remmapblk remmaps mapping memory for file[blk] to be viewing database as of @at state.
//
// at=TidHead means unpin to head/ .
//
// _remmapblk must not be called after Mapping is switched to efault.
//
// The following locks must be held by caller:
// - f.wconn.atMu
// - f._mmapMu
error _Mapping::_remmapblk(int64_t blk, zodb::Tid at) {
_Mapping& mmap = *this;
FileH f = mmap.fileh;
xerr::Contextf E("_remmapblk #%ld @%s", blk, v(at));
etrace("");
ASSERT(mmap.blk_start <= blk && blk < mmap.blk_stop());
// a mmapping is efaulted only for closed files, i.e. fileh is removed from wconn._filehTab
// -> pinner should not see the fileh and so should not see this mapping.
ASSERT(!mmap.efaulted);
uint8_t *blkmem = mmap.mem_start + (blk - mmap.blk_start)*f->blksize;
error err;
os::File fsfile;
bool fclose = false;
if (at == TidHead) {
fsfile = f->_headf;
}
else {
// TODO share @rev fd until wconn is resynced?
tie(fsfile, err) = f->wconn->_wc->_open(
fmt::sprintf("@%s/bigfile/%s", v(at), v(f->foid)));
if (err != nil)
return E(err);
fclose = true;
}
defer([&]() {
if (fclose)
fsfile->close();
});
struct stat st;
err = fsfile->stat(&st);
if (err != nil)
return E(err);
if ((size_t)st.st_blksize != f->blksize)
return E(fmt::errorf("wcfs bug: blksize changed: %zd -> %ld", f->blksize, st.st_blksize));
// block is beyond file size - mmap with zeros - else access to memory
// after file.size will raise SIGBUS. (assumes head/f size ↑=)
if ((blk+1)*f->blksize > (size_t)st.st_size) {
err = mmap_zero_into_ro(blkmem, 1*f->blksize);
if (err != nil)
return E(err);
}
// block is inside file - mmap in file data
else {
err = mmap_into_ro(blkmem, 1*f->blksize, fsfile, blk*f->blksize);
if (err != nil)
return E(err);
}
return nil;
}
// ---- WCFS raw file access ----
@@ -49,9 +1250,109 @@ tuple<os::File, error> WCFS::_open(const string &path, int flags) {
// ---- misc ----
// mmap_zero_into serves mmap_zero_into_ro and mmap_efault_into.
static error mmap_zero_into(void *addr, size_t size, int prot) {
xerr::Contextf E("mmap zero");
etrace("");
// mmap /dev/zero with MAP_NORESERVE and MAP_SHARED
// this way the mapping will be able to be read, but no memory will be allocated to keep it.
os::File z;
error err;
tie(z, err) = os::open("/dev/zero");
if (err != nil)
return E(err);
defer([&]() {
z->close();
});
err = mm::map_into(addr, size, prot, MAP_SHARED | MAP_NORESERVE, z, 0);
if (err != nil)
return E(err);
return nil;
}
// mmap_zero_into_ro mmaps read-only zeros into [addr +size) so that region is all zeros.
// created mapping, even after it is accessed, does not consume memory.
static error mmap_zero_into_ro(void *addr, size_t size) {
return mmap_zero_into(addr, size, PROT_READ);
}
// mmap_efault_into changes [addr +size) region to generate SIGSEGV on read/write access.
// Any previous mapping residing in that virtual address range is released.
static error mmap_efault_into(void *addr, size_t size) {
xerr::Contextf E("mmap efault");
etrace("");
// mmapping /dev/zero with PROT_NONE gives what we need.
return E(mmap_zero_into(addr, size, PROT_NONE));
}
// mmap_ro mmaps read-only fd[offset +size).
// The mapping is created with MAP_SHARED.
static tuple<uint8_t*, error> mmap_ro(os::File f, off_t offset, size_t size) {
return mm::map(PROT_READ, MAP_SHARED, f, offset, size);
}
// mmap_into_ro mmaps read-only fd[offset +size) into [addr +size).
// The mapping is created with MAP_SHARED.
static error mmap_into_ro(void *addr, size_t size, os::File f, off_t offset) {
return mm::map_into(addr, size, PROT_READ, MAP_SHARED, f, offset);
}
string WCFS::String() const {
const WCFS& wc = *this;
return fmt::sprintf("wcfs %s", v(wc.mountpoint));
}
// NOTE String must be called with Conn.atMu locked.
string _Conn::String() const {
const _Conn& wconn = *this;
// XXX don't include wcfs as prefix here?
// (e.g. to use Conn.String in tracing without wcfs prefix)
// (if yes -> go and correct all xerr::Contextf calls)
return fmt::sprintf("%s: conn%d @%s", v(wconn._wc), wconn._wlink->fd(), v(wconn._at));
}
string _FileH::String() const {
const _FileH& f = *this;
return fmt::sprintf("f<%s>", v(f.foid));
}
string _Mapping::String() const {
const _Mapping& mmap = *this;
return fmt::sprintf("m[#%ld +%ld) v[%p +%lx)",
mmap.blk_start, mmap.blk_stop() - mmap.blk_start,
mmap.mem_start, mmap.mem_stop - mmap.mem_start);
}
_Conn::_Conn() {}
_Conn::~_Conn() {}
void _Conn::incref() {
object::incref();
}
void _Conn::decref() {
if (__decref())
delete this;
}
_FileH::_FileH() {}
_FileH::~_FileH() {}
void _FileH::decref() {
if (__decref())
delete this;
}
_Mapping::_Mapping() {}
_Mapping::~_Mapping() {}
void _Mapping::decref() {
if (__decref())
delete this;
}
dict<int64_t, zodb::Tid> _tfileh_pinned(FileH fileh) {
return fileh->_pinned;
}
} // wcfs::
@@ -18,13 +18,65 @@
// See https://www.nexedi.com/licensing for rationale and options.
// Package wcfs provides WCFS client.
//
// This client package takes care about WCFS isolation protocol details and
// provides to clients simple interface to isolated view of bigfile data on
// WCFS similar to regular files: given a particular revision of database @at,
// it provides synthetic read-only bigfile memory mappings with data
// corresponding to @at state, but using /head/bigfile/* most of the time to
// build and maintain the mappings.
//
// For its data a mapping to bigfile X mostly reuses kernel cache for
// /head/bigfile/X with amount of data not associated with kernel cache for
// /head/bigfile/X being proportional to δ(bigfile/X, at..head). In the usual
// case where many client workers simultaneously serve requests, their database
// views are a bit outdated, but close to head, which means that in practice
// the kernel cache for /head/bigfile/* is being used almost 100% of the time.
//
// A mapping for bigfile X@at is built from OS-level memory mappings of
// on-WCFS files as follows:
//
// ___ /@revA/bigfile/X
// __ /@revB/bigfile/X
// _ /@revC/bigfile/X
// + ...
// ─── ───── ────────────────────────── ───── /head/bigfile/X
//
// where @revR mmaps are being dynamically added/removed by this client package
// to maintain X@at data view according to WCFS isolation protocol(*).
//
//
// API overview
//
// - `WCFS` represents filesystem-level connection to wcfs server.
// - `Conn` represents logical connection that provides view of data on wcfs
// filesystem as of particular database state.
// - `FileH` represents isolated file view under Conn.
// - `Mapping` represents one memory mapping of FileH.
//
// A path from WCFS to Mapping is as follows:
//
// WCFS.connect(at) -> Conn
// Conn.open(foid) -> FileH
// FileH.mmap([blk_start +blk_len)) -> Mapping
//
// A connection can be resynced to another database view via Conn.resync(at').
//
// Documentation for classes provides more thorough overview and API details.
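//
// For example, a minimal usage sketch (error checks elided; mountpoint, at and
// foid values are illustrative):
//
//      WCFS wc; wc.mountpoint = "/mnt/wcfs";
//      Conn conn; FileH f; Mapping m; error err;
//      tie(conn, err) = wc.connect(at);     // view database as of state @at
//      tie(f, err)    = conn->open(foid);   // isolated view of ZBigFile foid
//      tie(m, err)    = f->mmap(0, 4);      // memory-map blocks [0, 4) of f
//      // ... read data in [m->mem_start, m->mem_stop) ...
//      m->unmap(); f->close(); conn->close();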
//
// --------
//
// (*) see wcfs.go documentation for WCFS isolation protocol overview and details.
#ifndef _NXD_WCFS_H_
#define _NXD_WCFS_H_
#include <golang/libgolang.h>
#include <golang/cxx.h>
#include <golang/sync.h>
#include <tuple>
#include <utility>
#include "wcfs_misc.h"
@@ -33,10 +85,15 @@
namespace wcfs {
using namespace golang;
using cxx::dict;
using cxx::set;
using std::tuple;
using std::pair;
typedef refptr<struct _Conn> Conn;
typedef refptr<struct _Mapping> Mapping;
typedef refptr<struct _FileH> FileH;
typedef refptr<struct _WatchLink> WatchLink;
struct PinReq;
@@ -45,20 +102,185 @@ struct PinReq;
//
// Use wcfs.join in Python API to create it.
//
// The primary way to access wcfs is to open logical connection viewing on-wcfs
// data as of particular database state, and use that logical connection to
// create base-layer mappings. See .connect and Conn for details.
//
// WCFS logically mirrors ZODB.DB .
// It is safe to use WCFS from multiple threads simultaneously.
struct WCFS {
string mountpoint;
pair<Conn, error> connect(zodb::Tid at);
pair<WatchLink, error> _openwatch();
string String() const;
error _headWait(zodb::Tid at);
// at OS-level, on-WCFS raw files can be accessed via ._path and ._open.
string _path(const string &obj);
tuple<os::File, error> _open(const string &path, int flags=O_RDONLY);
};
// Conn represents logical connection that provides view of data on wcfs
// filesystem as of particular database state.
//
// It uses /head/bigfile/* and notifications received from /head/watch to
// maintain isolated database view while at the same time sharing most of data
// cache in OS pagecache of /head/bigfile/*.
//
// Use WCFS.connect(at) to create Conn.
// Use .open to create new FileH.
// Use .resync to resync Conn onto different database view.
//
// Conn logically mirrors ZODB.Connection .
// It is safe to use Conn from multiple threads simultaneously.
typedef refptr<struct _Conn> Conn;
struct _Conn : os::_IAfterFork, object {
WCFS *_wc;
WatchLink _wlink; // watch/receive pins for mappings created under this conn
// atMu protects .at.
// While it is rlocked, .at is guaranteed to stay unchanged and Conn to keep
// viewing the database at that particular state. .resync write-locks this and
// knows no one is using the connection for reading simultaneously.
sync::RWMutex _atMu;
zodb::Tid _at;
sync::RWMutex _filehMu; // _atMu.W | _atMu.R + _filehMu
error _downErr; // !nil if connection is closed or no longer operational
dict<zodb::Oid, FileH> _filehTab; // {} foid -> fileh
sync::WorkGroup _pinWG; // pin/unpin messages from wcfs are served by _pinner
func<void()> _pinCancel; // spawned under _pinWG.
// don't new - create via WCFS.connect
private:
_Conn();
virtual ~_Conn();
friend pair<Conn, error> WCFS::connect(zodb::Tid at);
public:
void incref();
void decref();
public:
zodb::Tid at();
pair<FileH, error> open(zodb::Oid foid);
error close();
error resync(zodb::Tid at);
string String() const;
private:
error _pinner(context::Context ctx);
error __pinner(context::Context ctx);
error _pin1(PinReq *req);
error __pin1(PinReq *req);
void afterFork();
};
// FileH represents isolated file view under Conn.
//
// The file view is maintained to be as of @Conn.at database state even in the
// presence of simultaneous database changes. The file view uses
// /head/<file>/data primarily and /@revX/<file>/data pin overrides.
//
// Use .mmap to map file view into memory.
//
// It is safe to use FileH from multiple threads simultaneously.
enum _FileHState {
// NOTE order of states is semantically important
_FileHOpening = 0, // FileH open is in progress
_FileHOpened = 1, // FileH is opened and can be used
_FileHClosing = 2, // FileH close is in progress
_FileHClosed = 3, // FileH is closed
};
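// Since the state values are ordered, code can test for whole ranges with one
// comparison; e.g. FileH.mmap rejects closing/closed filehs via:
//
//      if (f._state >= _FileHClosing)
//          return make_pair(nil, E(os::ErrClosed));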
typedef refptr<struct _FileH> FileH;
struct _FileH : object {
Conn wconn;
zodb::Oid foid; // ZBigFile root object ID (does not change after fileh open)
// protected by wconn._filehMu
_FileHState _state; // opening/opened/closing/closed
int _nopen; // number of times Conn.open returned this fileh
chan<structZ> _openReady; // in-flight open completed
error _openErr; // error result from open
chan<structZ> _closedq; // in-flight close completed
os::File _headf; // file object of head/file
size_t blksize; // block size of this file (does not change after fileh open)
// head/file size is known to be at least headfsize (size ↑=)
// protected by .wconn._atMu
off_t _headfsize;
sync::Mutex _mmapMu; // atMu.W | atMu.R + _mmapMu
dict<int64_t, zodb::Tid> _pinned; // {} blk -> rev that wcfs already sent us for this file
vector<Mapping> _mmaps; // []Mapping ↑blk_start mappings of this file
// don't new - create via Conn.open
private:
_FileH();
~_FileH();
friend pair<FileH, error> _Conn::open(zodb::Oid foid);
public:
void decref();
public:
error close();
pair<Mapping, error> mmap(int64_t blk_start, int64_t blk_len);
string String() const;
error _open();
error _closeLocked(bool force);
void _afterFork();
};
// Mapping represents one memory mapping of FileH.
//
// The mapped memory is [.mem_start, .mem_stop)
// Use .unmap to release virtual memory resources used by mapping.
//
// Except unmap, it is safe to use Mapping from multiple threads simultaneously.
typedef refptr<struct _Mapping> Mapping;
struct _Mapping : object {
FileH fileh;
int64_t blk_start; // offset of this mapping in file
// protected by fileh._mmapMu
uint8_t *mem_start; // mmapped memory [mem_start, mem_stop)
uint8_t *mem_stop;
bool efaulted; // y after mapping was switched to be invalid (gives SIGSEGV on access)
int64_t blk_stop() const {
if (!((mem_stop - mem_start) % fileh->blksize == 0))
panic("len(mmap) % fileh.blksize != 0");
return blk_start + (mem_stop - mem_start) / fileh->blksize;
}
error unmap();
error _remmapblk(int64_t blk, zodb::Tid at);
error __remmapAsEfault();
error __remmapBlkAsEfault(int64_t blk);
// don't new - create via FileH.mmap
private:
_Mapping();
~_Mapping();
friend pair<Mapping, error> _FileH::mmap(int64_t blk_start, int64_t blk_len);
public:
void decref();
string String() const;
};
// for testing
dict<int64_t, zodb::Tid> _tfileh_pinned(FileH fileh);
} // wcfs::
@@ -23,6 +23,7 @@
#include <golang/errors.h>
#include <golang/fmt.h>
#include <golang/io.h>
#include <golang/sync.h>
using namespace golang;
#include <inttypes.h>
@@ -30,6 +31,7 @@ using namespace golang;
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <algorithm>
#include <memory>
@@ -40,6 +42,9 @@ namespace golang {
// os::
namespace os {
global<error> ErrClosed = errors::New("file already closed");
// TODO -> os.PathError + err=syscall.Errno
static error _pathError(const char *op, const string &path, int syserr);
static string _sysErrString(int syserr);
@@ -131,6 +136,59 @@ static error _pathError(const char *op, const string &path, int syserr) {
}
// afterfork
static sync::Mutex _afterForkMu;
static bool _afterForkInit;
static vector<IAfterFork> _afterForkList;
// _runAfterFork runs handlers registered by RegisterAfterFork.
static void _runAfterFork() {
// we were just forked: this is the child process and there is only 1 thread.
// The state of memory was copied from the parent.
// There are no other mutators except us.
// -> go through _afterForkList *without* locking.
for (auto obj : _afterForkList) {
obj->afterFork();
}
// reset _afterFork* state because child could want to fork again
new (&_afterForkMu) sync::Mutex;
_afterForkInit = false;
_afterForkList.clear();
}
void RegisterAfterFork(IAfterFork obj) {
_afterForkMu.lock();
defer([&]() {
_afterForkMu.unlock();
});
if (!_afterForkInit) {
int e = pthread_atfork(/*prepare=*/nil, /*parent=*/nil, /*child=*/_runAfterFork);
if (e != 0) {
string estr = fmt::sprintf("pthread_atfork: %s", v(_sysErrString(e)));
panic(v(estr));
}
_afterForkInit = true;
}
_afterForkList.push_back(obj);
}
void UnregisterAfterFork(IAfterFork obj) {
_afterForkMu.lock();
defer([&]() {
_afterForkMu.unlock();
});
// _afterForkList.remove(obj)
_afterForkList.erase(
std::remove(_afterForkList.begin(), _afterForkList.end(), obj),
_afterForkList.end());
}
// _sysErrString returns string corresponding to system error syserr.
static string _sysErrString(int syserr) {
char ebuf[128];
@@ -141,6 +199,88 @@ static string _sysErrString(int syserr) {
} // os::
// mm::
namespace mm {
// map memory-maps f.fd[offset +size) somewhere into memory.
// prot is PROT_* from mmap(2).
// flags is MAP_* from mmap(2); MAP_FIXED must not be used.
tuple<uint8_t*, error> map(int prot, int flags, os::File f, off_t offset, size_t size) {
void *addr;
if (flags & MAP_FIXED)
panic("MAP_FIXED not allowed for map - use map_into");
addr = ::mmap(nil, size, prot, flags, f->fd(), offset);
if (addr == MAP_FAILED)
return make_tuple(nil, os::_pathError("mmap", f->name(), errno));
return make_tuple((uint8_t*)addr, nil);
}
// map_into memory-maps f.fd[offset +size) into [addr +size).
// prot is PROT_* from mmap(2).
// flags is MAP_* from mmap(2); MAP_FIXED is added automatically.
error map_into(void *addr, size_t size, int prot, int flags, os::File f, off_t offset) {
void *addr2;
addr2 = ::mmap(addr, size, prot, MAP_FIXED | flags, f->fd(), offset);
if (addr2 == MAP_FAILED)
return os::_pathError("mmap", f->name(), errno);
if (addr2 != addr)
panic("mmap(addr, MAP_FIXED): returned !addr");
return nil;
}
// unmap unmaps [addr +size) memory previously mapped with map & co.
error unmap(void *addr, size_t size) {
int err = ::munmap(addr, size);
if (err != 0)
return os::_pathError("munmap", "<memory>", errno);
return nil;
}
} // mm::
// io::ioutil::
namespace io {
namespace ioutil {
tuple<string, error> ReadFile(const string& path) {
// errctx is ok as returned by all calls.
os::File f;
error err;
tie(f, err) = os::open(path);
if (err != nil)
return make_tuple("", err);
string data;
vector<char> buf(4096);
while (1) {
int n;
tie(n, err) = f->read(&buf[0], buf.size());
data.append(&buf[0], n);
if (err != nil) {
if (err == io::EOF_)
err = nil;
break;
}
}
error err2 = f->close();
if (err == nil)
err = err2;
if (err != nil)
data = "";
return make_tuple(data, err);
}
}} // io::ioutil::
// xstrconv:: (strconv-like)
namespace xstrconv {
@@ -61,6 +61,9 @@ namespace golang {
// os::
namespace os {
extern global<error> ErrClosed;
// os::File mimics os.File from Go.
// its operations return error with full file context.
typedef refptr<class _File> File;
@@ -104,8 +107,43 @@ tuple<File, error> open(const string &path, int flags = O_RDONLY,
S_IRGRP | S_IWGRP | S_IXGRP |
S_IROTH | S_IWOTH | S_IXOTH);
// afterfork
// IAfterFork is the interface that objects must implement to be notified after fork.
typedef refptr<struct _IAfterFork> IAfterFork;
struct _IAfterFork : public _interface {
// afterFork is called in just forked child process for objects that
// were previously registered in parent via RegisterAfterFork.
virtual void afterFork() = 0;
};
// RegisterAfterFork registers obj so that obj.afterFork is run after fork in
// the child process.
void RegisterAfterFork(IAfterFork obj);
// UnregisterAfterFork undoes RegisterAfterFork.
// It is a no-op if obj was not registered.
void UnregisterAfterFork(IAfterFork obj);
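// For example, a sketch of a type whose instances detach from inherited file
// descriptors in the child (_MyObj and obj are illustrative; _WatchLink in
// wcfs_watchlink is a real user):
//
//      struct _MyObj : os::_IAfterFork, object {
//          void afterFork() { /* close fds the child inherited from parent */ }
//      };
//
//      os::RegisterAfterFork(newref(
//          static_cast<os::_IAfterFork*>( obj._ptr() )
//      ));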
} // os::
// mm::
namespace mm {
tuple<uint8_t*, error> map(int prot, int flags, os::File f, off_t offset, size_t size);
error map_into(void *addr, size_t size, int prot, int flags, os::File f, off_t offset);
error unmap(void *addr, size_t size);
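// For example, a sketch of mapping 4 blocks of a file read-only and then
// remapping one block in-place from another file (f, f2, blksize and blk are
// illustrative):
//
//      uint8_t *mem; error err;
//      tie(mem, err) = mm::map(PROT_READ, MAP_SHARED, f, 0, 4*blksize);
//      err = mm::map_into(mem + blk*blksize, blksize, PROT_READ, MAP_SHARED,
//                         f2, blk*blksize);
//      err = mm::unmap(mem, 4*blksize);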
} // mm::
// io::ioutil::
namespace io {
namespace ioutil {
tuple<string, error> ReadFile(const string& path);
}} // io::ioutil::
// ---- misc ----
@@ -63,6 +63,10 @@ pair<WatchLink, error> WCFS::_openwatch() {
wlink->rx_eof = makechan<structZ>();
os::RegisterAfterFork(newref(
static_cast<os::_IAfterFork*>( wlink._ptr() )
));
context::Context serveCtx;
tie(serveCtx, wlink->_serveCancel) = context::with_cancel(context::background());
wlink->_serveWG = sync::NewWorkGroup(serveCtx);
@@ -96,9 +100,24 @@ error _WatchLink::close() {
if (err == nil)
err = err3;
os::UnregisterAfterFork(newref(
static_cast<os::_IAfterFork*>( &wlink )
));
return E(err);
}
// afterFork detaches from wcfs in child process right after fork.
void _WatchLink::afterFork() {
_WatchLink& wlink = *this;
// in the child right after fork we are the only thread to run; in particular
// _serveRX is not running. Just release the file handle that fork
// duplicated, to make sure that the child cannot send anything to wcfs and
// interfere with the parent-wcfs exchange.
wlink._f->close(); // ignore err
}
// closeWrite closes send half of the link.
error _WatchLink::closeWrite() {
_WatchLink& wlink = *this;
@@ -70,7 +70,7 @@ static_assert(sizeof(rxPkt) == 256, "rxPkt miscompiled"); // NOTE 128 is too low
//
// It is safe to use WatchLink from multiple threads simultaneously.
typedef refptr<class _WatchLink> WatchLink;
class _WatchLink : public os::_IAfterFork, object {
WCFS *_wc;
os::File _f; // head/watch file handle
string _rxbuf; // buffer for data already read from _f
@@ -123,6 +123,8 @@ private:
StreamID _nextReqID();
tuple<chan<rxPkt>, error> _sendReq(context::Context ctx, StreamID stream, const string &req);
void afterFork();
friend error _twlinkwrite(WatchLink wlink, const string &pkt);
};
@@ -167,6 +167,21 @@ def unmap(const unsigned char[::1] mem not None):
    return
# map_zero_ro creates new read-only mmapping that reads as all zeros.
# created mapping, even after it is accessed, does not consume memory.
def map_zero_ro(size_t size):
    cdef void *addr
    # mmap /dev/zero with MAP_NORESERVE and MAP_SHARED
    # this way the mapping will be able to be read, but no memory will be allocated to keep it.
    f = open("/dev/zero", "rb")
    addr = mman.mmap(NULL, size, mman.PROT_READ, mman.MAP_SHARED | mman.MAP_NORESERVE, f.fileno(), 0)
    f.close()
    if addr == mman.MAP_FAILED:
        PyErr_SetFromErrno(OSError)
        return
    return <unsigned char[:size:1]>addr
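# For example (a sketch; the size is illustrative):
#
#   z = map_zero_ro(1<<20)      # 1 MiB that reads as zeros without using memory
#   assert z[0] == 0 and z[len(z)-1] == 0
#   unmap(z)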
# advise advises kernel about use of mem's memory.
#
@@ -180,3 +195,17 @@ def advise(const unsigned char[::1] mem not None, int advice):
        PyErr_SetFromErrno(OSError)
        return
# protect sets protection on a region of memory.
#
# see mprotect(2) for details.
def protect(const unsigned char[::1] mem not None, int prot):
    cdef const void *addr = &mem[0]
    cdef size_t size = mem.shape[0]
    cdef int err = mman.mprotect(<void *>addr, size, prot)
    if err:
        PyErr_SetFromErrno(OSError)
        return
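# For example (a sketch; assumes PROT_NONE is exposed by the cimported mman
# alongside the PROT_READ used above):
#
#   protect(mem, mman.PROT_NONE)    # any further access to mem will fault
#   protect(mem, mman.PROT_READ)    # reads are allowed again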
@@ -143,6 +143,16 @@ cdef unsigned char _read_exfault(const unsigned char *p) nogil except +topyexc:
    return b
def read_mustfault(const unsigned char[::1] mem not None):
    try:
        read_exfault_nogil(mem)
    except SegmentationFault:
        # ok
        pass
    else:
        raise AssertionError("not faulted")
# --------
@@ -31,12 +31,18 @@
// head/bigfile/<bigfileX> which represents always latest bigfile data.
// Clients that want to get isolation guarantee should subscribe for
// invalidations and re-mmap invalidated regions to file with pinned bigfile revision for
// the duration of their transaction. See "Isolation protocol" for details(*).
//
// In the usual situation when bigfiles are big, and there are O(1)/δt updates,
// there should be no need for any cache besides shared kernel cache of latest
// bigfile data.
//
// --------
//
// (*) wcfs server comes accompanied by Python and C++ client packages that
// take care about isolation protocol details and provide to clients simple
// interface similar to regular files.
//
//
// Filesystem organization
//
@@ -18,6 +18,9 @@
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""wcfs_test.py tests wcfs filesystem from outside as python client process.
Virtmem layer provided by wcfs client package is unit-tested by
wcfs/client/client_test.py .
"""
from __future__ import print_function, absolute_import