// Copyright (C) 2018-2019  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// Program wcfs provides filesystem server with file data backed by wendelin.core arrays.
//
// Intro
//
// Each wendelin.core array (ZBigArray) is actually a linear file (ZBigFile)
// and array metadata like dtype, shape and strides associated with it. This
// program exposes as files only ZBigFile data and leaves rest of
// array-specific handling to clients. Every ZBigFile is exposed as one separate
// file that represents whole ZBigFile's data.
//
// For a client, the primary way to access a bigfile should be to mmap
// head/bigfile/<bigfileX> which represents always latest bigfile data.
// Clients that want to get isolation guarantee should subscribe for
// invalidations and re-mmap invalidated regions to file with pinned bigfile revision for
// the duration of their transaction. See "Invalidation protocol" for details.
//
// In the usual situation when bigfiles are big, and there are O(1)/δt updates,
// there should be no need for any cache besides shared kernel cache of latest
// bigfile data.
//
//
// Filesystem organization
//
// Top-level structure of provided filesystem is as follows:
//
//	head/			; latest database view
//		...
//	@<rev1>/		; database view as of revision <revX>
//		...
//	@<rev2>/
//		...
//	...
//
// where head/ represents latest data as stored in upstream ZODB, and
// @<revX>/ represents data as of database revision <revX>.
//
// head/ has the following structure:
//
//	head/
//		at			; data inside head/ is as of this ZODB transaction
//		watch			; channel for bigfile invalidations
//		bigfile/		; bigfiles' data
//			<oid(ZBigFile1)>
//			<oid(ZBigFile2)>
//			...
//
// where /bigfile/<bigfileX> represents latest bigfile data as stored in
// upstream ZODB. As there can be some lag receiving updates from the database,
// /at describes precisely ZODB state for which bigfile data is currently
// exposed. Whenever bigfile data is changed in upstream ZODB, information
// about the changes is first propagated to /watch, and only after that
// /bigfile/<bigfileX> is updated. See "Invalidation protocol" for details.
//
// @<revX>/ has the following structure:
//
//	@<revX>/
//		at
//		bigfile/		; bigfiles' data as of revision <revX>
//			<oid(ZBigFile1)>
//			<oid(ZBigFile2)>
//			...
//
// where /bigfile/<bigfileX> represent bigfile data as of revision <revX>.
//
// Unless accessed {head,@<revX>}/bigfile/<bigfileX> are not automatically visible in
// wcfs filesystem. Similarly @<revX>/ become visible only after access.
//
//
// Invalidation protocol
//
// In order to support isolation, wcfs implements invalidation protocol that
// must be cooperatively followed by both wcfs and client.
//
// First, client mmaps latest bigfile, but does not access it
//
//	mmap(head/bigfile/<bigfileX>)
//
// Then client opens head/watch and tells wcfs through it for which ZODB state
// it wants to get bigfile's view.
//
//	C: 1 watch <bigfileX> @<at>
//
// The server then, after potentially sending initial pin messages (see below),
// reports either success or failure:
//
//	S: 1 ok
//	S: 1 error ...		; if <at> is too far away back from head/at
//
// The server sends "ok" reply only after head/at is ≥ requested <at>, and
// only after all initial pin messages are fully acknowledged by the client.
// The client can start to use mmapped data after it gets "ok".
// The server sends "error" reply if requested <at> is too far away back from
// head/at.
// XXX other errors are possible (e.g. "no such file", or error handling pin).
// XXX error handling pin -> then client is killed?
// XXX if not - specify that watch state is lost after error.
//
// Upon watch request, either initially, or after sending "ok", the server will be notifying the
// client about file blocks that client needs to pin in order to observe file's
// data as of <at> revision:
//
// The filesystem server itself receives information about changed data from
// ZODB server through regular ZODB invalidation channel (as it is ZODB client
// itself). Then, separately for each changed file block, before actually
// updating head/bigfile/<bigfileX> content, it notifies through opened
// head/watch links to clients, that had requested it (separately to each
// client), about the changes:
//
//	S: <2·k> pin <bigfileX> #<blk> @<rev_max>
//	XXX @head means unpin.
//	XXX or use `unpin <bigfileX> #<blk>` ?
//
// and waits until all clients confirm that changed file block can be updated
// in global OS cache.
//
// The client in turn should now re-mmap requested to be pinned block to bigfile@<rev_max>
//
//	# mmapped at address corresponding to #blk
//	mmap(@<rev_max>/bigfile/<bigfileX>, #blk, MAP_FIXED)
//
// and must send ack back to the server when it is done:
//
//	C: <2·k> ack
//
// The server sends pin notifications only for file blocks, that are known to
// be potentially changed after client's <at>, and <rev_max> describes the
// upper bound for the block revision as of <at> database view:
//
//	<rev_max> ≤ <at>	; block stays unchanged in (<rev_max>, <at>] range
//
// The server maintains short history tail of file changes to be able to
// support openings with <at> being slightly in the past compared to current
// head/at. The server might reject a watch request if <at> is too far away in
// the past from head/at. The client is advised to restart its transaction with
// more uptodate database view if it gets watch setup error.
//
// A later request from the client for the same <bigfileX> but with different
// <at>, overrides previous watch request for that file. A client can use "-"
// instead of "@<at>" to stop watching a file.
//
// A single client can send several watch requests through single head/watch
// open, as well as it can use several head/watch opens simultaneously.
// The server sends pin notifications for all files requested to be watched via
// every head/watch open.
//
// Note: a client could use a single watch to manage its several views for the same
// file but with different <at>. This could be achieved via watching with
// @<at_min>, and then deciding internally which views needs to be adjusted and
// which views need not. Wcfs does not oblige clients to do so though, and a
// client is free to use as many head/watch openings as it needs to.
//
// When clients are done with @<revX>/bigfile/<bigfileX> (i.e. client's
// transaction ends and array is unmapped), the server sees number of opened
// files to @<revX>/bigfile/<bigfileX> drops to zero, and automatically
// destroys @<revX>/bigfile/<bigfileX> after reasonable timeout.
//
//
// Protection against slow or faulty clients
//
// If a client, on purpose or due to a bug or being stopped, is slow to respond
// with ack to file invalidation notification, it creates a problem because the
// server will become blocked waiting for pin acknowledgments, and thus all
// other clients, that try to work with the same file, will get stuck.
//
// The problem could be avoided, if wcfs would reside inside OS kernel and this
// way could be able to manipulate clients address space directly (then
// invalidation protocol won't be needed). It is also possible to imagine
// mechanism, where wcfs would synchronously change clients' address space via
// injecting trusted code and running it on client side via ptrace to adjust
// file mappings.
//
// However ptrace does not work when client thread is blocked under pagefault,
// and that is exactly what wcfs would need to do to process invalidations
// lazily, because eager invalidation processing results in prohibitively slow
// file opens. See internal wcfs overview for details about why ptrace
// cannot be used and why lazy invalidation processing is required.
//
// Lacking OS primitives to change address space of another process and not
// being able to work it around with ptrace in userspace, wcfs takes approach
// to kill a slow client on 30 seconds timeout by default.
//
//
// Writes
//
// As each bigfile is represented by 1 synthetic file, there can be several
// write schemes:
//
// 1. mmap(MAP_PRIVATE) + writeout by client
//
// In this scheme bigfile data is mmapped in MAP_PRIVATE mode, so that local
// user changes are not automatically propagated back to the file. When there
// is a need to commit, client investigates via some OS mechanism, e.g.
// /proc/self/pagemap or something similar, which pages of this mapping it
// modified. Knowing this it knows which data it dirtied and so can write this
// data back to ZODB itself, without filesystem server providing write support.
//
// 2. mmap(MAP_SHARED, PROT_READ) + write-tracking & writeout by client
//
// In this scheme bigfile data is mmaped in MAP_SHARED mode with read-only pages
// protection. Then whenever write fault occurs, client allocates RAM from
// shmfs, copies faulted page to it, and then mmaps RAM page with RW protection
// in place of original bigfile page. Writeout implementation should be similar
// to "1", only here client already knows the pages it dirtied, and this way
// there is no need to consult /proc/self/pagemap.
//
// The advantage of this scheme over mmap(MAP_PRIVATE) is that in case
// there are several in-process mappings of the same bigfile with overlapping
// in-file ranges, changes in one mapping will be visible in another mapping.
// Contrary: whenever a MAP_PRIVATE mapping is modified, the kernel COWs
// faulted page into a page completely private to this mapping, so that other
// MAP_PRIVATE mappings of this file, including ones created from the same
// process, do not see changes made to the first mapping.
//
// Since wendelin.core needs to provide coherency in between different slices
// of the same array, this is the mode wendelin.core actually uses.
//
// 3. write to wcfs
//
// TODO we later could implement "write-directly" mode where clients would write
// data directly into the file.
package main

