# -*- coding: utf-8 -*-
# Copyright (C) 2018-2019  Nexedi SA and Contributors.
#                          Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""wcfs_test tests wcfs filesystem from outside as python client process"""

from __future__ import print_function

from wendelin.lib.testing import getTestDB
from wendelin.lib.zodb import dbclose
from wendelin.lib.mem import memcpy
from wendelin.bigfile.file_zodb import ZBigFile
from wendelin.bigfile.tests.test_filezodb import blksize
from wendelin import wcfs

import transaction
from persistent import Persistent
from persistent.timestamp import TimeStamp
from ZODB.utils import z64, u64, p64

import sys, os, os.path, subprocess, threading, inspect, traceback, re
from errno import EINVAL
from golang import go, chan, select, func, defer
from golang import context, sync, time
from golang.gcompat import qq
from zodbtools.util import ashex as h, fromhex
from pytest import raises
from six import reraise
from .internal import mm
from .internal.wcfs_test import read0_nogil

# setup:
# - create test database, compute zurl and mountpoint for wcfs
# - at every test: make sure wcfs is not running before & after the test.

testdb = None
testzurl = None     # URL of testdb
testmntpt = None    # wcfs is mounted here
def setup_module():
    global testdb, testzurl, testmntpt
    testdb = getTestDB()
    testdb.setup()

    zstor = testdb.getZODBStorage()
    testzurl = wcfs.zstor_2zurl(zstor)
    zstor.close()
    testmntpt = wcfs._mntpt_4zurl(testzurl)
    os.rmdir(testmntpt)

def teardown_module():
    testdb.teardown()

# make sure we start every test without wcfs server running.
def setup_function(f):
    assert not os.path.exists(testmntpt)

# make sure we unmount wcfs after every test.
def teardown_function(f):
    mounted = not subprocess.call(["mountpoint", "-q", testmntpt])
    if mounted:
        subprocess.check_call(["fusermount", "-u", testmntpt])
    if os.path.exists(testmntpt):
        os.rmdir(testmntpt)


# ---- test join/autostart ----

# test that zurl stays the same from one storage open to another.
def test_zurlstable():
    for i in range(10):
        zstor = testdb.getZODBStorage()
        zurl  = wcfs.zstor_2zurl(zstor)
        zstor.close()
        assert zurl == testzurl

# test that join works.
@func
def test_join():
    zurl = testzurl
    with raises(RuntimeError, match="wcfs: join .*: server not started"):
        wcfs.join(zurl, autostart=False)

    wc = wcfs._start(zurl)
    xdefer(wc.close)
    assert wc.mountpoint == testmntpt
    assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl
    assert os.path.isdir(wc.mountpoint + "/head")
    assert os.path.isdir(wc.mountpoint + "/head/bigfile")

    wc2 = wcfs.join(zurl, autostart=False)
    xdefer(wc2.close)
    assert wc2.mountpoint == wc.mountpoint

# test that join(autostart=y) works.
@func
def test_join_autostart():
    zurl = testzurl
    with raises(RuntimeError, match="wcfs: join .*: server not started"):
        wcfs.join(zurl, autostart=False)

    wc = wcfs.join(zurl, autostart=True)
    xdefer(wc.close)
    assert wc.mountpoint == testmntpt
    assert readfile(wc.mountpoint + "/.wcfs/zurl") == zurl
    assert os.path.isdir(wc.mountpoint + "/head")
    assert os.path.isdir(wc.mountpoint + "/head/bigfile")


# ---- test access to data ----

# XXX place=?
# many tests need to be run with some reasonable timeout to detect lack of wcfs
# response. with_timeout and timeout provide syntactic shortcuts to do so.
def with_timeout(parent=context.background()):  # -> ctx, cancel
    return context.with_timeout(parent, 3*time.second)

def timeout(parent=context.background()):   # -> ctx
    ctx, _ = with_timeout(parent)
    return ctx
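
# for example (wl here is a tWatchLink; illustrative sketch):
#
#   ctx, cancel = with_timeout()        # ctx times out after 3s
#   wl.sendReq(ctx, b"watch ...")       # a request that must not hang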

# tdelay delays a bit to increase the probability of catching races,
# e.g. of receiving erroneous extra pin messages (see doCheckingPin).
def tdelay():
    time.sleep(1*time.millisecond)     # XXX -> 10ms ?

# DF represents a change in files space.
# it corresponds to ΔF in wcfs.go .
class DF:
    # .rev      tid
    # .byfile   {} ZBigFile -> DFile
    def __init__(dF):
        # rev set from outside
        dF.byfile = {}

# DFile represents a change to one file.
# it is similar to ΔFile in wcfs.go .
class DFile:
    # .rev      tid
    # .ddata    {} blk -> data
    def __init__(dfile):
        # rev set from outside
        dfile.ddata = {}

# tDB provides database/wcfs testing environment.
#
# BigFiles opened under tDB are represented as tFile - see .open for details.
# Watches opened under tDB are represented as tWatchLink - see .openwatch for details.
#
# XXX .open         -> .topen
# XXX .openwatch    -> .topenwatch ?
#
# XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
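#
# a typical session (sketch; cf. test_wcfs below):
#
#   t = tDB(); xdefer(t.close)
#   t.root['zfile'] = zf = ZBigFile(blksize)
#   t.commit()
#   t.change(zf, {2: 'c2'})         # schedule change to zf[2]
#   at1 = t.commit()                # commit and make sure wcfs is synced
#   f = t.open(zf)                  # access zf via wcfs
#   f.assertData(['', '', 'c2'])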
class tDB:
    def __init__(t):
        t.root  = testdb.dbopen()
        t.wc    = wcfs.join(testzurl, autostart=True)

        # ZBigFile(s) scheduled for commit
        t._changed = {} # ZBigFile -> {} blk -> data

        # committed: (tail, head] + δF history
        t.tail   = t.root._p_jar.db().storage.lastTransaction()
        t.head   = None
        t._headv = [] # XXX -> just use dFtail[·].rev ?
        t.dFtail = [] # of DF

        # fh(.wcfs/zhead) + history of zhead read from there
        t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
        t._wc_zheadv  = []

        # tracked opened tFiles & tWatches
        t._files    = set()
        t._wlinks   = set()

    # close closes test database as well as all tracked files, watch links and wcfs.
    def close(t):
        for tf in t._files.copy():
            tf.close()
        for tw in t._wlinks.copy():
            tw.close()
        assert len(t._files)    == 0
        assert len(t._wlinks)  == 0
        t._wc_zheadfh.close()
        t.wc.close()
        dbclose(t.root)

    # open opens wcfs file corresponding to zf@at and starts to track it.
    # see returned tFile for details.
    def open(t, zf, at=None):   # -> tFile
        return tFile(t, zf, at=at)

    # openwatch opens /head/watch on wcfs.
    # see returned tWatchLink for details.
    def openwatch(t):   # -> tWatchLink
        return tWatchLink(t)

    # change schedules zf to be changed according to changeDelta at commit.
    #
    # changeDelta is {} blk -> data.
    # data can be both bytes and unicode.
    def change(t, zf, changeDelta):
        assert isinstance(zf, ZBigFile)
        zfDelta = t._changed.setdefault(zf, {})
        for blk, data in changeDelta.iteritems():
            if not isinstance(data, bytes):
                data = data.encode('utf-8')
            assert len(data) <= zf.blksize
            zfDelta[blk] = data

    # commit commits transaction and makes sure wcfs is synchronized to it.
    #
    # It remembers/returns committed transaction ID.
    def commit(t):
        # perform modifications scheduled by change.
        # use !wcfs mode so that we prepare data independently of wcfs code paths.
        dF = DF()
        for zf, zfDelta in t._changed.items():
            dfile = DFile()
            zfh = zf.fileh_open(_use_wcfs=False)
            for blk, data in zfDelta.iteritems():
                dfile.ddata[blk] = data
                data += b'\0'*(zf.blksize - len(data))  # trailing \0
                vma = zfh.mmap(blk, 1)
                memcpy(vma, data)
            dF.byfile[zf] = dfile

        # NOTE there is no clean way to retrieve tid of just committed transaction
        #      we use last._p_serial as workaround.
        t.root['_last'] = last = Persistent()
        last._p_changed = 1

        transaction.commit()
        head = last._p_serial
        t.head = head
        t._headv.append(head)

        print('\nM: commit -> %s' % t.hat(head))
        for zf, zfDelta in t._changed.items():
            print('M:      f<%s>\t%s' % (h(zf._p_oid), sorted(zfDelta.keys())))
        t._changed = {}

        dF.rev = head
        for dfile in dF.byfile.values():
            dfile.rev = head
        t.dFtail.append(dF)

        # synchronize wcfs to db
        t._wcsync()

        return head

    # _wcsync makes sure wcfs is synchronized to latest committed transaction.
    def _wcsync(t):
        while len(t._wc_zheadv) < len(t._headv):
            l = t._wc_zheadfh.readline()
            #print('> zhead read: %r' % l)
            l = l.rstrip('\n')
            wchead = fromhex(l)
            i = len(t._wc_zheadv)
            if wchead != t._headv[i]:
                raise RuntimeError("wcsync #%d: wczhead (%s) != zhead (%s)" % (i, h(wchead), h(t._headv[i])))
            t._wc_zheadv.append(wchead)

        # head/at = last txn of whole db
        assert t.read("head/at") == h(t.head)


    # path returns path for object on wcfs.
    # - str:        wcfs root + obj;
    # - Persistent: wcfs root + (head|@<at>)/bigfile/obj
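    #
    # for example (with <mnt> denoting t.wc.mountpoint):
    #   path("head/at")     -> <mnt>/head/at
    #   path(zf)            -> <mnt>/head/bigfile/<zf oid in hex>
    #   path(zf, at=at1)    -> <mnt>/@<at1 in hex>/bigfile/<zf oid in hex>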
    def path(t, obj, at=None):
        if isinstance(obj, Persistent):
            head = "head/" if at is None else ("@%s/" % h(at))
            obj  = "%s/bigfile/%s" % (head, h(obj._p_oid))
            at   = None

        assert isinstance(obj, str)
        assert at is None  # must not be used with str

        return os.path.join(t.wc.mountpoint, obj)

    # read reads file corresponding to obj on wcfs.
    def read(t, obj, at=None):
        path = t.path(obj, at=at)
        return readfile(path)

    # stat stats file corresponding to obj on wcfs.
    def stat(t, obj, at=None):
        path = t.path(obj, at=at)
        return os.stat(path)

    # _open opens file corresponding to obj on wcfs.
    def _open(t, obj, mode='rb', at=None):
        path = t.path(obj, at=at)
        return open(path, mode, 0)  # unbuffered


    # iter_revv iterates through all possible at_i -> at_j -> at_k ... sequences
    # with at_i < at_j < at_k ...
    # NOTE all sequences go till head.
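    #
    # for example, if the committed revisions are at1 < at2 < at3, it yields:
    #   [at1, at2, at3], [at1, at3], [at2, at3], [at3]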
    def iter_revv(t, start=z64, level=0):
        dFtail = [_ for _ in t.dFtail if _.rev > start]
        #print(' '*level, 'iter_revv', t.hat(start), [t.hat(_.rev) for _ in dFtail])
        if len(dFtail) == 0:
            yield []
            return

        for dF in dFtail:
            #print(' '*level, 'QQQ', t.hat(dF.rev))
            for tail in t.iter_revv(start=dF.rev, level=level+1):
                #print(' '*level, 'zzz', tail)
                yield ([dF.rev] + tail)

    # _blkData returns expected zf[blk] data and revision as of @at database state.
    #
    # If the block is a hole, (b'', at0) is returned.  XXX -> @z64?
    # XXX what to return when the file did not exist at all? when blk was after file size?
    def _blkData(t, zf, blk, at): # -> (data, rev)
        if at is None:
            at = t.head

        # XXX dup wrt _pinAt

        # all changes to zf
        vdf = [_.byfile[zf] for _ in t.dFtail if zf in _.byfile]

        # changes to zf[blk] <= at
        blkhistoryat = [_ for _ in vdf if blk in _.ddata and _.rev <= at]
        if len(blkhistoryat) == 0:
            # blk did not exist @at       # XXX verify whether the file existed at all
            data = b''
            rev  = t._headv[0]  # was hole - at0    XXX -> pin to z64
        else:
            _ = blkhistoryat[-1]
            data = _.ddata[blk]
            rev  = _.rev
            assert rev <= at

        return data, rev

    # _blkRev returns expected zf[blk] revision as of @at database state.
    def _blkRev(t, zf, blk, at): # -> rev
        _, rev = t._blkData(zf, blk, at)
        return rev


# tFile provides testing environment for one bigfile on wcfs.
#
# .blk() provides access to data of a block. .cached() gives state of which
# blocks are in OS pagecache. .assertCache and .assertBlk/.assertData assert
# on state of cache and data.
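#
# for example:
#
#   f = t.open(zf)
#   f.assertCache([1,1,0])          # blocks 0,1 cached; block 2 not
#   f.assertBlk(2, 'c2')            # expect zf[2] == b'c2' + trailing zeros
#   f.assertData(['', '', 'c2'])    # assert the whole file content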
class tFile:
    # maximum number of pages we mmap for 1 file.
    # this should not be too big, so as not to exceed the mlock limit.
    _max_tracked_pages = 8

    def __init__(t, tdb, zf, at=None):
        assert isinstance(zf, ZBigFile)
        t.tdb = tdb
        t.zf  = zf
        t.at  = at
        t.f   = tdb._open(zf, at=at)
        t.blksize = zf.blksize

        # mmap the file past the end up to _max_tracked_pages and setup
        # invariants on which we rely to verify OS cache state:
        #
        # 1. lock pages with MLOCK_ONFAULT: this way after a page is read by
        #    mmap access we have the guarantee from kernel that the page will
        #    stay in pagecache.
        #
        # 2. madvise memory with MADV_SEQUENTIAL and MADV_RANDOM in interleaved
        #    mode. This adjusts kernel readahead (which triggers for MADV_SEQUENTIAL
        #    vma) to not go over to next block and thus a read access to one
        #    block won't trigger implicit read access to its neighbour block.
        #
        #      https://www.quora.com/What-heuristics-does-the-adaptive-readahead-implementation-in-the-Linux-kernel-use
        #      https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/mm/madvise.c?h=v5.2-rc4#n51
        #
        #    we don't use MADV_NORMAL instead of MADV_SEQUENTIAL, because for
        #    MADV_NORMAL, there is not only read-ahead, but also read-around,
        #    which might result in accessing previous block.
        #
        #    we don't disable readahead universally, since enabled readahead
        #    helps to test how wcfs handles simultaneous read triggered by
        #    async kernel readahead vs wcfs uploading data for the same block
        #    into OS cache. Also, fully enabled readahead is how wcfs is
        #    actually used.
        assert t.blksize % mm.PAGE_SIZE == 0
        t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked_pages*t.blksize)

        mm.lock(t.fmmap, mm.MLOCK_ONFAULT)

        for blk in range(t._max_tracked_pages):
            blkmmap = t.fmmap[blk*t.blksize:(blk+1)*t.blksize]
            # NOTE the kernel does not start readahead from access to
            # MADV_RANDOM vma, but for a MADV_{NORMAL/SEQUENTIAL} vma it starts
            # readahead which can go _beyond_ the vma that was used to decide RA
            # start. For this reason - to prevent RA started at one block from
            # overlapping with the next block - we put a MADV_RANDOM vma at the
            # end of every block, covering its last 1/8.
            # XXX implicit assumption that RA window is < 1/8·blksize
            #
            # NOTE with a block completely covered by MADV_RANDOM the kernel
            # issues 4K sized reads; wcfs starts uploading into cache almost
            # immediately, but the kernel still issues many reads to read the
            # full 2MB of the block. This works slowly.
            # XXX -> investigate and maybe make read(while-uploading) wait for
            # uploading to complete and only then return? (maybe it will help
            # performance even in normal case)
            _ = len(blkmmap)*7//8
            mm.advise(blkmmap[:_], mm.MADV_SEQUENTIAL)
            mm.advise(blkmmap[_:], mm.MADV_RANDOM)

        tdb._files.add(t)

    def close(t):
        t.tdb._files.remove(t)
        mm.unmap(t.fmmap)
        t.f.close()

    # blk returns memoryview of file[blk].
    def blk(t, blk):
        assert blk < t._max_tracked_pages
        return memoryview(t.fmmap[blk*t.blksize:(blk+1)*t.blksize])

    # cached returns [] indicating whether each file block is cached or not.
    # 1 - cached, 0 - not cached, fractional (0,1) - some pages of the block are cached, some not.
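    #
    # for example [1, 1, 0.5, 0] means: blocks 0 and 1 are fully cached, half
    # of block 2 pages are cached, and block 3 is not cached at all.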
    def cached(t):
        l = t._sizeinblk()
        incorev = mm.incore(t.fmmap[:l*t.blksize])
        # incorev is in pages; convert to in blocks
        assert t.blksize % mm.PAGE_SIZE == 0
        blkpages = t.blksize // mm.PAGE_SIZE
        cachev = [0.]*l
        for i, v in enumerate(incorev):
            blk = i // blkpages
            cachev[blk] += bool(v)
        for blk in range(l):
            cachev[blk] /= blkpages
            if cachev[blk] == int(cachev[blk]):
                cachev[blk] = int(cachev[blk])  # 0.0 -> 0, 1.0 -> 1
        return cachev

    # _sizeinblk returns file size in blocks.
    def _sizeinblk(t):
        st = os.fstat(t.f.fileno())
        assert st.st_size % t.blksize == 0
        assert st.st_size // t.blksize <= t._max_tracked_pages
        return st.st_size // t.blksize

    # assertCache asserts state of OS cache for file.
    #
    # incorev is [] of 1/0 representing whether block data is present or not.
    def assertCache(t, incorev):
        assert t.cached() == incorev

    # assertBlk asserts that file[blk] has data as expected.
    #
    # Expected data may be given with size < t.blksize. In such case the data
    # is implicitly appended with trailing zeros. Data can be both bytes and unicode.
    #
    # It also checks that file watches are properly notified on data access -
    # see "7.2) for all registered client@at watchers ..."
    #
    # pinokByWLink: {} tWatchLink -> {} blk -> at.
    # pinokByWLink can be None - in that case it is computed automatically.
    @func
    def assertBlk(t, blk, dataok, pinokByWLink=None):
        #print('assertBlk #%d' % blk)
        # XXX -> assertCtx('blk #%d' % blk)
        def _():
            assertCtx = 'blk #%d' % blk
            _, e, _ = sys.exc_info()
            if isinstance(e, AssertionError):
                assert len(e.args) == 1 # pytest puts everything as args[0]
                e.args = (assertCtx + "\n" + e.args[0],)
        defer(_)

        if not isinstance(dataok, bytes):
            dataok = dataok.encode('utf-8')
        assert len(dataok) <= t.blksize
        blkdata, blkrev = t.tdb._blkData(t.zf, blk, t.at)
        assert blkdata == dataok, "computed vs explicit data"
        dataok += b'\0'*(t.blksize - len(dataok))   # trailing zeros
        assert blk < t._sizeinblk()

        # access to this block must not trigger access to other blocks
        incore_before = t.cached()
        def _():
            incore_after = t.cached()
            incore_before[blk] = 'x'
            incore_after [blk] = 'x'
            assert incore_before == incore_after
        defer(_)

        cached = t.cached()[blk]
        assert cached in (0, 1) # every check accesses a block in full
        shouldPin = False       # whether at least one wlink should receive a pin

        # watches that must be notified if access goes to @head/file
        wpin = {}   # tWatchLink -> pinok
        for wlink in t.tdb._wlinks:
            pinok = {}
            if t.at is None: # @head/...
                w = wlink._watching.get(t.zf)
                if w is not None and w.at < blkrev:
                    if cached == 1:
                        # XXX assert blk already pinned on that watch
                        pass
                    else:
                        # XXX and watch not already pinned on the watch
                        pinok = {blk: t.tdb._blkRev(t.zf, blk, w.at)}
                        shouldPin = True
            wpin[wlink] = pinok

        if pinokByWLink is not None:
            assert wpin == pinokByWLink, "computed vs explicit pinokByWLink"
        pinokByWLink = wpin

        # doCheckingPin expects every wlink entry to also contain zf
        for wlink, pinok in pinokByWLink.items():
            pinokByWLink[wlink] = (t.zf, pinok)

        blkview = t.blk(blk)
        assert t.cached()[blk] == cached

        def _(ctx, ev):
            assert t.cached()[blk] == cached
            ev.append('read pre')

            # access data with released GIL so that the thread that reads data from
            # head/watch can receive pin messages. Be careful to handle cancellation,
            # so that on error in another worker we don't get stuck and the error can
            # be propagated to wait and reported.
            have_read = chan(1)
            def _():
                b = read0_nogil(blkview)
                have_read.send(b)
            go(_)
            _, _rx = select(
                ctx.done().recv,    # 0
                have_read.recv,     # 1
            )
            if _ == 0:
                raise ctx.err()
            b = _rx

            ev.append('read ' + chr(b))
        ev = doCheckingPin(_, pinokByWLink)

        # XXX hack - wlinks are notified and emit events simultaneously - we
        # check only that events begin and end with read pre/post and that pins
        # are inside. Better do explicit check in tracetest style.
        assert ev[0]  == 'read pre', ev
        assert ev[-1] == 'read ' + dataok[0], ev
        ev = ev[1:-1]
        if not shouldPin:
            assert ev == []
        else:
            assert 'pin rx' in ev
            assert 'pin ack pre' in ev

        assert t.cached()[blk] > 0

        # XXX assert individually for every block's page? (easier debugging?)
        assert blkview == dataok

        # we just accessed the block - it has to be in OS cache
        assert t.cached()[blk] == 1


    # assertData asserts that file has data blocks as specified.
    #
    # Expected blocks may be given with size < zf.blksize. In such case they
    # are implicitly appended with trailing zeros.
    #
    # It also checks file size and optionally mtime.
    def assertData(t, dataokv, mtime=None):
        st = os.fstat(t.f.fileno())
        assert st.st_size == len(dataokv)*t.blksize
        if mtime is not None:
            assert st.st_mtime == tidtime(mtime)

        for blk, dataok in enumerate(dataokv):
            t.assertBlk(blk, dataok)

        # all blocks must be in cache after we touched them all
        t.assertCache([1]*len(dataokv))


# tWatch represents watch for one file setup on a tWatchLink.
class tWatch:
    def __init__(w):
        w.at     = z64  # not None - always concrete
        w.pinned = {}   # blk -> rev

# tWatchLink provides testing environment for /head/watch link opened on wcfs.
#
# .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
# .watch() sets up a watch for a file and verifies ... XXX
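#
# for example (sketch):
#
#   wl = t.openwatch()
#   wl.watch(zf, at1, {2: at1, 3: at0})     # watch zf@at1; expect initial pins for blk 2 and 3
#   ...
#   wl.close()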
class tWatchLink:

    def __init__(t, tdb):
        t.tdb = tdb

        # head/watch handle.
        #
        # python/stdio locks the file object on read/write, however we need both
        # read and write to be working simultaneously.
        # -> use 2 separate file objects for rx and tx.
        #
        # fdopen takes ownership of file descriptor and closes it when file
        # object is closed -> dup fd so that each file object has its own fd.
        wh  = os.open(tdb.path("head/watch"), os.O_RDWR)
        wh2 = os.dup(wh)
        t._wrx = os.fdopen(wh, 'rb')
        t._wtx = os.fdopen(wh2, 'wb')

        t.rx_eof = chan()   # .rx_eof becomes ready when wcfs closes its tx side
        t.fatalv = []       # fatal messages received from wcfs

        # inv.protocol message IO
        t._acceptq  = chan() # (stream, msg)    server originated messages go here
        t._rxmu     = threading.Lock()
        t._rxtab    = {}     # stream -> rxq    server replies go via here
        t._accepted = set()  # of stream        streams we accepted but did not reply to yet

        t._txmu     = threading.Lock()  # serializes writes

        serveCtx, t._serveCancel = context.with_cancel(context.background())
        t._serveWG = sync.WorkGroup(serveCtx)
        t._serveWG.go(t._serveRX)

        # this tWatchLink currently watches the following files at particular state.
        t._watching = {}    # {} ZBigFile -> tWatch

        tdb._wlinks.add(t)

    def close(t):
        t.tdb._wlinks.remove(t)

        t._serveCancel()
        # ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
        # _serveRX on client (= on us). The connection can be already closed by
        # wcfs - so ignore errors when sending bye.
        try:
            t._send(1, b'bye')
        except IOError:
            pass
        # XXX we can get stuck here if wcfs does not behave as we want.
        # XXX in particular if there is a silly - e.g. syntax or type - error in
        #     test code, we currently get stuck here.
        #
        # XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour?
        try:
            t._serveWG.wait()
        except Exception as e:
            # canceled is expected and ok
            if e != context.canceled:
                reraise(e, None, e.__traceback__)

        t._wtx.close()
        t._wrx.close()


    # ---- message IO ----

    # _serveRX receives messages from ._wrx and dispatches them according to streamID.
    @func
    def _serveRX(t, ctx):
        # when finishing - wakeup everyone waiting for rx
        def _():
            t._acceptq.close()
            with t._rxmu:
                rxtab = t._rxtab
                t._rxtab = None     # don't allow new rxtab registers
            for rxq in rxtab.values():
                rxq.close()
        defer(_)

        while 1:
            # NOTE: .close() makes sure ._wrx.read*() will wake up
            l = t._wrx.readline()
            print('C: watch  : rx: %r' % l)
            if len(l) == 0:     # peer closed its tx
                t.rx_eof.close()
                break

            # <stream> ... \n
            stream, msg = l.split(' ', 1)
            stream = int(stream)
            msg = msg.rstrip('\n')

            if stream == 0: # control/fatal message from wcfs
                # XXX print -> receive somewhere?
                print('C: watch  : rx fatal: %r' % msg)
                t.fatalv.append(msg)
                continue

            reply = bool(stream % 2)
            if reply:
                with t._rxmu:
                    assert stream in t._rxtab
                    rxq = t._rxtab.pop(stream)
                _, _rx = select(
                    ctx.done().recv,    # 0
                    (rxq.send, msg),    # 1
                )
                if _ == 0:
                    raise ctx.err()
            else:
                with t._rxmu:
                    assert stream not in t._accepted
                    t._accepted.add(stream)
                _, _rx = select(
                    ctx.done().recv,                    # 0
                    (t._acceptq.send, (stream, msg)),   # 1
                )
                if _ == 0:
                    raise ctx.err()


    # _send sends raw message via specified stream.
    #
    # multiple _send can be called in parallel - _send serializes writes.
    # XXX +ctx?
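    #
    # for example _send(1, b'bye') writes the frame b"1 bye\n" to the link.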
    def _send(t, stream, msg):
        assert b'\n' not in msg
        pkt = b"%d %s\n" % (stream, msg)
        t._write(pkt)

    def _write(t, pkt):
        with t._txmu:
            #print('C: watch  : tx: %r' % pkt)
            t._wtx.write(pkt)
            t._wtx.flush()

    # sendReq sends client -> server request and returns server reply.
    #
    # only 1 sendReq must be used at a time.    # XXX relax?
    def sendReq(t, ctx, req):   # reply | None when EOF
        stream = 1

        rxq = chan()
        with t._rxmu:
            assert stream not in t._rxtab
            t._rxtab[stream] = rxq

        t._send(stream, req)

        _, _rx = select(
            ctx.done().recv,    # 0
            rxq.recv,           # 1
        )
        if _ == 0:
            raise ctx.err()
        return _rx

    # recvReq receives client <- server request.
    #
    # multiple recvReq could be used at a time.
    def recvReq(t, ctx): # -> tSrvReq | None when EOF
        _, _rx = select(
            ctx.done().recv,    # 0
            t._acceptq.recv,    # 1
        )
        if _ == 0:
            raise ctx.err()

        rx = _rx
        if rx is None:
            return rx

        stream, msg = rx
        return tSrvReq(t, stream, msg)


# tSrvReq represents 1 server-initiated wcfs request received over /head/watch link.
class tSrvReq:
    def __init__(req, twlink, stream, msg):
        req.twlink = twlink
        req.stream = stream
        req.msg    = msg

    def reply(req, answer):
        #print('C: reply %s <- %r ...' % (req, answer))
        t = req.twlink
        with t._rxmu:
            assert req.stream in t._accepted

        t._send(req.stream, answer)

        with t._rxmu:
            assert req.stream in t._accepted
            t._accepted.remove(req.stream)

        # XXX also track as answered? (and don't accept with the same ID ?)

    def _parse(req): # -> (foid, blk, at|None)
        # pin <foid> #<blk> @(<at>|head)
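        # e.g. b"pin 0000000000000042 #2 @03cf7850500b5f66"  (pin blk2 to that tid)
        #   or b"pin 0000000000000042 #2 @head"              (unpin blk2 back to head)
        # (foid and tid values above are illustrative)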
        m = re.match(b"pin (?P<foid>[0-9a-f]{16}) #(?P<blk>[0-9]+) @(?P<at>[^ ]+)$", req.msg)
        if m is None:
            raise RuntimeError("message is not a valid pin request: %s" % qq(req.msg))
        foid = fromhex(m.group('foid'))
        blk  = int(m.group('blk'))
        at   = m.group('at')
        if at == b"head":
            at = None
        else:
            at = fromhex(at)

        return foid, blk, at

    @property
    def foid(req):    return req._parse()[0]
    @property
    def blk(req):     return req._parse()[1]
    @property
    def at(req):      return req._parse()[2]


# ---- watch setup/adjust ----

# _pinAt returns which blocks need to be pinned for zf@at.
#
# it does not take into account whether blocks are in cache or not and computes
# the pin from all changes.     XXX is this the desired behaviour?
@func(tWatchLink)
def _pinAt(twlink, zf, at):  # -> pin = {} blk -> rev
    t = twlink.tdb

    # XXX dup in _blkData

    # all changes to zf
    vdf = [_.byfile[zf] for _ in t.dFtail if zf in _.byfile]

    # {} blk -> at for changes ∈ (at, head]
    pin = {}
    for df in [_ for _ in vdf if _.rev > at]:
        for blk in df.ddata:
            if blk in pin:
                continue
            # history of blk changes <= at
            blkhistoryat = [_.rev for _ in vdf if blk in _.ddata and _.rev <= at]
            if len(blkhistoryat) == 0:
                pinrev = t._headv[0]    # was hole - at0  XXX -> pin to z64?
            else:
                pinrev = blkhistoryat[-1]
            assert pinrev <= at
            pin[blk] = pinrev

    return pin

# watch sets up or adjusts a watch for file@at.
#
# During setup it verifies that wcfs sends correct initial pins.
#
# pinok can be None - in that case it is computed automatically.
@func(tWatchLink)
def watch(twlink, zf, at, pinok=None):   # XXX -> ?
    t = twlink.tdb
    w = twlink._watching.get(zf)
    if w is None:
        w = twlink._watching[zf] = tWatch()
        at_prev = None
    else:
        at_prev = w.at  # we were previously watching zf @at_prev

    at_from = ''
    if at_prev is not None:
        at_from = '(%s ->) ' % t.hat(at_prev)
    print('\nC: setup watch f<%s> %s%s' % (h(zf._p_oid), at_from, t.hat(at)))

    # pinstr returns human-readable representation for {}blk->rev
    def pinstr(pin):
        pinv = []
        for blk in sorted(pin.keys()):
            if pin[blk] is None:
                s = 'head'
            else:
                s = t.hat(pin[blk])
            pinv.append('%d: %s' % (blk, s))
        return '{%s}' % ', '.join(pinv)

    pin_prev = {}
    if at_prev is not None:
        assert at_prev <= at, 'TODO %s -> %s' % (t.hat(at_prev), t.hat(at))
        pin_prev = twlink._pinAt(zf, at_prev)

    pin = twlink._pinAt(zf, at)

    if at_prev != at and at_prev is not None:
        print('# pin@old: %s\n# pin@new: %s' % (pinstr(pin_prev), pinstr(pin)))

    for blk in set(pin_prev.keys()).union(pin.keys()):
        # blk ∉ pin_prev,   blk ∉ pin       -> cannot happen
        assert (blk in pin_prev) or (blk in pin)

        # blk ∉ pin_prev,   blk ∈ pin       -> cannot happen
        if blk not in pin_prev and blk in pin:
            if at_prev is not None:
                assert False, '#%d pinned %s; not pinned %s' % (blk, t.hat(at_prev), t.hat(at))

        # blk ∈ pin_prev,   blk ∉ pin       -> unpin to head
        elif blk in pin_prev and blk not in pin:
            pin[blk] = None     # @head

        # blk ∈ pin_prev,   blk ∈ pin       -> if rev different: use pin
        elif blk in pin_prev and blk in pin:
            assert pin_prev[blk] <= pin[blk]
            if pin_prev[blk] == pin[blk]:
                del pin[blk]    # no need to pin - blk is already pinned at this rev

    #print('-> %s' % pinstr(pin))

    # {} blk -> at that have to be pinned.
    # XXX also check that head/file[blk] is in cache - else no need to pin
    if pinok is not None:
        assert pin == pinok,    "computed vs explicit pinok"
    pinok = pin
    print('#  pinok: %s' % pinstr(pinok))

    # send watch request and check that we receive pins for in-cache blocks
    # changed > at. FIXME "in-cache" is currently not handled
    twlink._watch(zf, at, pinok, "ok")

    w.at = at
    # XXX update pinned

# _watch sends watch request for zf@at, expects initial pins specified by pinok and final reply.
#
# pinok: {} blk -> at that have to be pinned.
# if replyok ends with '…' only the reply prefix up to the dots is checked.
@func(tWatchLink)
def _watch(twlink, zf, at, pinok, replyok):
    def _(ctx, ev):
        reply = twlink.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at)))
        if replyok.endswith('…'):
            rok = replyok[:-len('…')]
            assert reply[:len(rok)] == rok
        else:
            assert reply == replyok

    doCheckingPin(_, {twlink: (zf, pinok)})


# doCheckingPin calls f and verifies that wcfs sends expected pins during the
# time f executes.
#
# f(ctx, eventv)
# pinokByWLink: {} tWatchLink -> (zf, {} blk -> at).
# pinfunc(wlink, foid, blk, at) | None.     XXX foid -> ZBigFile?
#
# pinfunc is called after pin request is received from wcfs, but before pin ack
# is replied back. Pinfunc must not block.
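#
# for example the following runs f and verifies that, while f executes, wlink
# receives a pin for zf[2] to at1 and nothing else (sketch):
#
#   doCheckingPin(f, {wlink: (zf, {2: at1})})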
def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str)
    # call f and check that we receive pins as specified.
    # Use timeout to detect wcfs replying fewer pins than expected.
    #
    # XXX detect not-sent pins via ack'ing previous pins as they come in (not
    # waiting for all of them) and then seeing that we did not receive the expected
    # pin when f completes?
    ctx, cancel = with_timeout()
    wg = sync.WorkGroup(ctx)
    ev = []

    for wlink, (zf, pinok) in pinokByWLink.items():
        def _(ctx, wlink, zf, pinok):
            w = wlink._watching.get(zf)
            if len(pinok) > 0:
                assert w is not None

            pinv = wlink._expectPin(ctx, zf, pinok)
            if len(pinv) > 0:
                ev.append('pin rx')         # XXX + zf, pin details?

            # increase probability to receive erroneous extra pins
            tdelay()

            if len(pinv) > 0:
                if pinfunc is not None:
                    for p in pinv:
                        pinfunc(wlink, p.foid, p.blk, p.at)
                ev.append('pin ack pre')    # XXX +details?
                for p in pinv:
                    assert p.foid == zf._p_oid  # pin messages are for the watched file
                    if p.at is None:    # repin to @head
                        assert p.blk in w.pinned    # must have been pinned before  XXX correct?
                        del w.pinned[p.blk]
                    else:
                        w.pinned[p.blk] = p.at

                    p.reply(b"ack")

            # check that we don't get extra pins before f completes
            try:
                req = wlink.recvReq(ctx)
            except Exception as e:
                if e is context.canceled:
                    return # cancel is expected after f completes
                reraise(e, None, e.__traceback__)

            assert False, "extra pin message received: %r" % req.msg
        wg.go(_, wlink, zf, pinok)

    def _(ctx):
        f(ctx, ev)
        # upon completion of f cancel the _expectPin waiters
        # -> if some expected pins were not received this turns into an error.
        cancel()
    wg.go(_)

    wg.wait()
    return ev


# _expectPin asserts that wcfs sends expected pin messages.
#
# expect is {} blk -> at
# returns [] of received pin requests.
@func(tWatchLink)
def _expectPin(twlink, ctx, zf, expect): # -> []tSrvReq
    expected = set()    # of expected pin messages
    for blk, at in expect.items():
        hat = h(at) if at is not None else 'head'
        msg = b"pin %s #%d @%s" % (h(zf._p_oid), blk, hat)
        assert msg not in expected
        expected.add(msg)

    reqv = []   # of received requests
    while len(expected) > 0:
        try:
            req = twlink.recvReq(ctx)
        except Exception as e:
            raise RuntimeError("%s\nnot all pin messages received - pending:\n%s" % (e, expected))
        assert req is not None  # channel not closed
        assert req.msg in expected
        expected.remove(req.msg)
        reqv.append(req)

    return reqv


# test_wcfs exercises wcfs functionality.
@func
def test_wcfs():
    t = tDB()
    xdefer(t.close)

    t.root['!file'] = nonfile = Persistent()
    t.root['zfile'] = zf = ZBigFile(blksize)

    at0  = t.commit()

    # >>> lookup non-BigFile -> must be rejected
    with raises(OSError) as exc:
        t.stat(nonfile)
    assert exc.value.errno == EINVAL

    # >>> file initially empty
    f = t.open(zf)
    f.assertCache([])
    f.assertData ([], mtime=at0)

    # >>> (@at1) commit data -> we can see it on wcfs
    t.change(zf, {2: '1cc'})
    at1 = t.commit()

    f.assertCache([0,0,0])  # initially not cached
    f.assertData (['','','1cc'], mtime=t.head)

    # >>> (@at2) commit again -> we can see both latest and snapshotted states
    t.change(zf, {2: '2c', 3: '2d'})
    at2 = t.commit()

    # f @head
    f.assertCache([1,1,0,0])
    f.assertData (['','', '2c', '2d'], mtime=t.head)

    # f @at1
    f1 = t.open(zf, at=at1)
    f1.assertCache([0,0,1])
    f1.assertData (['','','1cc'])  # XXX + mtime=at1?


    # >>> (@at3) commit again without changing zf size  XXX write about b cache inval
    f2 = t.open(zf, at=at2)
    t.change(zf, {2: '3c'})     # FIXME + 3b after δbtree works (hole -> zblk)
    at3 = t.commit()

    f.assertCache([1,1,0,1])    # FIXME b must be invalidated (see 3b ^^^)

    # f @head is opened again -> cache must not be lost
    f_ = t.open(zf)
    f_.assertCache([1,1,0,1])
    f_.close()
    f.assertCache([1,1,0,1])

    # f @head
    f.assertCache([1,1,0,1])
    f.assertData (['','','3c','2d'], mtime=t.head)

    # f @at2
    f2.assertCache([0,0,1,0])
    f2.assertData (['','','2c','2d'])   # XXX mtime=at2?

    # f @at1
    f1.assertCache([1,1,1])
    f1.assertData (['','','1cc'])       # XXX + mtime=at1?


    # >>> f close / open again -> cache must not be lost
    # XXX a bit flaky since OS can evict whole f cache under pressure
    f.assertCache([1,1,1,1])
    f.close()
    f = t.open(zf)
    assert f.cached() != [0,0,0,0]

    # >>> XXX commit data to not yet accessed f part - nothing happens

    """
    # >>> invalidation protocol
    print('\n\n inv. protocol \n\n')

    # invalid requests -> wcfs replies error
    wl = t.openwatch()
    assert wl.sendReq(context.background(), b'bla bla') ==  \
            b'error bad watch: not a watch request: "bla bla"'

    # invalid request not following frame structure -> fatal + wcfs must close watch link
    assert wl.fatalv == []
    wl._write(b'zzz hello\n')
    _, _rx = select(
        timeout().done().recv,
        wl.rx_eof.recv,
    )
    if _ == 0:
        raise RuntimeError("%s: did not rx EOF after bad frame " % wl)
    assert wl.fatalv == [b'error: invalid frame: "zzz hello\\n" (invalid stream)']
    wl.close()

    # watch with @at < δtail.tail -> rejected
    wl = t.openwatch()
    atpast = p64(u64(t.tail)-1)
    wl._watch(zf, atpast, {}, "error setup watch f<%s> @%s: too far away back from"
            " head/at (@%s); …" % (h(zf._p_oid), h(atpast), h(t.head)))
    wl.close()

    # some watch requests with explicit pinok (also partly verifies how
    # tWatchLink.watch computes automatic pinok)
    wl = t.openwatch()
    # XXX check @at0 ?
    wl.watch(zf, at1, {2: at1, 3: at0})     #     -> at1 (new watch)    XXX at0 -> ø (blk3 was hole)?
    wl.watch(zf, at2, {2: at2, 3: None})    # at1 -> at2
    wl.watch(zf, at3, {2: None})            # at2 -> at3 (current head)
    wl.close()

    # all valid watch requests going at_i -> at_j -> ... with automatic pinok
    for zf in t.zfiles():
        for revv in t.iter_revv():
            print('\n--------')
            print(' -> '.join([t.hat(_) for _ in revv]))
            wl = t.openwatch()
            wl.watch(zf, revv[0])
            wl.watch(zf, revv[0])    # verify at_i -> at_i
            for at in revv[1:]:
                wl.watch(zf, at)
            wl.close()
    """

    # XXX move before setup watch?
    print('\n\n\n\nWATCH+COMMIT\n\n')

    # watched + commit -> read -> receive pin messages; read is stuck until pins are acknowledged
    wl1 = t.openwatch()
    wl2 = t.openwatch()
    wl1.watch(zf, at3); assert at3 == t.head
    wl2.watch(zf, at2)
    f.assertCache([1,1,1,1])
    t.change(zf, {         2: '4c', 5: '4f'})   # FIXME + 4a after δbtree works
    at4 = t.commit()
    f.assertCache([1,1,0,1,0,0])                # FIXME a must be invalidated - see δbtree ^^^

    t.dump_history()

    f.assertBlk(0, '',   {wl1: {},          wl2: {}})  # XXX + {0, at3} after δbtree works
    f.assertBlk(1, '',   {wl1: {},          wl2: {}})
    f.assertBlk(2, '4c', {wl1: {2: at3},    wl2: {}})
    # blk4 is hole @head - the same as at earlier db view - not pinned
    # XXX or do not allow hole past .size ?
    f.assertBlk(4, '',   {wl1: {},          wl2: {}})
    f.assertBlk(5, '4f', {wl1: {5: at0},    wl2: {5: at0}}) # XXX at0 -> ø  XXX also triggers access to #4 ?

    # XXX commit again, but block is already pinned - not notified

    # XXX wlink close   -> watch no longer notified
    # XXX 2 opened watches for 1 file at the same time
    # XXX watch with at="-" -> watch no longer notified

    wl1.close()
    wl2.close() # XXX temp


    # XXX commit after current file size -> watch


    # XXX no reply to pin - killed


    # XXX access to block not previously accessed but invalidated in ZODB





    # XXX going not only up, but also down at1 <- at2 <- at3    ?   -> forbid?

    # XXX watch for 2 files via single watch open


    # XXX watch with @at > head - must wait for head to become >= at

    # XXX drop file[blk] from cache, access again -> no pin message sent the second time

    # XXX mmap f; change f[blk] on pin message while under pagefault - should get changed page

    # XXX new watch request while previous watch request is in progress (over the same /head/watch handle)

    # XXX blk not initially covered by f.δtail (blk never accessed - f.δtail
    # not updated on invalidation). then blk is accessed - what happens with
    # watch that should be triggering for this blk?

    # XXX similar to ^^^ but with two changes to blk not covered by f.δtail. To
    # which @rev blk is pinned on watch? (δtail is missing both and if it will
    # be another blk rev < rev1,rev2 - it will be incorrect)

    # XXX pin message when blk data only first appeared after > w.at - pin
    # needs to pin to zero (or at0 ?).

    # XXX watch @at when file did not existed -> error

    # XXX ZBlk copied from blk1 -> blk2 ; for the same file and for file1 -> file2
    # XXX ZBlk moved  from blk1 -> blk2 ; for the same file and for file1 -> file2

    # XXX read file[blk]=hole; then file[blk]=zblk - must be invalidated and
    # setupWatch must send pins.


# ---- misc ----

# readfile reads file @ path.
def readfile(path):
    with open(path) as f:
        return f.read()

# tidtime converts tid to transaction commit time.
def tidtime(tid):
    t = TimeStamp(tid).timeTime()

    # ZODB/py vs ZODB/go time resolution is not better than 1µs
    # see e.g. https://lab.nexedi.com/kirr/neo/commit/9112f21e
    #
    # NOTE pytest.approx supports only ==, not e.g. <, so we use plain round.
    return round(t, 6)

# verify that tidtime is precise enough to show the difference between transactions.
@func
def test_tidtime_notrough():
    t = tDB()
    xdefer(t.close)

    atprev = t.commit()
    for i in range(10):
        at = t.commit()
        assert tidtime(at) > tidtime(atprev)
        atprev = at


# hat returns string for at.
# it gives both symbolic version and raw hex for at, for example:
#   @at2 (03cf7850500b5f66)
@func(tDB)
def hat(t, at):
    try:
        i = t._headv.index(at)
    except ValueError:
        return "@" + h(at)

    return "@at%d (%s)" % (i, h(at))


# zfiles returns ZBigFiles that were ever changed under t.
@func(tDB)
def zfiles(t):
    zfs = set()
    for dF in t.dFtail:
        for zf in dF.byfile:
            if zf not in zfs:
                zfs.add(zf)
    return zfs


# dump_history prints t's change history in tabular form.
#
# the output is useful while developing: to get overview of how file(s) are
# changed in tests.
@func(tDB)
def dump_history(t):
    print('>>> Change history by file:\n')
    for zf in t.zfiles():
        print('f<%s>:' % h(zf._p_oid))
        for dF in t.dFtail:
            df = dF.byfile.get(zf)
            emitv = []
            if df is not None:
                dblk = set(df.ddata.keys())
                for blk in range(max(dblk)+1):
                    if blk in dblk:
                        emitv.append('%d' % blk)
                    else:
                        emitv.append(' ')

            print('\t%s\t%s' % (t.hat(dF.rev), ' '.join(emitv)))
    print()


# xdefer is like defer, but makes sure an exception raised before the deferred
# function is called is not lost.
#
# if the deferred function itself raises an exception - the previous exception is
# printed to stderr.
#
# XXX xdefer is a workaround for Python2 not having exception chaining (PEP 3134),
# without which, if e.g. tDB.close() raises an exception, it prevents seeing
# whether and which assert in the test failed.
#
# XXX merge into defer?
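#
# usage (from a function decorated with @func):
#
#   t = tDB()
#   xdefer(t.close)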
def xdefer(f):
    # hack - imitate as if defer called from under xdefer was called directly by caller func
    fgo = inspect.currentframe().f_back.f_back
    __goframe__ = fgo.f_locals['__goframe__']
    _xdefer(f)

def _xdefer(f):
    def _():
        # call f, but print previous exception if f raises
        exc_type, exc_value, exc_traceback = sys.exc_info()
        try:
            f()
        except:
            traceback.print_exception(exc_type, exc_value, exc_traceback)
            raise
    defer(_)