// Wcfs organization
//
// Wcfs is a ZODB client that translates ZODB objects into OS files as would
// non-wcfs wendelin.core do for a ZBigFile. Contrary to non-wcfs wendelin.core,
// it keeps bigfile data in shared OS cache efficiently. It is organized as follows:
//
// 1) 1 ZODB connection for "latest data" for whole filesystem (zhead).
// 2) head/bigfile/* of all bigfiles represent state as of zhead.At .
// 3) for head/bigfile/* the following invariant is maintained:
//
//	#blk ∈ OS file cache    =>    ZBlk(#blk) + all BTree/Bucket that lead to it  ∈ zhead cache(%)
//	                              (ZBlk* in ghost state)
//
//	                        =>    all BTree/Bucket that lead to blk are tracked (XXX)
//
//    The invariant helps on invalidation: if we see a changed oid, and
//    zhead.cache.lookup(oid) = ø -> we know we don't have to invalidate OS
//    cache for any part of any file (even if oid relates to a file block - that
//    block is not cached and will trigger ZODB load on file read).
//
//    XXX explain why tracked
//
//    Currently we maintain this invariant by simply never evicting ZBlk/LOBTree/LOBucket
//    objects from ZODB Connection cache. In the future we may want to try to
//    synchronize to kernel freeing its pagecache pages.
//
// 4) when we receive an invalidation message from ZODB - we process it and
//    propagate invalidations to OS file cache of head/bigfile/*:
//
//	invalidation message: (tid↑, []oid)
//
//    4.1) zhead.cache.lookup(oid)			XXX -> δFtail
//    4.2) ø: nothing to do - see invariant ^^^.
//    4.3) obj found:
//
//	- ZBlk*		-> [] of file/[]#blk
//	- BTree/Bucket	-> δ(BTree)  -> file/[]#blk
//
//	in the end after processing all []oid from invalidation message we have
//
//	  [] of file/[]#blk
//
//	that describes which file(s) parts needs to be invalidated.
//
//	FIXME no - we can build it but not in full - since we consider only zobj in live cache.
//	FIXME and even if we consider all δ'ed zobj, building complete set of
//	      file.δtail requires to first do complete scan of file.blktab
//	      which is prohibitively expensive. XXX -> no, we'll do the scan
//
//    4.4) for all file/blk to invalidate we do:
//
//	- try to retrieve head/bigfile/file[blk] from OS file cache(*);
//	- if retrieved successfully -> store retrieved data back into OS file
//	  cache for @<rev>/bigfile/file[blk], where
//
//	    # see below about file.δtail
//	    # XXX -> file.BlkRevAt(#blk, zhead.at)
//	    rev = max(file.δtail.by(#blk)) || min(rev ∈ file.δtail) || zhead.at
//
//	- invalidate head/bigfile/file[blk] in OS file cache.
//
//	This preserves previous data in OS file cache in case it will be needed
//	by not-yet-uptodate clients, and makes sure file read of head/bigfile/file[blk]
//	won't be served from OS file cache and instead will trigger a FUSE read
//	request to wcfs.
//
//    4.5) no invalidation messages are sent to wcfs clients at this point(+).
//
//    4.6) processing ZODB invalidations and serving file reads (see 7) are
//      organized to be mutually exclusive.
//
// 5) after OS file cache was invalidated, we resync zhead to new database
//    view corresponding to tid.
//
// 6) for every file δtail invalidation info about head/data is maintained:	XXX -> δFtail
//
//	- tailv: [](rev↑, []#blk)
//	- by:    {} #blk -> []rev↑ in tail
//
//    δtail.tail describes invalidations to file we learned from ZODB invalidation.
//    δtail.by   allows to quickly lookup information by #blk.
//
//    min(rev) in δtail is min(@at) at which head/bigfile/file is currently watched (see below).
//
//    XXX δtail can miss ...
//
//    to support initial openings with @at being slightly in the past, we also
//    make sure that min(rev) is enough to cover last 10 minutes of history
//    from head/at.
//
// 7) when we receive a FUSE read(#blk) request to a head/bigfile/file, we process it as follows:
//
//   7.1) load blkdata for head/bigfile/file[blk] @zhead.at .
//
//	while loading this also gives upper bound estimate of when the block
//	was last changed:
//
//	  rev(blk) ≤ max(_.serial for _ in (ZBlk(#blk), all BTree/Bucket that lead to ZBlk))
//
//	it is not exact because BTree/Bucket can change (e.g. rebalance)
//	but still point to the same k->ZBlk.
//
//	we also use file.δtail to find either exact blk revision:	XXX δFtail
//
//	  rev(blk) = max(file.δtail.by(#blk) -> []rev↑)
//
//	or another upper bound if #blk ∉ δtail:
//
//	  rev(blk) ≤ min(rev ∈ δtail)		; #blk ∉ δtail
//
//
//	below rev'(blk) is min(of the estimates found):
//
//	  rev(blk) ≤ rev'(blk)		rev'(blk) = min(^^^)
//
//
//	XXX we delay recomputing δFtail.LastBlkRev(file, #blk, head) because
//	using just cheap revmax estimate can frequently result in all watches
//	being skipped.
//
//   7.2) for all registered client@at watches of head/bigfile/file:
//
//	- rev'(blk) ≤ at: -> do nothing
//	- rev'(blk) > at:
//	  - if blk ∈ watch.pinned -> do nothing
//	  - rev = max(δtail.by(#blk) : _ ≤ at)	|| min(rev ∈ δtail : rev ≤ at)	|| at
//	  - watch.pin(file, #blk, @rev)
//	  - watch.pinned += blk
//
//	where
//
//	  watch.pin(file, #blk, @rev)
//
//	sends pin message according to "Invalidation protocol", and is assumed
//	to cause
//
//	  remmap(file, #blk, @rev/bigfile/file)
//
//	on client.
//
//	( one could imagine adjusting mappings synchronously via running
//	  wcfs-trusted code via ptrace that wcfs injects into clients, but ptrace
//	  won't work when client thread is blocked under pagefault or syscall(^) )
//
//	in order to support watching for each head/bigfile/file
//
//	  [] of watch{client@at↑, pinned}
//
//	is maintained.
//
//   7.3) blkdata is returned to kernel.
//
//   Thus a client that wants latest data on pagefault will get latest data,
//   and a client that wants @rev data will get @rev data, even if it was this
//   "old" client that triggered the pagefault(~).
//
// XXX 8) serving read from @<rev>/data + zconn(s) for historical state
// XXX 9) gc @rev/ and @rev/bigfile/<bigfileX> automatically on atime timeout
//
//
// (*) see notes.txt -> "Notes on OS pagecache control"
// (+) see notes.txt -> "Invalidations to wcfs clients are delayed until block access"
// (~) see notes.txt -> "Changing mmapping while under pagefault is possible"
// (^) see notes.txt -> "Client cannot be ptraced while under pagefault"
// (%) no need to keep track of ZData - ZBlk1 is always marked as changed on blk data change.
//
// XXX For every ZODB connection a dedicated read-only transaction is maintained.

// XXX notation
//
// δZ    - change in ZODB space
// δB    - change in BTree*s* space
// δT    - change in BTree(1) space
// δF    - change in File*s* space
// δfile - change in File(1) space

// XXX locking
//
// head.zheadMu		WLock by handleδZ
//			RLock by read
// ...
//
// zheadMu.W  |  zheadMu.R + BigFileDir.fileMu
//
// WatchLink.byfileMu	> Watch.mu
// BigFile.watchMu      > Watch.mu

import (
	"bufio"
	"context"
	"flag"
	"fmt"
	"io"
	stdlog "log"
	"os"
	"runtime"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
//	"time"

	log "github.com/golang/glog"
	"golang.org/x/sync/errgroup"

	"lab.nexedi.com/kirr/go123/xcontext"
	"lab.nexedi.com/kirr/go123/xerr"

	"lab.nexedi.com/kirr/neo/go/transaction"
	"lab.nexedi.com/kirr/neo/go/zodb"
	"lab.nexedi.com/kirr/neo/go/zodb/btree"
	_ "lab.nexedi.com/kirr/neo/go/zodb/wks"

	"github.com/hanwen/go-fuse/fuse"
	"github.com/hanwen/go-fuse/fuse/nodefs"
	"github.com/pkg/errors"
)

// Root represents root of wcfs filesystem.
//
// It owns the ZODB storage/DB handles shared by the whole mount, the head/
// directory, and lazily-created @<rev>/ directories.
type Root struct {
	fsNode

	// ZODB storage we work with
	zstor zodb.IStorage

	// ZODB DB handle for zstor.
	// keeps cache of connections for @<rev>/ accesses.
	// only one connection is used for each @<rev>.
	zdb *zodb.DB

	// directory + ZODB connection for head/
	// (zhead is Resync'ed and is kept outside zdb pool)
	head *Head

	// directories + ZODB connections for @<rev>/
	revMu  sync.Mutex          // guards revTab
	revTab map[zodb.Tid]*Head  // {} rev -> @<rev>/ directory
}

// /(head|<rev>)/			- served by Head.
//
// Head serves either the latest database view (head/, .rev = 0), or a view
// frozen at a particular database revision (@<rev>/, .rev ≠ 0).
type Head struct {
	fsNode

	rev   zodb.Tid    // 0 for head/, !0 for @<rev>/
	bfdir *BigFileDir // bigfile/
	// at    - served by .readAt
	// watch - implicitly linked to by fs

	// ZODB connection for everything under this head

	// zheadMu protects zconn.At & live _objects_ associated with it.
	// while it is rlocked zconn is guaranteed to stay viewing database at
	// particular view.
	//
	// zwatcher write-locks this and knows noone is using ZODB objects and
	// noone mutates OS file cache while zwatcher is running.
	//
	// it is also kept rlocked by OS cache uploaders (see BigFile.uploadBlk)
	// with additional locking protocol to avoid deadlocks (see below for
	// pauseOSCacheUpload + ...).
	zheadMu sync.RWMutex
	zconn   *ZConn       // for head/ zwatcher resyncs head.zconn; others only read zconn objects.

	// zwatcher signals to uploadBlk to pause/continue uploads to OS cache to avoid deadlocks.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
	pauseOSCacheUpload    bool
	continueOSCacheUpload chan struct{} // closed by zwatcher to resume paused uploaders
	// uploadBlk signals to zwatcher that there are so many inflight OS cache uploads currently.
	// NOTE accessed atomically - its decrement is done outside zheadMu (see handleδZ).
	inflightOSCacheUploads int32

	// head/watch opens
	wlinkMu  sync.Mutex // guards wlinkTab
	wlinkTab map[*WatchLink]struct{}

	// waiters for zhead.At to become ≥ their at.
	hwaitMu sync.Mutex           // zheadMu.W  |  zheadMu.R + hwaitMu
	hwait   map[hwaiter]struct{} // set{(at, ready)}
}

// /(head|<rev>)/bigfile/		- served by BigFileDir.
//
// Per the filesystem organization, <bigfileX> entries become visible only on
// access; accessed files are kept in fileTab.
type BigFileDir struct {
	fsNode
	head *Head // parent head/ or @<rev>/

	// {} oid -> <bigfileX>
	fileMu  sync.Mutex		// zheadMu.W  |  zheadMu.R + fileMu
	fileTab map[zodb.Oid]*BigFile

	// δ tail of tracked BTree nodes of all BigFiles + -> which file
	// (used only for head/, not revX/)
	δFmu   sync.RWMutex		// zheadMu.W  |  zheadMu.R + δFmu.X
	δFtail *ΔFtail
}

// /(head|<rev>)/bigfile/<bigfileX>	- served by BigFile.
//
// BigFile exposes whole data of one ZBigFile as one file, as viewed through
// its head's ZODB connection.
type BigFile struct {
	fsNode

	// this BigFile is under .head/bigfile/; it views ZODB via .head.zconn
	// parent's BigFileDir.head is the same.
	head	*Head

	// ZBigFile top-level object
	zfile	*ZBigFile

	// things read/computed from .zfile; constant during lifetime of current transaction.
	// i.e. changed under zhead.W
	blksize int64    // zfile.blksize
	size    int64    // zfile.Size()
	rev     zodb.Tid // last revision that modified zfile data
			 // XXX we can't know rev fully as some later blocks could be learnt only
			 //     while populating δFtail lazily
			 // XXX or then it is not "constant during lifetime of current txn"

//	// tail change history of this file.
//	//
//	// XXX computationally expensive to start - see "Invalidations to wcfs
//	// clients are delayed ..." in notes.txt
//	δtail *ΔTailI64 // [](rev↑, []#blk)

	// inflight loadings of ZBigFile from ZODB.
	// successful load results are kept here until blkdata is put into OS pagecache.
	//
	// Being a staging area for data to enter OS cache, loading has to be
	// consulted/invalidated whenever wcfs logic needs to consult/invalidate OS cache.
	// (see e.g. invalidateBlk, which both consumes and deletes entries here)
	loadMu  sync.Mutex              // zheadMu.W  |  zheadMu.R + loadMu
	loading map[int64]*blkLoadState // #blk -> {... blkdata}

	// watches attached to this file.
	//
	// both watches in already "established" state (i.e. initial watch
	// request was completed and answered with "ok"), and watches in
	// progress of being established are kept here.
	watchMu  sync.Mutex		// XXX use
	watchTab map[*Watch]struct{}
}

// blkLoadState represents a ZBlk load state/result.
//
// when !ready the loading is in progress.
// when ready  the loading has been completed.
type blkLoadState struct {
	ready chan struct{} // closed when loading completes

	blkdata  []byte // loaded block data; valid only if err == nil
	err      error  // non-nil if loading failed
}

// /head/watch				- served by WatchNode.
//
// Each open of head/watch creates a new WatchLink; ids for the links are
// drawn from idNext.
type WatchNode struct {
	fsNode

	head   *Head // parent head/
	idNext int32 // ID for next opened WatchLink
}

// /head/watch handle			- served by WatchLink.
//
// WatchLink represents one open of head/watch - the bidirectional message
// channel over which "Invalidation protocol" messages flow between wcfs and
// one client.
type WatchLink struct {
	sk   *FileSock // IO channel to client
	id   int32     // ID of this /head/watch handle (for debug log)
	head *Head

	// watches associated with this watch link.
	//
	// both already established, and watches being initialized in-progress are registered here.
	// (see setupWatch)
	// XXX byfile -> wlink-global watchMu ?
	byfileMu sync.Mutex          // zheadMu.W  |  zheadMu.R + byfileMu	(XXX recheck)
	byfile   map[zodb.Oid]*Watch // {} foid -> Watch

	// IO
	reqNext uint64 // stream ID for next wcfs-originated request; 0 is reserved for control messages
	txMu    sync.Mutex // NOTE(review): presumably serializes writes to sk - confirm at send sites
	rxMu    sync.Mutex // guards rxTab
	rxTab   map[/*stream*/uint64]chan string // client replies go via here
}

// Watch represents watching for changes to 1 BigFile over particular watch link.
type Watch struct {
	link *WatchLink // link to client
	file *BigFile	// XXX needed?

	// atMu, similarly to zheadMu, protects watch.at and pins associated with Watch.
	// atMu.R guarantees that watch.at is not changing, but multiple
	//        simultaneous pins could be running (used e.g. by readPinWatchers).
	// atMu.W guarantees that only one user has watch.at write access and
	//        that no pins are running (used by setupWatch).
	atMu sync.RWMutex
	at   zodb.Tid               // requested to be watched @at

	pinnedMu sync.Mutex             // atMu.W  |  atMu.R + pinnedMu
	pinned   map[int64]*blkPinState // {} blk -> {... rev}  blocks that are already pinned to be ≤ at
}

// blkPinState represents state/result of pinning one block.
//
// when !ready the pinning is in progress.
// when ready  the pinning has been completed.
type blkPinState struct {
	rev    zodb.Tid // revision to which the block is being or has been pinned

	ready  chan struct{} // closed when pinning completes
	err    error         // non-nil if pinning failed
}

// -------- 3) Cache invariant --------

// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
// LOBTree/LOBucket from live cache. We want to keep LOBTree/LOBucket always alive
// because it is essentially the index where to find ZBigFile data.
//
// For the data itself - we put it to kernel pagecache and always deactivate
// from ZODB right after that.
//
// See "3) for */head/data the following invariant is maintained..."
//
// zodbCacheControl is stateless - the whole policy lives in PCacheClassify.
type zodbCacheControl struct {}

// PCacheClassify returns the live-cache policy for obj.
//
// The returned policy implements the cache invariant described above:
// ZBigFile and its access index (LOBTree/LOBucket) stay in cache with state,
// ZBlk* stay pinned but stateless, and ZData is dropped entirely.
func (_ *zodbCacheControl) PCacheClassify(obj zodb.IPersistent) zodb.PCachePolicy {
	switch obj.(type) {
	// ZBlk* should be kept in cache, but without data - the data
	// itself goes to kernel pagecache.
	case *ZBlk0, *ZBlk1:
		return zodb.PCachePinObject | zodb.PCacheDropState

	// ZBigFile btree index should be kept in cache together with data,
	// since it is the index to locate ZBigFile blocks.
	case *btree.LOBTree, *btree.LOBucket:
		return zodb.PCachePinObject | zodb.PCacheKeepState

	// don't let ZData to pollute the cache.
	case *ZData:
		return zodb.PCacheDropObject | zodb.PCacheDropState

	// ZBigFile is the top-level object used on every block load;
	// for performance keep it in cache with state - evicting it
	// would be a waste.
	case *ZBigFile:
		return zodb.PCachePinObject | zodb.PCacheKeepState
	}

	return 0
}

/*
// -------- zhead lock/wait --------

// XXX needed?
// TODO head.zheadMu -> special mutex with Lock(ctx) so that Lock wait could be canceled
func (head *Head) zheadRLock()   { head.zheadMu.RLock() }
func (head *Head) zheadRUnlock() { head.zheadMu.RUnlock() }
func (head *Head) zheadLock()    { head.zheadMu.Lock() }
func (head *Head) zheadUnlock()  { head.zheadMu.Unlock() }
*/

// -------- 4) ZODB invalidation -> OS cache --------

// traceZWatch logs a "zwatcher: "-prefixed formatted message when verbose
// logging (V≥1) is enabled; otherwise it is a no-op.
func traceZWatch(format string, argv ...interface{}) {
	if log.V(1) {	// XXX -> 2?
		log.InfoDepth(1, fmt.Sprintf("zwatcher: "+format, argv...))
	}
}

// zwatcher watches for ZODB changes.
//
// It receives events from zwatchq and dispatches commit events to
// root.handleδZ, returning when ctx is canceled, zwatchq is closed, or an
// error is encountered.
//
// see "4) when we receive an invalidation message from ZODB ..."
func (root *Root) zwatcher(ctx context.Context, zwatchq chan zodb.Event) (err error) {
	defer xerr.Contextf(&err, "zwatch %s", root.zstor.URL())
	traceZWatch(">>>")

	for {
		traceZWatch("select ...")

		var zevent zodb.Event
		select {
		case <-ctx.Done():
			traceZWatch("cancel")
			return ctx.Err()

		case ev, ok := <-zwatchq:
			if !ok {
				traceZWatch("zwatchq closed")
				return nil // closed	XXX ok?
			}
			zevent = ev
		}

		traceZWatch("zevent: %s", zevent)

		switch zevent := zevent.(type) {
		case *zodb.EventError:
			return zevent.Err

		case *zodb.EventCommit:
			if err := root.handleδZ(zevent); err != nil {
				return err
			}

		default:
			return fmt.Errorf("unexpected event: %T", zevent)
		}
	}
}

// handleδZ handles 1 change event from ZODB notification.
//
// It pauses OS cache uploaders, invalidates kernel cache for all changed
// file blocks and attributes, resyncs zhead to the new database view, and
// finally wakes up zheadWait waiters whose at ≤ δZ.Tid.
//
// See "4) when we receive an invalidation message from ZODB ..." for the
// overall plan this implements.
func (root *Root) handleδZ(δZ *zodb.EventCommit) (err error) {
	defer xerr.Contextf(&err, "handleδZ @%s", δZ.Tid)

	head := root.head

	// while we are invalidating OS cache, make sure that nothing, that
	// even reads /head/bigfile/*, is running (see 4.6).
	//
	// also make sure that cache uploaders we spawned (uploadBlk) are all
	// paused, or else they could overwrite OS cache with stale data.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for
	// details on how to do this without deadlocks.
	continueOSCacheUpload := make(chan struct{})
retry:
	for {
		head.zheadMu.Lock()
		head.pauseOSCacheUpload = true
		head.continueOSCacheUpload = continueOSCacheUpload

		// NOTE need atomic load, since inflightOSCacheUploads
		// decrement is done not under zheadMu.
		if atomic.LoadInt32(&head.inflightOSCacheUploads) != 0 {
			// NOTE(review): this spins without backoff until inflight
			// uploads drain; presumably uploaders observe
			// pauseOSCacheUpload promptly so the spin is short - confirm
			// against uploadBlk.
			head.zheadMu.Unlock()
			continue retry
		}

		break
	}

	// on return: resume the uploaders and release zheadMu.
	// zheadMu thus stays wlocked for the whole invalidation + resync below.
	defer func() {
		head.pauseOSCacheUpload = false
		head.continueOSCacheUpload = nil
		head.zheadMu.Unlock()
		close(continueOSCacheUpload) // wakes up all paused uploaders
	}()

	// head.zheadMu wlocked and all cache uploaders are paused

	zhead := head.zconn
	bfdir := head.bfdir

	// invalidate kernel cache for data in changed files
	// NOTE no δFmu lock needed because zhead is WLocked
	δF := bfdir.δFtail.Update(δZ, zhead)

	// (debug dump of δF; disabled)
	if false {
	fmt.Printf("\n\nS: handleδZ: δF (#%d):\n", len(δF.ByFile))
	for file, δfile := range δF.ByFile {
		blkv := δfile.Blocks.Elements()
		sort.Slice(blkv, func(i, j int) bool {
			return blkv[i] < blkv[j]
		})
		size := " "
		if δfile.Size {
			size = "S"
		}
		fmt.Printf("S: \t- %s\t%s %v\n", file.zfile.POid(), size, blkv)
	}
	fmt.Printf("\n\n")
	}

	// invalidate kernel cache of every changed block, in parallel
	wg, ctx := errgroup.WithContext(context.TODO())	// XXX ctx = ?
	for file, δfile := range δF.ByFile {
//		// XXX needed?
//		// XXX even though δBtail is complete, not all ZBlk are present here
//		file.δtail.Append(δF.Rev, δfile.Blocks.Elements())

		file := file // capture loop variables for the closures below
		for blk := range δfile.Blocks {
			blk := blk
			wg.Go(func() error {
				return file.invalidateBlk(ctx, blk)
			})
		}
	}
	err = wg.Wait()
	if err != nil {
		return err
	}

	// invalidate kernel cache for attributes
	// we need to do it only if we see topology (i.e. btree) change
	//
	// do it after completing data invalidations.
	wg, ctx = errgroup.WithContext(context.TODO())	// XXX ctx = ?
	for file, δfile := range δF.ByFile {
		if !δfile.Size {
			continue
		}
		file := file
		wg.Go(func() error {
			return file.invalidateAttr() // NOTE does not accept ctx
		})
	}
	err = wg.Wait()
	if err != nil {
		return err
	}

	// resync .zhead to δZ.tid
	// XXX -> Head.Resync() ?

	// 1. abort old and resync to new txn/at
	transaction.Current(zhead.txnCtx).Abort()
	_, ctx = transaction.New(context.Background()) // XXX bg ok?
	err = zhead.Resync(ctx, δZ.Tid)
	if err != nil {
		return err
	}
	zhead.txnCtx = ctx

	// 2. restat invalidated ZBigFile
	// NOTE no lock needed since .blksize and .size are constant during lifetime of one txn.
	// XXX -> parallel
	for file := range δF.ByFile {
		size, sizePath, err := file.zfile.Size(ctx)
		if err != nil {
			return err
		}

		file.size = size
		// retrack the btree path that leads to the file's size
		bfdir.δFtail.Track(file, -1, sizePath, nil)

		// XXX we can miss a change to file if δblk is not yet tracked
		//     -> need to update file.rev at read time -> locking=XXX
		file.rev = zhead.At()
	}

	// notify .wcfs/zhead
	for sk := range gdebug.zheadSockTab {
		_, err := fmt.Fprintf(sk, "%s\n", δZ.Tid)
		if err != nil {
			// debug reader is gone - drop the socket
			log.Errorf("%s", err)	// XXX errctx + file, handle, reader pid
			sk.Close()
			delete(gdebug.zheadSockTab, sk)
		}
	}

	// XXX δFtail.ForgetPast(...)
	// XXX for f in δF: f.δtail.ForgetPast(...)

	// notify zhead.At waiters
	// (no hwaitMu needed: zheadMu.W is held - see hwaitMu locking rule)
	for w := range head.hwait {
		if w.at <= δZ.Tid {
			delete(head.hwait, w)
			close(w.ready)
		}
	}

	return nil
}

// hwaiter represents someone waiting for zhead to become ≥ at.
//
// Waiters register themselves into Head.hwait (see zheadWait); when zhead
// reaches at, handleδZ removes the entry and closes ready.
//
// NOTE: the original fields were indented with spaces; fixed to tabs (gofmt).
type hwaiter struct {
	at    zodb.Tid      // wake up when zhead.At becomes ≥ at
	ready chan struct{} // closed by zwatcher once the condition is reached
}

// zheadWait waits till head.zconn.At becomes ≥ at.
//
// It returns error either if wcfs is down or ctx is canceled.
func (head *Head) zheadWait(ctx context.Context, at zodb.Tid) (err error) {
	defer xerr.Contextf(&err, "wait zhead ≥ %s", at)

	if head.rev != 0 {
		panic("must be called only for head/, not @revX/")
	}

	// XXX check wcfs.down

	// fast path: zhead may already be ≥ at
	head.zheadMu.RLock()
	if at <= head.zconn.At() {
		head.zheadMu.RUnlock()
		return nil
	}

	// slow path: register ourselves as a waiter; zwatcher will close
	// ready once zhead.At catches up (registration is done under
	// zheadMu.R + hwaitMu per the locking protocol).
	ready := make(chan struct{})
	head.hwaitMu.Lock()
	head.hwait[hwaiter{at: at, ready: ready}] = struct{}{}
	head.hwaitMu.Unlock()

	head.zheadMu.RUnlock()

	select {
	case <-ready:
		return nil // ok - zhead.At went ≥ at

	case <-ctx.Done():
		return ctx.Err()
	}
}

// invalidateBlk invalidates 1 file block in kernel cache.
//
// If the kernel currently caches data for head/file[blk], that data is first
// preserved under @revX/bigfile/file[blk] so that clients pinned to an older
// revision can still read it from cache.
//
// see "4.4) for all file/blk to in invalidate we do"
// called with zheadMu wlocked.
func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
	defer xerr.Contextf(&err, "%s: invalidate blk #%d:", f.path(), blk)

	fsconn := gfsconn
	blksize := f.blksize
	off := blk*blksize

	var blkdata []byte = nil

	// first try to retrieve f.loading[blk];
	// make sure f.loading[blk] is invalidated.
	//
	// we are running with zheadMu wlocked - no need to lock f.loadMu
	loading, ok := f.loading[blk]
	if ok {
		if loading.err == nil {
			blkdata = loading.blkdata
		}
		delete(f.loading, blk)
	}

	// TODO skip retrieve/store if len(f.watchTab) == 0
	// try to retrieve cache of current head/data[blk], if we got nothing from f.loading
	if blkdata == nil {
		blkdata = make([]byte, blksize)
		n, st := fsconn.FileRetrieveCache(f.Inode(), off, blkdata)
		if st != fuse.OK {
			// XXX warn
		}
		blkdata = blkdata[:n]
	}

	// if less than blksize was cached - probably the kernel had to evict
	// some data from its cache already. In such case we don't try to
	// preserve the rest and drop what was read, to avoid keeping the
	// system overloaded.
	//
	// if we have the data - preserve it under @revX/bigfile/file[blk].
	if int64(len(blkdata)) == blksize {
		func() {
			// store retrieved data back to OS cache for file @<rev>/file[blk]
			blkrev, _ := f.LastBlkRev(ctx, blk, f.head.zconn.At())
			frev, funlock, err := groot.lockRevFile(blkrev, f.zfile.POid())
			if err != nil {
				log.Errorf("BUG: %s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
				// NOTE must return: frev is nil here and the
				// deferred funlock (also nil) would panic on
				// closure exit.
				return
			}
			defer funlock()

			st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
			if st != fuse.OK {
				log.Errorf("BUG: %s: invalidate blk #%d: %s: store cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, frev.path(), st)
			}
		}()
	}

	// invalidate file/head/data[blk] in OS file cache.
	st := fsconn.FileNotify(f.Inode(), off, blksize)
	if st != fuse.OK {
		return syscall.Errno(st)
	}

	return nil
}

// invalidateAttr invalidates file attributes in kernel cache.
//
// complements invalidateBlk and is used to invalidate file size.
// called with zheadMu wlocked.
func (f *BigFile) invalidateAttr() (err error) {
	defer xerr.Contextf(&err, "%s: invalidate attr", f.path())

	// off=-1/len=-1 tells the kernel to drop only cached metadata
	if st := gfsconn.FileNotify(f.Inode(), -1, -1); st != fuse.OK {
		return syscall.Errno(st)
	}
	return nil
}


// lockRevFile makes sure inode ID of /@<rev>/bigfile/<fid> is known to kernel
// and won't change until unlock.
//
// We need node ID to be known to the kernel, when we need to store data into
// file's kernel cache - if the kernel doesn't have the node ID for the file in
// question, FileNotifyStoreCache will just fail.
//
// For kernel to know the inode lockRevFile issues regular filesystem lookup
// request which goes to kernel and should go back to wcfs. It is thus not safe
// to use lockRevFile from under FUSE request handler as doing so might deadlock.
//
// Caller must call unlock when inode ID is no longer required to be present.
// It is safe to simultaneously call multiple lockRevFile with the same arguments.
func (root *Root) lockRevFile(rev zodb.Tid, fid zodb.Oid) (_ *BigFile, unlock func(), err error) {
	fsconn := gfsconn

	frevpath := fmt.Sprintf("@%s/bigfile/%s", rev, fid) // relative to fs root for now
	defer xerr.Contextf(&err, "/: lockRevFile %s", frevpath)

	// FIXME checking for "node{0}" is fragile:
	// XXX the node could be still forgotten since we are not holding open on it
	// XXX -> always os.open unconditionally for now
	//        or is it ok since it is just a cache?
	//	  -> no, not ok: if inode ID is forgotten, the same ID could be
	//	     reallocated to another file and then we'll corrupt in-kernel
	//	     cache by wrongly storing data of one file into cache of
	//	     another file.
	//	     -> to avoid this we need to always lock the inode ID with real open.
	// XXX (also disabled for now due to race-detector)
/*
	// first check without going through kernel, whether the inode maybe known already
	xfrev := fsconn.LookupNode(root.Inode(), frevpath)
	if xfrev != nil {
		if xfrev.String() != "node{0}" {
			return xfrev.Node().(*BigFile), func(){}, nil
		}
	}
*/

	// we have to ping the kernel: opening the file via the OS makes the
	// kernel perform the lookup and keeps the inode pinned while f is open.
	frevospath := gmntpt + "/" + frevpath // now starting from OS /
	f, err := os.Open(frevospath)
	if err != nil {
		return nil, nil, err
	}

	xfrev := fsconn.LookupNode(root.Inode(), frevpath)
	// must be !nil as open succeeded
	// unlock = closing f: releases the open that pins the inode ID.
	return xfrev.Node().(*BigFile), func() { f.Close() }, nil
}

// -------- 7) FUSE read(#blk) --------

// /(head|<rev>)/bigfile/<bigfileX> -> Read serves reading bigfile data.
//
// The read request is capped to file size, widened to whole-block boundaries
// (ZBlk* blocks can be loaded only whole), and the blocks are loaded in
// parallel via readBlk.
func (f *BigFile) Read(_ nodefs.File, dest []byte, off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
	// hold zheadMu.R so that zwatcher does not change the database view
	// while we are reading.
	f.head.zheadMu.RLock()
	defer f.head.zheadMu.RUnlock()

	// cap read request to file size
	end := off + int64(len(dest))		// XXX overflow?
	if end > f.size {
		end = f.size
	}
	if end <= off {
		// the kernel issues e.g. [0 +4K) read for f.size=0 and expects to get (0, ok)
		// POSIX also says to return 0 if off >= f.size
		return fuse.ReadResultData(nil), fuse.OK
	}

	// widen read request to be aligned with blksize granularity
	// (we can load only whole ZBlk* blocks)
	aoff := off - (off % f.blksize)
	aend := end
	if re := end % f.blksize; re != 0 {
		aend += f.blksize - re
	}
	// XXX use original dest if it can fit the data
	dest = make([]byte, aend - aoff) // ~> [aoff:aend) in file

	// XXX better ctx = transaction.PutIntoContext(ctx, txn)
	// merge request context with zhead transaction context so that load is
	// canceled when either the FUSE request or the transaction is canceled.
	ctx, cancel := xcontext.Merge(fctx, f.head.zconn.txnCtx)
	defer cancel()

	// read/load all block(s) in parallel
	wg, ctx := errgroup.WithContext(ctx)
	for blkoff := aoff; blkoff < aend; blkoff += f.blksize {
		blkoff := blkoff
		blk := blkoff / f.blksize
		wg.Go(func() error {
			δ := blkoff-aoff // blk position in dest
			//log.Infof("readBlk #%d dest[%d:+%d]", blk, δ, f.blksize)
			return f.readBlk(ctx, blk, dest[δ:δ+f.blksize])
		})
	}

	err := wg.Wait()
	if err != nil {
		return nil, err2LogStatus(err)
	}

	// return only the part the caller actually asked for
	return fuse.ReadResultData(dest[off-aoff:end-aoff]), fuse.OK
}

// readBlk serves Read to read 1 ZBlk #blk into destination buffer.
//
// Simultaneous readers of the same block are deduplicated via f.loading:
// only the first reader loads from the database; the others wait on the
// corresponding blkLoadState.
//
// see "7) when we receive a FUSE read(#blk) request ..." in overview.
//
// len(dest) == blksize.
// called with head.zheadMu rlocked.
func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err error) {
	defer xerr.Contextf(&err, "%s: readblk #%d", f.path(), blk)

	// check if someone else is already loading this block
	f.loadMu.Lock()
	loading, already := f.loading[blk]
	if !already {
		loading = &blkLoadState{
			ready:   make(chan struct{}),
		}
		f.loading[blk] = loading
	}
	f.loadMu.Unlock()

	// if it is already loading - just wait for it
	if already {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-loading.ready:
			if loading.err == nil {
				copy(dest, loading.blkdata)	// XXX copy
			}
			return loading.err
		}
	}

	// noone was loading - we became responsible to load this block
	blkdata, treepath, zblk, blkrevMax, err := f.zfile.LoadBlk(ctx, blk)
	loading.blkdata = blkdata
	loading.err = err

	// data loaded with error - cleanup .loading
	if loading.err != nil {
		close(loading.ready)
		f.loadMu.Lock()
		delete(f.loading, blk)
		f.loadMu.Unlock()
		return err
	}

	// we have the data - it can be used after watchers are updated
	// XXX should we use ctx here? (see readPinWatchers comments)
	f.readPinWatchers(ctx, blk, treepath, zblk, blkrevMax)

	// data can be used now
	close(loading.ready)
	copy(dest, blkdata)	// XXX copy

	// store to kernel pagecache whole block that we've just loaded from database.
	// This way, even if the user currently requested to read only small portion from it,
	// it will prevent next e.g. consecutive user read request to again hit
	// the DB, and instead will be served by kernel from its pagecache.
	//
	// We cannot do this directly from reading goroutine - while reading
	// kernel FUSE is holding corresponding page in pagecache locked, and if
	// we would try to update that same page in pagecache it would result
	// in deadlock inside kernel.
	//
	// .loading cleanup is done once we are finished with putting the data into OS pagecache.
	// If we do it earlier - a simultaneous read covered by the same block could result
	// into missing both kernel pagecache (if not yet updated) and empty .loading[blk],
	// and thus would trigger DB access again.
	//
	// XXX if direct-io: don't touch pagecache
	// XXX upload parts only not covered by currrent read (not to e.g. wait for page lock)
	// XXX skip upload completely if read is wide to cover whole blksize
	go f.uploadBlk(blk, loading)

	return nil
}

// uploadBlk complements readBlk: it uploads loaded blkdata into OS cache.
//
// It cooperates with zwatcher: the upload is skipped if the block was already
// invalidated, paused while zwatcher asks uploads to pause, and accounted in
// head.inflightOSCacheUploads so zwatcher can wait for uploads in progress.
func (f *BigFile) uploadBlk(blk int64, loading *blkLoadState) {
	head := f.head

	// rlock zheadMu and make sure zwatcher is not asking us to pause.
	// if it does - wait for a safer time not to deadlock.
	// see notes.txt -> "Kernel locks page on read/cache store/..." for details.
retry:
	for {
		head.zheadMu.RLock()
		// help zwatcher if it asks us to pause uploadings, so it can
		// take zheadMu wlocked without deadlocks.
		if head.pauseOSCacheUpload {
			ready := head.continueOSCacheUpload
			head.zheadMu.RUnlock()
			<-ready
			continue retry
		}

		break
	}

	// zheadMu rlocked.
	// zwatcher is not currently trying to pause OS cache uploads.

	// check if this block was already invalidated by zwatcher.
	// if so don't upload the block into OS cache.
	// (invalidateBlk deletes/replaces f.loading[blk], so a mismatch means
	//  our data is stale)
	f.loadMu.Lock()
	loading_ := f.loading[blk]
	f.loadMu.Unlock()
	if loading != loading_ {
		head.zheadMu.RUnlock()
		return
	}

	oid := f.zfile.POid()

	// signal to zwatcher not to run while we are performing the upload.
	// upload with released zheadMu so that zwatcher can lock it even if to
	// check inflightOSCacheUploads status.
	atomic.AddInt32(&head.inflightOSCacheUploads, +1)
	head.zheadMu.RUnlock()

	st := gfsconn.FileNotifyStoreCache(f.Inode(), blk*f.blksize, loading.blkdata)

	// clear .loading[blk] only after pagecache was updated - see readBlk
	// for why the ordering matters.
	f.loadMu.Lock()
	bug := (loading != f.loading[blk])
	if !bug {
		delete(f.loading, blk)
	}
	f.loadMu.Unlock()

	// signal to zwatcher that we are done and it can continue.
	atomic.AddInt32(&head.inflightOSCacheUploads, -1)

	if bug {
		panicf("BUG: bigfile %s: blk %d: f.loading mutated while uploading data to pagecache", oid, blk)
	}

	if st == fuse.OK {
		return
	}

	// pagecache update failed, but it must not (we verified on startup that
	// pagecache control is supported by kernel). We can correctly live on
	// with the error, but data access will be likely very slow. Tell user
	// about the problem.
	log.Errorf("BUG: bigfile %s: blk %d: -> pagecache: %s  (ignoring, but reading from bigfile will be very slow)", oid, blk, st)
}

// -------- invalidation protocol notification/serving --------
//
// (see "7.2) for all registered client@at watchers ...")

// pin makes sure that file[blk] on client side is the same as of @rev state.
//
// rev = zodb.TidMax means @head; otherwise rev must be ≤ w.at and there must
// be no rev_next changing file[blk]: rev < rev_next ≤ w.at.
//
// must be called with atMu rlocked.
//
// XXX error - when? or close watch on any error?
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
	// annotate error with the watch link / file the pin belongs to;
	// the actual work is done by _pin.
	defer xerr.Contextf(&err, "wlink%d: f<%s>", w.link.id, w.file.zfile.POid())
	err = w._pin(ctx, blk, rev)
	return err
}

// _pin is pin worker: it maintains w.pinned[blk] state and performs the
// `pin` message IO with the client.
//
// must be called with atMu rlocked; see pin for the overall contract.
func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
	foid := w.file.zfile.POid()
	revstr := rev.String()
	if rev == zodb.TidMax {
		revstr = "head"
	}
	defer xerr.Contextf(&err, "pin #%d @%s", blk, revstr)

	// invariant: a non-@head pin revision must be covered by the watch's at
	if !(rev == zodb.TidMax || rev <= w.at) {
		panicf("f<%s>: wlink%d: pin #%d @%s: watch.at (%s) < rev",
			foid, w.link.id, blk, rev, w.at)
	}

	w.pinnedMu.Lock()

	// check/wait for previous/simultaneous pin.
	// (pin could be called simultaneously e.g. by setupWatch and readPinWatchers)
	for {
		blkpin := w.pinned[blk]
		if blkpin == nil {
			break
		}

		w.pinnedMu.Unlock()
		<-blkpin.ready	// XXX + ctx ? (or just keep ready ?)

		if blkpin.rev == rev {
			// already pinned
			// (e.g. os cache for block was evicted and read called the second time)
			return blkpin.err
		}

		// relock the watch and check that w.pinned[blk] is the same. Retry if it is not.
		// ( w.pinned[blk] could have changed while w.mu was not held e.g. by	XXX recheck
		//   simultaneous setupWatch if we were called by readPinWatchers )
		w.pinnedMu.Lock()
		if blkpin == w.pinned[blk] {
			// @head pins must have been removed from w.pinned on completion
			// (see the deferred cleanup below) - finding one here is a bug.
			if blkpin.rev == zodb.TidMax {
				w.pinnedMu.Unlock()
				panicf("f<%s>: wlink%d: pinned[#%d] = @head", foid, w.link.id, blk)
			}
			break
		}
	}

	// w.pinnedMu locked & previous pin is either nil or completed and its .rev != rev
	// -> setup new pin state
	blkpin := &blkPinState{rev: rev, ready: make(chan struct{})}
	w.pinned[blk] = blkpin

	// perform IO without w.pinnedMu
	w.pinnedMu.Unlock()
	ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
	w.pinnedMu.Lock()

	// check IO reply & verify/signal blkpin is ready
	defer func() {
		// pin to @head means the block is unpinned - drop its entry
		if rev == zodb.TidMax {
			delete(w.pinned, blk)
		}

		w.pinnedMu.Unlock()
		close(blkpin.ready)
	}()


	if err != nil {
		blkpin.err = err
		return err
	}

	if ack != "ack" {
		blkpin.err = fmt.Errorf("expect %q; got %q", "ack", ack)
		return blkpin.err
	}

	// only this goroutine owns blkpin until ready is closed - any mutation
	// of w.pinned[blk] in the meantime is a bug.
	if blkpin != w.pinned[blk] {
		blkpin.err = fmt.Errorf("BUG: pinned[#%d] mutated while doing IO", blk)
		panicf("f<%s>: wlink%d: %s", foid, w.link.id, blkpin.err)
	}

	return nil
}

// readPinWatchers complements readBlk: it sends `pin blk` for watchers of the file
// after a block was loaded from ZODB and before block data is returned to kernel.
//
// See "7.2) for all registered client@at watchers ..."
//
// Called with f.head.zheadMu rlocked.
//
// XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure? -> probably no
func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btree.LONode, zblk zBlk, blkrevMax zodb.Tid) {
	// only head/ is being watched for
	if f.head.rev != 0 {
		return
	}

	fmt.Printf("S: read #%d -> pin watchers (#%d)\n", blk, len(f.watchTab))

	// update δFtail index
	bfdir := f.head.bfdir
	bfdir.δFmu.Lock()		// XXX locking correct? XXX -> better push down?
	bfdir.δFtail.Track(f, blk, treepath, zblk)	// XXX pass in zblk.rev here?
	bfdir.δFmu.Unlock()

	// makes sure that file[blk] on clients side stays as of @w.at state.

	// try to use blkrevMax only as the first cheap criteria to skip updating watchers.
	// This is likely to be the case, since most watchers should be usually close to head.
	// If using blkrevMax only turns out to be not sufficient, we'll
	// consult δFtail, which might involve recomputing it.
	blkrev := blkrevMax
	blkrevRough := true

	wg, ctx := errgroup.WithContext(ctx)
	for w := range f.watchTab {	// XXX locking (f.watchTab)
		w := w

		// make sure w.at stays unchanged while we pin the block
		// (the rlock is released either below on skip, or by the
		//  spawned pin goroutine after it is done)
		w.atMu.RLock()

		fmt.Printf("S: read -> pin watchers: w @%s\n", w.at)

		// the block is already covered by @w.at database view
		if blkrev <= w.at {
			w.atMu.RUnlock()
			continue
		}

		// if blkrev is rough estimation and that upper bound is > w.at
		// we have to recompute ~exact file[blk] revision @head.
		if blkrevRough {
			// unlock atMu while we are (re-)calculating blkrev
			// we'll relock atMu again and recheck blkrev vs w.at after.
			w.atMu.RUnlock()

			blkrev, _ = f.LastBlkRev(ctx, blk, f.head.zconn.At())
			blkrevRough = false

			w.atMu.RLock()
			if blkrev <= w.at {
				w.atMu.RUnlock()
				continue
			}
		}

		// the block is newer - find out its revision as of @w.at and pin to that.
		//
		// We don't pin to w.at since if we would do so for several clients,
		// and most of them would be on different w.at - cache of the file will
		// be lost. Via pinning to particular block revision, we make sure the
		// revision to pin is the same on all clients, and so file cache is shared.
		pinrev, _ := w.file.LastBlkRev(ctx, blk, w.at)	// XXX move into go?

		wg.Go(func() error {
			// XXX close watcher on any error
			defer w.atMu.RUnlock()
			return w.pin(ctx, blk, pinrev)
		})
	}
	err := wg.Wait()
	if err != nil {
		panic(err)	// XXX
	}
}

// setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request.
//
// at == zodb.InvalidTid ("-") means "remove the watch".
//
// XXX sends "pin" notifications; final "ok" must be sent by caller.
//
// XXX called synchronously - only 1 setupWatch call at a time?
func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.Tid) (err error) {
	defer xerr.Contextf(&err, "setup watch f<%s> @%s", foid, at)
	head := wlink.head
	bfdir := head.bfdir

	// wait for zhead.At ≥ at
	if at != zodb.InvalidTid {
		err = head.zheadWait(ctx, at)
		if err != nil {
			return err
		}
	}

	// make sure zhead.At stays unchanged while we are preparing the watch
	// (see vvv e.g. about unpin to @head for why it is needed)
	head.zheadMu.RLock()
	defer head.zheadMu.RUnlock()
	headAt := head.zconn.At()

	// reject watch requests that reach further back than δFtail history
	// XXX δFtail locking? (or ForgetPast is called only with zheadMu.W ?)
	if at != zodb.InvalidTid && at < bfdir.δFtail.Tail() {
		return fmt.Errorf("too far away back from head/at (@%s); δt = %s",
			headAt, headAt.Time().Sub(at.Time().Time))
	}

	// if watch was already established - we need to update it
	w := wlink.byfile[foid]	// XXX locking
	if w == nil {
		// watch was not previously established - set it up anew
		f := bfdir.fileTab[foid]	// XXX locking
		if f == nil {
			// by "invalidation protocol" watch is setup after data file was opened
			return fmt.Errorf("file not yet known to wcfs or is not a ZBigFile")
		}

		w = &Watch{
			link:   wlink,
			file:   f,
			at:     at,
			pinned: make(map[int64]*blkPinState),
		}
	}

	// at="-" (InvalidTid) means "remove the watch"
	if at == zodb.InvalidTid {
		delete(wlink.byfile, foid)	// XXX locking
		delete(w.file.watchTab, w)	// XXX locking
		return nil
	}

	// request exclusive access to the watch to change .at and compute pins.
	// The lock will be downgraded from W to R after pins computation is done.
	// Pins will be executed with atMu.R only - with the idea not to block
	// other client that read-access the file simultaneously to setupWatch.
	w.atMu.Lock()

	// check at >= w.at
	// XXX we might want to allow going back in history if we need it.
	if !(at >= w.at) {
		w.atMu.Unlock()
		return fmt.Errorf("going back in history is forbidden")
	}

	f := w.file

	// register w to f early, so that READs going in parallel to us
	// preparing and processing initial pins, also send pins to w for read
	// blocks. If we don't, we can miss to send pin to w for a freshly read
	// block which could have revision > w.at:	XXX test
	//
	//                1   3    2   4
	//	-----.----x---o----x---x------]----------
	//           ↑                        ↑
	//          w.at                     head
	//
	// Here blocks #1, #2 and #4 were previously accessed, are thus tracked
	// by δFtail and are changed after w.at - they will be returned by vvv
	// δFtail query and pin-sent to w. Block #3 was not yet accessed but
	// was also changed after w.at . As head/file[#3] might be accessed
	// simultaneously to watch setup, and f.readBlk will be checking
	// f.watchTab; if w ∉ f.watchTab at that moment, w will miss to receive
	// pin for #3.
	//
	// NOTE for `unpin blk` to -> @head we can be sure there won't be
	// simultaneous `pin blk` request, because:
	//
	// - unpin means blk was previously pinned,
	// - blk was pinned means it is tracked by δFtail,
	// - if blk is tracked and δFtail says there is no δblk ∈ (at, head],
	//   there is indeed no blk change in that region,
	// - which means that δblk with rev > w.at might be only > head,
	// - but such δblk are processed with zhead wlocked and we keep zhead
	//   rlocked during pin setup.
	//
	//          δ                      δ
	//      ----x----.------------]----x----
	//               ↑            ↑
	//              w.at         head
	//
	// - also: there won't be simultaneous READs that would need to be
	//   unpinned, because we update w.at to requested at early.
	//
	// XXX register only if watch was created anew, not updated?
	w.at = at
	// NOTE registering f.watchTab[w] and wlink.byfile[foid] = w must come together.
	f.watchTab[w] = struct{}{}	// XXX locking
	wlink.byfile[foid] = w		// XXX locking

	// XXX defer -> unregister watch if error?

	// pin all tracked file blocks that were changed in (at, head] range.
	toPin := map[int64]zodb.Tid{} // blk -> @rev

	for _, δfile := range bfdir.δFtail.SliceByFileRev(f, at, headAt) {	// XXX locking δFtail
		for blk := range δfile.Blocks {
			_, already := toPin[blk]
			if already {
				continue
			}

			// pin to the last block revision that is ≤ at
			toPin[blk], _ = f.LastBlkRev(ctx, blk, at)
		}
	}

	// if a block was previously pinned, but ∉ δ(at, head] -> unpin it to head.
	for blk, pinPrev := range w.pinned {
		// only 1 setupWatch can be run simultaneously for one file
		// XXX assert pinPrev.rev != zodb.TidMax

		pinNew, pinning := toPin[blk]
		if !pinning {
			toPin[blk] = zodb.TidMax // @head
		}

		// TODO don't bother to spawn .pin goroutines if pin revision is the same ?
		// if pinNew == pinPrev.rev && ready(pinPrev.ready) && pinPrev.err == nil {
		// 	delete(toPin, blk)
		// }
		_ = pinPrev
		_ = pinNew
	}

	// downgrade atMu.W -> atMu.R
	// XXX there is no primitive to do Wlock->Rlock atomically, but we are
	// ok with that since we prepared everything to handle simultaneous pins
	// from other reads.
	w.atMu.Unlock()
	w.atMu.RLock()
	defer w.atMu.RUnlock()

	// execute all pins/unpins in parallel
	wg, ctx := errgroup.WithContext(ctx)
	for blk, rev := range toPin {
		blk := blk
		rev := rev
		wg.Go(func() error {
			return w._pin(ctx, blk, rev)
		})
	}
	err = wg.Wait()
	if err != nil {
		return err
	}

	return nil
}

// Open serves /head/watch opens.
func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
	// XXX check flags?
	head := wnode.head

	// create watch link with fresh stream-multiplexing state
	wlink := &WatchLink{
		sk:     NewFileSock(),
		id:     atomic.AddInt32(&wnode.idNext, +1),
		head:   head,
		byfile: make(map[zodb.Oid]*Watch),
		rxTab:  make(map[uint64]chan string),
	}

	// register the link on head
	head.wlinkMu.Lock()
	head.wlinkTab[wlink] = struct{}{} // XXX del wlinkTab[w] on w.sk.File.Release
	head.wlinkMu.Unlock()

	// serve the link in its own goroutine; it unregisters itself on exit
	go wlink.serve()
	return wlink.sk.File(), fuse.OK
}

// serve serves client initiated watch requests and routes client replies to
// wcfs initiated pin requests.
func (wlink *WatchLink) serve() {
	// XXX log error if !close
	if err := wlink._serve(); err != nil {
		log.Error(err)
	}

	// unregister the link from head on exit
	head := wlink.head
	head.wlinkMu.Lock()
	defer head.wlinkMu.Unlock()
	delete(head.wlinkTab, wlink)
}

// _serve is the serve worker: it runs the rx loop over wlink.sk and spawns
// handlers for client-initiated requests, routing client replies (even
// stream IDs) to waiting sendReq calls via wlink.rxTab.
func (wlink *WatchLink) _serve() (err error) {
	defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
	r := bufio.NewReader(wlink.sk)

	ctx0 := context.TODO()	// XXX ctx = ? -> merge(ctx of wcfs running, ctx of wlink timeout)

	ctx, cancel := context.WithCancel(ctx0)
	wg, ctx := errgroup.WithContext(ctx)

	defer func() {
		// cancel all handlers on both error and ok return.
		// ( ok return is e.g. when we received "bye", so if client
		//   sends "bye" and some pin handlers are in progress - they
		//   anyway don't need to wait for client replies anymore )
		cancel()

		err2 := wg.Wait()
		if err == nil {
			err = err2
		}

		// unregister all watches created on this wlink
		for _, w := range wlink.byfile {	// XXX locking
			delete(w.file.watchTab, w)	// XXX locking
		}
		wlink.byfile = nil

		// write to peer if it was logical error on client side
		// then .sk.tx to wakeup rx on client side
		if err != nil {
			_ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err))
		}

		// close .sk.tx : this wakes up rx on client side.
		err2 = wlink.sk.CloseWrite()
		if err == nil {
			err = err2
		}
	}()

	// close .sk.rx on error/wcfs stopping or return: this wakes up read(sk).
	retq := make(chan struct{})
	defer close(retq)
	wg.Go(func() error {
		// monitor is always canceled - either at parent ctx cancel, or
		// upon return from serve (see "cancel all handlers ..." ^^^).
		// If it was return - report returned error to wg.Wait, not "canceled".
		<-ctx.Done()
		e := ctx.Err()
		select {
		default:
		case <-retq:
			e = err // returned error
		}

		e2 := wlink.sk.CloseRead()
		if e == nil {
			e = e2
		}
		return e
	})

	// allow for only 1 watch request at a time
	// TODO allow simultaneous watch requests for different files
	var handlingWatch int32

	// rx loop: dispatch every incoming line as either reply or request
	for {
		l, err := r.ReadString('\n')	// XXX limit accepted line len to prevent DOS
		if err != nil {
			// r.Read is woken up by sk.CloseRead when serve decides to exit
			if err == io.ErrClosedPipe {
				err = nil
			}
			return err
		}

		fmt.Printf("S: wlink %d: rx: %q\n", wlink.id, l)

		stream, msg, err := parseWatchFrame(l)
		if err != nil {
			return err
		}

		// reply from client to wcfs (wcfs-originated streams are even)
		reply := (stream % 2 == 0)
		if reply {
			wlink.rxMu.Lock()
			rxq := wlink.rxTab[stream]
			delete(wlink.rxTab, stream)
			wlink.rxMu.Unlock()

			if rxq == nil {
				return fmt.Errorf("%d: reply on unexpected stream", stream)
			}
			rxq <- msg
			continue
		}

		// client-initiated request

		// bye		TODO document in "Invalidation protocol"
		if msg == "bye" {
			return nil // deferred sk.Close will wake-up rx on client side
		}

		// watch ...
		if atomic.LoadInt32(&handlingWatch) != 0 {
			return fmt.Errorf("%d: another watch request is already in progress", stream)
		}
		atomic.StoreInt32(&handlingWatch, 1)
		wg.Go(func() error {
			defer atomic.StoreInt32(&handlingWatch, 0)
			return wlink.handleWatch(ctx, stream, msg)
		})
	}
}

// handleWatch handles watch request from client.
//
// returned error comes without full error prefix.
func (wlink *WatchLink) handleWatch(ctx context.Context, stream uint64, msg string) (err error) {
	defer xerr.Contextf(&err, "%d", stream)

	// perform the watch setup; a logical error is reported back to the
	// client as "error ..." reply, while the watch link itself remains live.
	reply := "ok"
	if e := wlink._handleWatch(ctx, msg); e != nil {
		reply = fmt.Sprintf("error %s", e)
	}

	return wlink.send(ctx, stream, reply)
}

// _handleWatch parses a `watch <file> @<at>` message and sets up the watch.
func (wlink *WatchLink) _handleWatch(ctx context.Context, msg string) error {
	foid, at, err := parseWatch(msg)
	if err != nil {
		return err
	}

	return wlink.setupWatch(ctx, foid, at)
}

// sendReq sends wcfs-originated request to client and returns client response.
//
// The request is tagged with a fresh stream ID; the reply is routed back to
// us by _serve via wlink.rxTab.
func (wlink *WatchLink) sendReq(ctx context.Context, req string) (reply string, err error) {
	// XXX err ctx

	// allocate stream ID for this request; 0 is reserved, skip it.
	// (stream parity is how _serve distinguishes replies from requests)
	var stream uint64
	for stream == 0 {
		stream = atomic.AddUint64(&wlink.reqNext, +2)
	}

	// cap=1 so that, if we return on ctx cancel and the reply still
	// arrives, _serve's `rxq <- msg` does not block forever.
	rxq := make(chan string, 1)
	wlink.rxMu.Lock()
	wlink.rxTab[stream] = rxq	// XXX assert .stream is not there?
	wlink.rxMu.Unlock()

	err = wlink.send(ctx, stream, req)
	if err != nil {
		return "", err
	}

	select {
	case <-ctx.Done():
		// remove the registration so rxTab does not leak. If _serve
		// already took rxq out of rxTab this delete is a no-op and a
		// racing reply lands in the buffered rxq without blocking.
		wlink.rxMu.Lock()
		delete(wlink.rxTab, stream)
		wlink.rxMu.Unlock()
		return "", ctx.Err()

	case reply = <-rxq:
		return reply, nil
	}
}

// send sends a message to client over specified stream ID.
//
// Multiple send can be called simultaneously; send serializes writes.
func (wlink *WatchLink) send(ctx context.Context, stream uint64, msg string) error {
	// XXX err ctx
	// XXX assert '\n' not in msg

	// serialize tx so concurrent sends do not interleave frames
	wlink.txMu.Lock()
	defer wlink.txMu.Unlock()

	// XXX timeout write on ctx cancel
	pkt := []byte(fmt.Sprintf("%d %s\n", stream, msg))
	fmt.Printf("S: wlink %d: tx: %q\n", wlink.id, pkt)
	_, err := wlink.sk.Write(pkt)
	return err
}


// ---- Lookup ----

// /(head|<rev>)/bigfile/ -> Lookup receives client request to create /(head|<rev>)/bigfile/<bigfileX>.
func (bfdir *BigFileDir) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
	// delegate to lookup and convert (file, error) into FUSE terms
	var inode *nodefs.Inode
	f, err := bfdir.lookup(out, name, fctx)
	if f != nil {
		inode = f.Inode()
	}
	return inode, err2LogStatus(err)
}

// lookup implements Lookup: it returns BigFile corresponding to name, opening
// it from ZODB on first access and registering it in bfdir.fileTab.
func (bfdir *BigFileDir) lookup(out *fuse.Attr, name string, fctx *fuse.Context) (f *BigFile, err error) {
	defer xerr.Contextf(&err, "%s: lookup %q", bfdir.path(), name)

	oid, err := zodb.ParseOid(name)
	if err != nil {
		return nil, eINVALf("not oid")
	}

	// hold zheadMu.R so the database view stays fixed while we open the file
	bfdir.head.zheadMu.RLock()
	defer bfdir.head.zheadMu.RUnlock()

	// fill attributes of whichever file we end up returning
	defer func() {
		if f != nil {
			f.getattr(out)
		}
	}()

	// check to see if dir(oid) is already there
	bfdir.fileMu.Lock()
	f, already := bfdir.fileTab[oid]
	bfdir.fileMu.Unlock()

	if already {
		return f, nil
	}

	// not there - without bfdir lock proceed to open BigFile from ZODB
	f, err = bfdir.head.bigopen(fctx, oid)
	if err != nil {
		return nil, err
	}

	// relock bfdir and either register f or, if the file was maybe
	// simultaneously created while we were not holding bfdir.fileMu, return that.
	bfdir.fileMu.Lock()
	f2, already := bfdir.fileTab[oid]
	if already {
		bfdir.fileMu.Unlock()
		f.Close()
		return f2, nil
	}

	bfdir.fileTab[oid] = f
	bfdir.fileMu.Unlock()

	// mkfile takes filesystem treeLock - do it outside bfdir.fileMu
	mkfile(bfdir, name, f)

	return f, nil
}

// / -> Lookup receives client request to create @<rev>/.
func (root *Root) Lookup(out *fuse.Attr, name string, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
	// delegate to lookup and convert (head, error) into FUSE terms
	var inode *nodefs.Inode
	revd, err := root.lookup(name, fctx)
	if revd != nil {
		inode = revd.Inode()
		_ = revd.GetAttr(out, nil, fctx) // always ok
	}
	return inode, err2LogStatus(err)
}

// lookup implements Root.Lookup: for "@<rev>" names it returns the immutable
// @rev/ view, opening a ZODB connection at that revision on first access and
// registering the resulting directory in root.revTab.
func (root *Root) lookup(name string, fctx *fuse.Context) (_ *Head, err error) {
	defer xerr.Contextf(&err, "/: lookup %q", name)

	var rev zodb.Tid
	ok := false

	// only "@<tid>" names are valid here
	if strings.HasPrefix(name, "@") {
		rev, err = zodb.ParseTid(name[1:])
		ok = (err == nil)
	}
	if !ok {
		return nil, eINVALf("not @rev")
	}

	// check to see if dir(rev) is already there
	root.revMu.Lock()
	revDir, already := root.revTab[rev]
	root.revMu.Unlock()

	if already {
		return revDir, nil
	}

	// not there - without revMu lock proceed to open @rev view of ZODB
//	zconnRev, err := root.zopenAt(fctx, rev)
	zconnRev, err := zopen(fctx, root.zdb, &zodb.ConnOptions{At: rev})
	if err != nil {
		return nil, err
	}

	// relock root and either register new revX/ directory or, if the
	// directory was maybe simultaneously created while we were not holding
	// revMu, return that.
	root.revMu.Lock()
	revDir, already = root.revTab[rev]
	if already {
		root.revMu.Unlock()
//		zconnRev.Release()
		// we lost the race - release our connection via txn abort
		transaction.Current(zconnRev.txnCtx).Abort()
		return revDir, nil
	}

	revDir = &Head{
		fsNode: newFSNode(&fsOptions{Sticky: false}),	// XXX + Head.OnForget() -> del root.revTab[]
		rev:    rev,
		zconn:  zconnRev,
	}

	bfdir := &BigFileDir{
		fsNode:  newFSNode(&fsOptions{Sticky: false}),	// XXX + BigFileDir.OnForget()
		head:    revDir,
		fileTab: make(map[zodb.Oid]*BigFile),
		δFtail:  nil, // δFtail not needed/used for @revX/
	}
	revDir.bfdir = bfdir

	root.revTab[rev] = revDir
	root.revMu.Unlock()

	// mkdir takes filesystem treeLock - do it outside revMu.
	mkdir(root, name, revDir)
	mkdir(revDir, "bigfile", bfdir)
	// XXX + "at"

	return revDir, nil
}


// bigopen opens BigFile corresponding to oid on head.zconn.
//
// A ZBigFile corresponding to oid is activated and statted.
//
// head.zconn must be locked.
func (head *Head) bigopen(ctx context.Context, oid zodb.Oid) (_ *BigFile, err error) {
	zconn := head.zconn
	defer xerr.Contextf(&err, "bigopen %s @%s", oid, zconn.At())

	// XXX better ctx = transaction.PutIntoContext(ctx, txn)
	ctx, cancel := xcontext.Merge(ctx, zconn.txnCtx)
	defer cancel()

	xzfile, err := zconn.Get(ctx, oid)
	if err != nil {
		// translate "no such object / no data" into EINVAL for the client
		switch errors.Cause(err).(type) {
		case *zodb.NoObjectError:
			return nil, eINVAL(err)
		case *zodb.NoDataError:
			return nil, eINVAL(err) // XXX what to do if it was existing and got deleted?
		default:
			return nil, err
		}
	}

	zfile, ok := xzfile.(*ZBigFile)
	if !ok {
		return nil, eINVALf("%s is not a ZBigFile", typeOf(xzfile))
	}

	// extract blksize, size and initial approximation for file revision
	err = zfile.PActivate(ctx)
	if err != nil {
		return nil, err
	}
	blksize := zfile.blksize
	// XXX it should be revision of both ZBigFile and its data. But we
	// cannot get data revision without expensive scan of all ZBigFile's objects.
	// -> approximate mtime initially with ZBigFile object mtime.
	//
	// XXX for @rev/... we can know initial mtime more exactly?
	rev     := zfile.PSerial()
	zfile.PDeactivate()

	size, sizePath, err := zfile.Size(ctx)
	if err != nil {
		return nil, err
	}

	f := &BigFile{
		fsNode:  newFSNode(&fsOptions{Sticky: false}),	// XXX + BigFile.OnForget -> del .head.bfdir.fileTab[]
		head:    head,
		zfile:   zfile,
		blksize: blksize,
		size:    size,
		rev:     rev,
		loading: make(map[int64]*blkLoadState),
	}

	// only head/ needs δFtail, f.δtail and watches.
	if head.rev == 0 {
		head.bfdir.δFmu.Lock()	// XXX locking ok?
		head.bfdir.δFtail.Track(f, -1, sizePath, nil)
		head.bfdir.δFmu.Unlock()

		// FIXME: scan zfile.blktab - so that we can detect all btree changes
		// see  "XXX building δFtail lazily ..." in notes.txt

		f.watchTab = make(map[*Watch]struct{})
	}

	return f, nil
}

// Close releases all resources held by the BigFile.
func (f *BigFile) Close() error {
	// XXX locking?
	// drop references so the underlying objects can be garbage-collected.
	// (zconn used to be released here as well:
	//	f.zconn.Release(); f.zconn = nil )
	f.head = nil
	f.zfile = nil
	return nil
}

// ---- misc ---

// /(head|<rev>)/at -> readAt serves read.
func (h *Head) readAt() []byte {
	// take a read-lock only for the duration of querying zconn.
	h.zheadMu.RLock()
	at := h.zconn.At().String()
	h.zheadMu.RUnlock()

	return []byte(at)
}

// /(head|<rev>)/ -> GetAttr serves stat.
func (head *Head) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
	rev := head.rev
	if rev == 0 {
		// head/ - its mtime follows the currently viewed zhead revision.
		head.zheadMu.RLock()
		rev = head.zconn.At()
		head.zheadMu.RUnlock()
	}

	mtime := rev.Time().Time
	out.Mode = fuse.S_IFDIR | 0555
	out.SetTimes(/*atime=*/nil, /*mtime=*/&mtime, /*ctime=*/&mtime)
	return fuse.OK
}

// /(head|<rev>)/bigfile/<bigfileX> -> GetAttr serves stat.
func (f *BigFile) GetAttr(out *fuse.Attr, _ nodefs.File, _ *fuse.Context) fuse.Status {
	head := f.head
	head.zheadMu.RLock()
	defer head.zheadMu.RUnlock()

	f.getattr(out)
	return fuse.OK
}

// getattr fills out with attributes of f. Caller must hold f.head.zheadMu.
func (f *BigFile) getattr(out *fuse.Attr) {
	t := f.rev.Time().Time

	out.Mode = fuse.S_IFREG | 0444
	out.Size = uint64(f.size)
	out.SetTimes(/*atime=*/nil, /*mtime=*/&t, /*ctime=*/&t)
	// TODO: also fill .Blocks and .Blksize
}




// FIXME groot/gfsconn is tmp workaround for lack of way to retrieve FileSystemConnector from nodefs.Inode
// TODO:
//	- Inode += .Mount() -> nodefs.Mount
//	- Mount:
//		.Root()		-> root Inode of the fs
//		.Connector()	-> FileSystemConnector through which fs is mounted
var groot   *Root
var gfsconn *nodefs.FileSystemConnector

// root of the filesystem is mounted here.
//
// we need to talk to kernel and lookup @<rev>/bigfile/<fid> before uploading
// data to kernel cache there. Referencing root of the filesystem via path is
// vulnerable to bugs wrt e.g. `mount --move` and/or mounting something else
// over wcfs. However keeping opened root fd will prevent wcfs to be unmounted,
// so we still have to reference the root via path.
var gmntpt string

// debugging	(protected by zhead.W)
var gdebug = struct {
	// .wcfs/zhead opens
	// protected by groot.head.zheadMu
	zheadSockTab map[*FileSock]struct{}
}{}

// init initializes the debugging state before main runs.
func init() {
	gdebug.zheadSockTab = make(map[*FileSock]struct{})
}

// _wcfs_Zhead is the filesystem node that serves .wcfs/zhead opens.
type _wcfs_Zhead struct {
	fsNode
}

// Open serves open of .wcfs/zhead: each opener gets a write-to-client-only
// socket registered in gdebug.zheadSockTab.
func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
	// XXX check flags?
	sock := NewFileSock()
	sock.CloseRead() // zhead only sends data to the client

	groot.head.zheadMu.Lock()
	defer groot.head.zheadMu.Unlock()

	// XXX del zheadSockTab[sock] on sock.File.Release (= client drops opened handle)
	gdebug.zheadSockTab[sock] = struct{}{}

	return sock.File(), fuse.OK
}

// TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?)

// main runs wcfs; all real work happens in _main so that errors are
// reported through a single exit path.
func main() {
	stdlog.SetPrefix("wcfs: ")
	//log.CopyStandardLogTo("WARNING") // XXX -> "DEBUG" if -d ?
	defer log.Flush()

	if err := _main(); err != nil {
		log.Fatal(err)
	}
}

// _main sets up and serves the wcfs filesystem.
//
// It opens the ZODB storage at zurl, mounts the filesystem on mntpt, spawns
// the FUSE server and runs zwatcher until unmount. It is split out of main so
// that all errors are returned and logged in one place.
func _main() (err error) {
	debug := flag.Bool("d", false, "debug")
	autoexit := flag.Bool("autoexit", false, "automatically stop service when there is no client activity")
	// XXX option to prevent starting if wcfs was already started/mounted on mntpt ?
	// XXX do the check unconditionally?

	flag.Parse()
	if len(flag.Args()) != 2 {
		// BUG FIX: the usage error was previously constructed via
		// fmt.Errorf but not returned, so execution fell through and
		// panicked on flag.Args() indexing below when arguments were missing.
		return fmt.Errorf("Usage: %s [OPTIONS] zurl mntpt", os.Args[0])
	}
	zurl := flag.Args()[0]
	mntpt := flag.Args()[1]

	// xclose merges Close errors into named return err (first error wins).
	xclose := func(c io.Closer) {
		err = xerr.First(err, c.Close())
	}

	// debug -> precise t, no dates	(XXX -> always precise t?)
	if *debug {
		stdlog.SetFlags(stdlog.Lmicroseconds)
	}

	log.Infof("start %q %q", mntpt, zurl)
	gover := "(built with " + runtime.Version()
	if raceBuild {
		gover += " -race"
	}
	gover += ")"
	log.Info(gover)

	// open zodb storage/watch/db/connection
	ctx := context.Background()	// XXX + timeout?
	zstor, err := zodb.Open(ctx, zurl, &zodb.OpenOptions{
		ReadOnly: true,
	})
	if err != nil {
		return err
	}
	defer xclose(zstor)

	// subscribe to ZODB invalidations before opening zhead@at0, so that no
	// commit in between is missed by zwatcher.
	zwatchq := make(chan zodb.Event)
	at0 := zstor.AddWatch(zwatchq)
	defer zstor.DelWatch(zwatchq)

	zdb := zodb.NewDB(zstor)
	defer xclose(zdb)
	zhead, err := zopen(ctx, zdb, &zodb.ConnOptions{
		At: at0,

		// we need zhead.cache to be maintained across several transactions.
		// see "3) for head/bigfile/* the following invariant is maintained ..."
		NoPool: true,
	})
	if err != nil {
		return err
	}
	zhead.Cache().Lock()
	zhead.Cache().SetControl(&zodbCacheControl{})
	zhead.Cache().Unlock()

	// mount root + head/
	head := &Head{
		fsNode:   newFSNode(fSticky),
		rev:      0, // rev=0 marks the head/ (live) view
		zconn:    zhead,
		wlinkTab: make(map[*WatchLink]struct{}),
		hwait:    make(map[hwaiter]struct{}),
	}

	wnode := &WatchNode{
		fsNode: newFSNode(fSticky),
		head:   head,
	}

	bfdir := &BigFileDir{
		fsNode:   newFSNode(fSticky),
		head:     head,
		fileTab:  make(map[zodb.Oid]*BigFile),
		δFtail:   NewΔFtail(zhead.At()),
	}
	head.bfdir = bfdir

	root := &Root{
		fsNode: newFSNode(fSticky),
		zstor:  zstor,
		zdb:    zdb,
		head:   head,
		revTab: make(map[zodb.Tid]*Head),
	}

	opts := &fuse.MountOptions{
		FsName: zurl,
		Name:   "wcfs",

		// We retrieve kernel cache in ZBlk.blksize chunks, which are 2MB in size.
		// XXX currently go-fuse caps MaxWrite to 128KB.
		// TODO -> teach go-fuse to handle Init.MaxPages (Linux 4.20+).
		MaxWrite:      2*1024*1024,

		// XXX tune MaxReadAhead? MaxBackground?

		// OS cache that we populate with bigfile data is precious;
		// we explicitly propagate ZODB invalidations into file invalidations.
		ExplicitDataCacheControl: true,

		DisableXAttrs: true,        // we don't use
		Debug:         *debug,
	}

	fssrv, fsconn, err := mount(mntpt, root, opts)
	if err != nil {
		return err
	}
	groot   = root		// FIXME temp workaround (see ^^^)
	gfsconn = fsconn	// FIXME ----//----
	gmntpt  = mntpt

	// we require proper pagecache control (added to Linux 2.6.36 in 2010)
	kinit := fssrv.KernelSettings()
	kfuse := fmt.Sprintf("kernel FUSE (API %d.%d)", kinit.Major, kinit.Minor)
	supports := kinit.SupportsNotify
	if !(supports(fuse.NOTIFY_STORE_CACHE) && supports(fuse.NOTIFY_RETRIEVE_CACHE)) {
		return fmt.Errorf("%s does not support pagecache control", kfuse)
	}
	// make a bold warning if kernel does not support explicit cache invalidation
	// (patch sent upstream; see notes.txt -> "Notes on OS pagecache control")
	if kinit.Flags & fuse.CAP_EXPLICIT_INVAL_DATA == 0 {
		w1 := fmt.Sprintf("%s does not support explicit data cache invalidation", kfuse)
		w2 := "-> performance will be AWFUL."
		w3 := "-> you need kernel which includes git.kernel.org/linus/ad2ba64dd489 ."
		log.Error(w1); log.Error(w2); log.Error(w3)
		fmt.Fprintf(os.Stderr, "W: wcfs: %s\nW: wcfs: %s\nW: wcfs: %s\n", w1, w2, w3)
	}

	// add entries to /
	mkdir(root, "head", head)
	mkdir(head, "bigfile", bfdir)
	mkfile(head, "at", NewSmallFile(head.readAt))   // TODO mtime(at) = tidtime(at)
	mkfile(head, "watch", wnode)

	// for debugging/testing
	_wcfs := newFSNode(fSticky)
	mkdir(root, ".wcfs", &_wcfs)
	mkfile(&_wcfs, "zurl", NewStaticFile([]byte(zurl)))

	// .wcfs/zhead - special file channel that sends zhead.at.
	//
	// If a user opens it, it will start to get tids of through which
	// zhead.at was, starting from the time when .wcfs/zhead was opened.
	// There can be multiple openers. Once opened, the file must be read,
	// as wcfs blocks waiting for data to be read when processing
	// invalidations.
	mkfile(&_wcfs, "zhead", &_wcfs_Zhead{
		fsNode: newFSNode(fSticky),
	})

	// TODO handle autoexit
	// (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl
	//  opened, so when all inodes has been forgotten - we know all wcfs.py clients exited)
	_ = autoexit

	defer xerr.Contextf(&err, "serve %s %s", mntpt, zurl)

	// spawn filesystem server.
	//
	// use `go serve` + `waitMount` not just `serve` - because waitMount
	// cares to disable OS calling poll on us.
	// ( if we don't disable polling - fs serving can get stuck - see
	//   https://github.com/hanwen/go-fuse/commit/4f10e248eb for details )
	serveCtx, serveCancel := context.WithCancel(context.Background())
	go func () {
		defer serveCancel()
		fssrv.Serve()
	}()
	err = fssrv.WaitMount()
	if err != nil {
		return err
	}

	// filesystem server is serving requests.
	// run zwatcher and wait for it to complete.
	// zwatcher completes either normally - due to filesystem unmount, or fails.
	// if zwatcher fails - switch filesystem to return EIO instead of stale data.
	err = root.zwatcher(serveCtx, zwatchq)
	if errors.Cause(err) != context.Canceled {
		log.Error(err)
		log.Errorf("zwatcher failed -> switching filesystem to EIO mode")
		// XXX switch fs to EIO mode
	}

	// wait for unmount
	<-serveCtx.Done()
	return nil	// XXX serveErr | zwatchErr ?
}