wcfs_test.py 28.7 KB
Newer Older
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1
# -*- coding: utf-8 -*-
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
2 3
# Copyright (C) 2018-2019  Nexedi SA and Contributors.
#                          Kirill Smelkov <kirr@nexedi.com>
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
20
"""wcfs_test tests wcfs filesystem from outside as python client process"""
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
21

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
22 23
from __future__ import print_function

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
24
from wendelin.lib.testing import getTestDB
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
25
from wendelin.lib.zodb import dbclose
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
26
from wendelin.lib.mem import memcpy
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
27 28
from wendelin.bigfile.file_zodb import ZBigFile
from wendelin.bigfile.tests.test_filezodb import blksize
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
29
from wendelin import wcfs
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
30

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
31
import transaction
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
32 33
from persistent import Persistent
from persistent.timestamp import TimeStamp
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
34

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
35
import os, os.path, subprocess, threading
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
36
from errno import EINVAL
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
37
from golang import chan, func, defer, select
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
38
from golang import context, sync
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
39
from zodbtools.util import ashex as h, fromhex
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
40
from pytest import raises
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
41
from six import reraise
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
42
from .internal import mm
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
43

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
44 45 46 47
# setup:
# - create test database, compute zurl and mountpoint for wcfs
# - at every test: make sure wcfs is not running before & after the test.

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
48
testdb = None
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
49 50
testzurl = None     # URL of testdb
testmntpt = None    # wcfs is mounted here
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
51
def setup_module():
    """Create the test database and compute zurl and mountpoint for wcfs.

    Sets module globals:

    - testdb:    the test database (already set up);
    - testzurl:  URL of testdb as computed by wcfs.zstor_2zurl;
    - testmntpt: wcfs mountpoint corresponding to testzurl.
    """
    global testdb, testzurl, testmntpt
    testdb = getTestDB()
    testdb.setup()

    zstor = testdb.getZODBStorage()
    try:
        testzurl = wcfs.zstor_2zurl(zstor)
    finally:
        # do not leave the storage opened if zurl computation fails
        zstor.close()
    testmntpt = wcfs._mntpt_4zurl(testzurl)
    # setup_function asserts that the mountpoint does not exist before a test.
    # NOTE(review): presumably _mntpt_4zurl creates the directory - confirm.
    os.rmdir(testmntpt)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
62 63 64
def teardown_module():
    """Tear down the test database that setup_module stored in testdb."""
    testdb.teardown()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
65
# make sure we start every test without wcfs server running.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
66 67 68
def setup_function(f):
    """Verify, before test f runs, that wcfs mountpoint does not exist."""
    mntpt_exists = os.path.exists(testmntpt)
    assert not mntpt_exists

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
69
# make sure we unmount wcfs after every test.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
70 71 72 73 74 75 76 77
def teardown_function(f):
    """Unmount wcfs and remove its mountpoint directory after test f."""
    # zero exit status of `mountpoint -q` is treated as "mounted"
    if subprocess.call(["mountpoint", "-q", testmntpt]) == 0:
        subprocess.check_call(["fusermount", "-u", testmntpt])
    if os.path.exists(testmntpt):
        os.rmdir(testmntpt)


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
78
# ---- test join/autostart ----
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
79

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
80
# test that zurl does not change from one open to another storage open.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
81 82 83
def test_zurlstable():
    """Check that zurl stays the same across repeated opens of the storage."""
    for _ in range(10):
        zstor = testdb.getZODBStorage()
        try:
            zurl = wcfs.zstor_2zurl(zstor)
        finally:
            # close the storage even if zurl computation fails
            zstor.close()
        assert zurl == testzurl

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
88
# test that join works.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
89
@func
def test_join():
    """Check wcfs.join against a server that was started explicitly.

    join(autostart=False) must fail while no server is running, and must
    return a connection with the same mountpoint once the server is started.
    """
    zurl = testzurl
    with raises(RuntimeError, match="wcfs: join .*: server not started"):
        wcfs.join(zurl, autostart=False)

    # start the server explicitly and check the basic layout it exposes
    srv = wcfs._start(zurl)
    defer(srv.close)
    assert srv.mountpoint == testmntpt
    assert readfile(srv.mountpoint + "/.wcfs/zurl") == zurl
    for subdir in ("/head", "/head/bigfile"):
        assert os.path.isdir(srv.mountpoint + subdir)

    # joining the already running server gives the same mountpoint
    joined = wcfs.join(zurl, autostart=False)
    defer(joined.close)
    assert joined.mountpoint == srv.mountpoint

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
106
# test that join(autostart=y) works.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
107
@func
def test_join_autostart():
    """Check that wcfs.join(autostart=True) starts the server when needed."""
    zurl = testzurl
    with raises(RuntimeError, match="wcfs: join .*: server not started"):
        wcfs.join(zurl, autostart=False)

    # autostart=True succeeds where autostart=False failed just above
    conn = wcfs.join(zurl, autostart=True)
    defer(conn.close)
    assert conn.mountpoint == testmntpt
    assert readfile(conn.mountpoint + "/.wcfs/zurl") == zurl
    for subdir in ("/head", "/head/bigfile"):
        assert os.path.isdir(conn.mountpoint + subdir)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
119

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
120

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
121 122
# --- test access to data ----

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
123 124
# DF represents a change in files space.
# it corresponds to ΔF in wcfs.go .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
125 126 127 128 129 130 131
class DF:
    # attributes:
    #   .rev      tid; not set by the constructor - assigned from outside
    #   .byfile   {} ZBigFile -> DFile
    def __init__(dF):
        # start with no per-file changes; .rev is assigned by the user later
        dF.byfile = dict()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
132 133
# DFile represents a change to one file.
# it is similar to ΔFile in wcfs.go .
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
134 135 136 137 138 139 140
class DFile:
    # attributes:
    #   .rev      tid; not set by the constructor - assigned from outside
    #   .ddata    {} blk -> data      XXX name
    def __init__(dfile):
        # start with no changed blocks; .rev is assigned by the user later
        dfile.ddata = dict()

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
141 142
# tDB provides database/wcfs testing environment.
#
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
143 144 145 146 147
# BigFiles opened under tDB are represented as tFile - see .open for details.
# Watches opened under tDB are represented as tWatch - see .openwatch for details.
#
# XXX .open         -> .topen
# XXX .openwatch    -> .topenwatch ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
148 149
#
# XXX print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
150 151 152 153 154
class tDB:
    """Database/wcfs testing environment.

    BigFiles opened under tDB are represented as tFile - see .open .
    Watches opened under tDB are represented as tWatch - see .openwatch .
    """

    def __init__(t):
        t.root  = testdb.dbopen()
        t.wc    = wcfs.join(testzurl, autostart=True)

        # ZBigFile(s) scheduled for commit
        t._changed = {} # ZBigFile -> {} blk -> data

        # committed: head + δF history
        t.head   = None
        t._headv = [] # XXX -> just use dFtail[·].rev ?
        t.dFtail = [] # of DF

        # fh(.wcfs/zhead) + history of zhead read from there
        t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
        t._wc_zheadv  = []

        # tracked tFiles & tWatches
        t._tracked = set()

    # close closes test database as well as all tracked files, watches and wcfs.
    def close(t):
        # iterate over a copy: tracked objects unregister themselves on close
        for tf in t._tracked.copy():
            tf.close()
        assert len(t._tracked) == 0
        t._wc_zheadfh.close()
        t.wc.close()
        dbclose(t.root)

    # open opens wcfs file corresponding to zf@at and starts to track it.
    # see returned tFile for details.
    def open(t, zf, at=None):   # -> tFile
        return tFile(t, zf, at=at)

    # openwatch opens /head/watch on wcfs.
    # see returned tWatch for details.
    def openwatch(t):   # -> tWatch
        return tWatch(t)

    # change schedules zf to be changed according to changeDelta at commit.
    #
    # changeDelta is {} blk -> data.
    # data for a block must not be longer than zf.blksize.
    def change(t, zf, changeDelta):
        assert isinstance(zf, ZBigFile)
        zfDelta = t._changed.setdefault(zf, {})
        for blk, data in changeDelta.items():
            assert len(data) <= zf.blksize
            zfDelta[blk] = data

    # commit commits transaction and makes sure wcfs is synchronized to it.
    #
    # It remembers/returns committed transaction ID.
    def commit(t):
        # perform modifications scheduled by change.
        # use !wcfs mode so that we prepare data independently of wcfs code paths.
        dF = DF()
        for zf, zfDelta in t._changed.items():
            dfile = DFile()
            zfh = zf.fileh_open(_use_wcfs=False)
            for blk, data in zfDelta.items():
                # remember data as it was given; the padded copy goes to the file
                dfile.ddata[blk] = data
                data += b'\0'*(zf.blksize - len(data))  # trailing \0
                vma = zfh.mmap(blk, 1)
                memcpy(vma, data)
            dF.byfile[zf] = dfile

        # NOTE there is no clean way to retrieve tid of just committed transaction
        #      we use last._p_serial as workaround.
        t.root['_last'] = last = Persistent()
        last._p_changed = 1

        transaction.commit()
        head = last._p_serial
        t.head = head
        t._headv.append(head)

        print('\nM: commit -> %s' % t.hat(head))
        for zf, zfDelta in t._changed.items():
            print('M:      f<%s>\t%s' % (h(zf._p_oid), sorted(zfDelta.keys())))
        t._changed = {}

        # the revision is known only after the commit
        dF.rev = head
        for dfile in dF.byfile.values():
            dfile.rev = head
        t.dFtail.append(dF)

        # synchronize wcfs to db
        t._wcsync()

        return head

    # _wcsync makes sure wcfs is synchronized to latest committed transaction.
    #
    # it reads .wcfs/zhead line by line until as many heads were read as were
    # committed, and raises RuntimeError if a head read from wcfs differs from
    # the corresponding committed one.
    def _wcsync(t):
        while len(t._wc_zheadv) < len(t._headv):
            line = t._wc_zheadfh.readline()
            #print('> zhead read: %r' % line)
            line = line.rstrip('\n')
            wchead = fromhex(line)
            i = len(t._wc_zheadv)
            if wchead != t._headv[i]:
                raise RuntimeError("wcsync #%d: wczhead (%s) != zhead (%s)" % (i, h(wchead), h(t._headv[i])))
            t._wc_zheadv.append(wchead)

        # head/at = last txn of whole db
        assert t.read("head/at") == h(t.head)

    # path returns path for object on wcfs.
    # - str:        wcfs root + obj;
    # - Persistent: wcfs root + (head|@<at>)/bigfile/obj
    #
    # at must be used only with Persistent.
    def path(t, obj, at=None):
        if isinstance(obj, Persistent):
            # no trailing "/" here: the format below already adds the separator
            head = "head" if at is None else ("@%s" % h(at))
            obj  = "%s/bigfile/%s" % (head, h(obj._p_oid))
            at   = None

        assert isinstance(obj, str)
        assert at is None  # must not be used with str

        return os.path.join(t.wc.mountpoint, obj)

    # read reads file corresponding to obj on wcfs.
    def read(t, obj, at=None):
        path = t.path(obj, at=at)
        return readfile(path)

    # stat stats file corresponding to obj on wcfs.
    def stat(t, obj, at=None):
        path = t.path(obj, at=at)
        return os.stat(path)

    # _open opens file corresponding to obj on wcfs.
    def _open(t, obj, mode='rb', at=None):
        path = t.path(obj, at=at)
        return open(path, mode, 0)  # unbuffered

    # hat returns string for at.
    # it gives both symbolic version and raw hex for at, for example:
    #   @at2 (03cf7850500b5f66)
    #
    # if at is not one of committed heads, only raw hex is given.
    def hat(t, at):
        try:
            i = t._headv.index(at)
        except ValueError:
            return "@" + h(at)

        return "@at%d (%s)" % (i, h(at))

    # zfiles returns ZBigFiles that were ever changed under t.
    def zfiles(t):
        zfs = set()
        for dF in t.dFtail:
            zfs.update(dF.byfile)
        return zfs


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
309
# tFile provides testing environment for one bigfile on wcfs.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
310 311
#
# .blk() provides access to data of a block. .cached() gives state of which
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
312
# blocks are in OS pagecache. .assertCache and .assertBlk/.assertData assert
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
313
# on state of cache and data.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
314
class tFile:
    """Testing environment for one bigfile on wcfs.

    .blk() provides access to data of a block. .cached() gives state of which
    blocks are in OS pagecache. .assertCache and .assertBlk/.assertData assert
    on state of cache and data.
    """

    # maximum number of blocks we mmap for 1 file.
    # this should be not big not to exceed mlock limit.
    _max_tracked = 8

    def __init__(t, tdb, zf, at=None):
        assert isinstance(zf, ZBigFile)
        t.tdb = tdb
        t.f   = tdb._open(zf, at=at)
        t.blksize = zf.blksize

        # mmap the file past the end up to _max_tracked blocks and lock the
        # pages with MLOCK_ONFAULT. This way when a page is read by mmap access
        # we have the guarantee from kernel that the page will stay in
        # pagecache. We rely on this to verify OS cache state.
        assert t.blksize % mm.PAGE_SIZE == 0
        t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked*t.blksize)
        mm.lock(t.fmmap, mm.MLOCK_ONFAULT)

        tdb._tracked.add(t)

    # close unregisters the file from tDB, unmaps it and closes it.
    def close(t):
        t.tdb._tracked.remove(t)
        mm.unmap(t.fmmap)
        t.f.close()

    # blk returns bytearray copy of file[blk] data.
    #
    # blk must be in [0, _max_tracked) - only that many blocks are mmapped.
    def blk(t, blk):
        # NOTE strict upper bound: with blk == _max_tracked the slice below
        # would be past the mapping and silently come out empty.
        assert 0 <= blk < t._max_tracked
        return bytearray(t.fmmap[blk*t.blksize:(blk+1)*t.blksize])

    # cached returns [] indicating whether each file block is cached or not.
    # 1 - cached, 0 - not cached, fractional (0,1) - some pages of the block are cached some not.
    def cached(t):
        nblk = t._sizeinblk()
        incorev = mm.incore(t.fmmap[:nblk*t.blksize])
        # incorev is in pages; convert to in blocks
        assert t.blksize % mm.PAGE_SIZE == 0
        blkpages = t.blksize // mm.PAGE_SIZE
        cachev = [0.]*nblk
        for i, v in enumerate(incorev):
            blk = i // blkpages
            cachev[blk] += bool(v)
        for blk in range(nblk):
            cachev[blk] /= blkpages
            if cachev[blk] == int(cachev[blk]):
                cachev[blk] = int(cachev[blk])  # 0.0 -> 0, 1.0 -> 1
        return cachev

    # _sizeinblk returns file size in blocks.
    #
    # file size must be multiple of blksize and fit into _max_tracked blocks.
    def _sizeinblk(t):
        st = os.fstat(t.f.fileno())
        assert st.st_size % t.blksize == 0
        assert st.st_size // t.blksize <= t._max_tracked
        return st.st_size // t.blksize

    # assertCache asserts state of OS cache for file.
    #
    # incorev is [] of 1/0 representing whether block data is present or not.
    def assertCache(t, incorev):
        assert t.cached() == incorev

    # assertBlk asserts that file block #blk has data as expected.
    #
    # Expected data may be given with size < t.blksize. In such case the data
    # is implicitly appended with trailing zeros.
    def assertBlk(t, blk, data):
        assert len(data) <= t.blksize
        data += b'\0'*(t.blksize - len(data))   # trailing zeros
        assert blk < t._sizeinblk()

        # XXX assert individually for every block's page? (easier debugging?)
        assert t.blk(blk) == data, ("#blk: %d" % blk)

        # we just accessed the block - it has to be in OS cache
        assert t.cached()[blk] == 1

    # assertData asserts that file has data blocks as specified.
    #
    # Expected blocks may be given with size < zf.blksize. In such case they
    # are implicitly appended with trailing zeros.
    #
    # It also checks file size and optionally mtime.
    def assertData(t, datav, mtime=None):
        st = os.fstat(t.f.fileno())
        assert st.st_size == len(datav)*t.blksize
        if mtime is not None:
            assert st.st_mtime == tidtime(mtime)

        for blk, data in enumerate(datav):
            t.assertBlk(blk, data)

        # all blocks must be in cache after we touched them all
        t.assertCache([1]*len(datav))
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
408 409


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
410
# tWatch provides testing environment for /head/watch opened on wcfs.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
411 412 413
#
# .sendReq()/.recvReq() provides raw IO in terms of wcfs invalidation protocol messages.
# .watch() setups a watch for a file and verifies ... XXX
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
414 415 416 417
class tWatch:
    """Testing environment for /head/watch opened on wcfs.

    .sendReq()/.recvReq() provide raw IO in terms of wcfs invalidation
    protocol messages.
    """

    def __init__(t, tdb):
        t.tdb = tdb

        # head/watch handle.
        #
        # python/stdio locks file object on read/write, however we need both
        # read and write to be working simultaneously.
        # -> use 2 separate file objects: one for rx and one for tx.
        #
        # fdopen takes ownership of file descriptor and closes it when file
        # object is closed -> dup fd so that each file object has its own fd.
        fdrx = os.open(tdb.path("head/watch"), os.O_RDWR)
        fdtx = os.dup(fdrx)
        t.wrx = os.fdopen(fdrx, 'rb')
        t.wtx = os.fdopen(fdtx, 'wb')

        # inv.protocol message IO
        t._acceptq  = chan() # (stream, msg)    server originated messages go here
        t._rxmu     = threading.Lock()
        t._rxtab    = {}     # stream -> rxq    server replies go via here
        t._accepted = set()  # of stream        streams we accepted but did not reply to yet

        t._txmu     = threading.Lock()  # serializes writes

        # receiving is served by _serveRecv running under a cancellable workgroup
        serveCtx, t._serveCancel = context.with_cancel(context.background())
        t._serveWG = sync.WorkGroup(serveCtx)
        t._serveWG.go(t._serveRecv)

        # this tWatch currently watches files at particular state.
        t._watching = {}    # {} ZBigFile -> @at

        tdb._tracked.add(t)

    # close stops message serving, closes head/watch handles and wakes up
    # everyone who is waiting for rx.
    def close(t):
        t.tdb._tracked.remove(t)

        t._serveCancel()
        # ask wcfs to close its tx & rx sides; close(wcfs.tx) wakes up
        # _serveRecv on client (= on us).
        t._send(1, b'bye')
        # XXX we can get stuck here if wcfs does not behave as we want.
        # XXX in particular if there is a silly - e.g. syntax or type - error
        #     in test code - we currently get stuck here.
        #
        # XXX -> better pthread_kill(SIGINT) instead of relying on wcfs proper behaviour?
        try:
            t._serveWG.wait()
        except Exception as e:
            # canceled is expected and ok
            if e != context.canceled:
                reraise(e, None, e.__traceback__)

        t.wtx.close()
        t.wrx.close()

        # wakeup everyone waiting for rx
        t._acceptq.close()
        with t._rxmu:
            pending = t._rxtab
            t._rxtab = None     # don't allow new rxtab registers
        for rxq in pending.values():
            rxq.close()


    # ---- message IO ----

    # _serveRecv receives messages from .wrx and dispatches them according to streamID.
    #
    # messages on odd streams are delivered to the queue registered for that
    # stream in ._rxtab; messages on even streams are put to ._acceptq.
    @func
    def _serveRecv(t, ctx):
        while 1:
            # NOTE: .close() makes sure .wrx.read*() will wake up
            line = t.wrx.readline()
            #print('C: watch  : rx: %r' % line)
            if not line:
                break   # closed

            # <stream> ... \n
            stream, msg = line.split(' ', 1)
            stream = int(stream)
            msg = msg.rstrip('\n')

            # select destination queue and what to put there
            if stream % 2:
                # reply to a request we sent
                with t._rxmu:
                    assert stream in t._rxtab
                    dstq = t._rxtab.pop(stream)
                item = msg
            else:
                # request originated by server
                with t._rxmu:
                    assert stream not in t._accepted
                    t._accepted.add(stream)
                dstq = t._acceptq
                item = (stream, msg)

            sel, _ = select(
                ctx.done().recv,    # 0
                (dstq.send, item),  # 1
            )
            if sel == 0:
                raise ctx.err()


    # _send sends raw message via specified stream.
    #
    # multiple _send can be called in parallel - _send serializes writes.
    # XXX +ctx?
    def _send(t, stream, msg):
        assert '\n' not in msg
        with t._txmu:
            pkt = b"%d %s\n" % (stream, msg)
            #print('C: watch  : tx: %r' % pkt)
            t.wtx.write(pkt)
            t.wtx.flush()

    # sendReq sends client -> server request and returns server reply.
    #
    # only 1 sendReq must be used at a time.    # XXX relax?
    def sendReq(t, ctx, req):
        stream = 1

        # register the reply queue before sending, so the reply is not missed
        replyq = chan()
        with t._rxmu:
            assert stream not in t._rxtab
            t._rxtab[stream] = replyq

        t._send(stream, req)

        sel, reply = select(
            ctx.done().recv,    # 0
            replyq.recv,        # 1
        )
        if sel == 0:
            raise ctx.err()
        return reply

    # recvReq receives client <- server request.
    #
    # multiple recvReq could be used at a time.
    def recvReq(t, ctx): # -> tSrvReq | None when EOF
        sel, rx = select(
            ctx.done().recv,    # 0
            t._acceptq.recv,    # 1
        )
        if sel == 0:
            raise ctx.err()

        if rx is None:
            return None

        stream, msg = rx
        return tSrvReq(t, stream, msg)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
571

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
572 573 574 575 576 577 578 579
# tSrvReq represents 1 server-initiated wcfs request received over /head/watch.
#
# .twatch   - the watch object the request was received on
# .stream   - ID of the stream the request arrived on
# .msg      - request message
class tSrvReq:
    def __init__(req, twatch, stream, msg):
        req.twatch = twatch
        req.stream = stream
        req.msg    = msg

    # reply sends answer for the request back over the stream it arrived on.
    #
    # The stream must be present in twatch._accepted both before and after the
    # send; after the answer is sent the stream is removed from there, so
    # replying to the same request twice fails with AssertionError.
    def reply(req, answer):
        #print('C: reply %s <- %r ...' % (req, answer))
        t = req.twatch
        with t._rxmu:
            assert req.stream in t._accepted, \
                    "reply: stream %r is not accepted (already replied?)" % (req.stream,)

        # NOTE the send is done with t._rxmu released
        t._send(req.stream, answer)

        with t._rxmu:
            assert req.stream in t._accepted, \
                    "reply: stream %r vanished from accepted while replying" % (req.stream,)
            t._accepted.remove(req.stream)

        # XXX also track as answered? (and don't accept with the same ID ?)


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
594 595
# ---- watch setup/adjust ----

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623
# _pinAt computes which blocks need to be pinned for zf@at.
#
# Cache state is not considered: the result is derived from all recorded
# changes to zf, no matter whether a block is currently cached or not.
@func(tWatch)
def _pinAt(w, zf, at):  # -> pin = {} blk -> rev
    t = w.tdb

    # per-file deltas of zf, in t.dFtail order
    zdeltav = []
    for dF in t.dFtail:
        if zf in dF.byfile:
            zdeltav.append(dF.byfile[zf])

    # {} blk -> rev for blocks changed in (at, head]
    pin = {}
    for delta in zdeltav:
        if not (delta.rev > at):
            continue
        for blk in delta.ddata:
            if blk in pin:
                continue    # already decided via another delta
            # revisions <= at in which blk was changed
            revsbefore = [d.rev for d in zdeltav if d.rev <= at and blk in d.ddata]
            if revsbefore:
                rev = max(revsbefore)
            else:
                rev = t._headv[0]   # was hole - at0  XXX -> pin to @00?
            assert rev <= at
            pin[blk] = rev

    return pin

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
624 625 626 627 628
# watch sets up a watch for file@at.
# XXX and verifies that wcfs sends correct initial pins?
# XXX or adjusts
#
# If zf is already watched via w (w._watching has an entry for it), the watch
# is adjusted from the previous revision to at; only at_prev <= at is
# supported for now. The expected pin messages are computed via _pinAt, the
# "watch" request is sent, and the pins wcfs sends in response are checked
# and acknowledged. On success w._watching[zf] is updated to at.
@func(tWatch)
def watch(w, zf, at):   # XXX -> ?
    t = w.tdb
    at_prev = w._watching.get(zf)   # we were previously watching zf @at_prev
    at_from = ''
    if at_prev is not None:
        at_from = '(%s ->) ' % t.hat(at_prev)
    print('\nC: setup watch f<%s> %s%s' % (h(zf._p_oid), at_from, t.hat(at)))

    # pinstr returns human-readable representation for {}blk->rev
    def pinstr(pin):
        pinv = ['%d: %s' % (blk, t.hat(pin[blk])) for blk in sorted(pin.keys())]
        return '{%s}' % ', '.join(pinv)


    pin_prev = {}
    if at_prev is not None:
        assert at_prev <= at, 'TODO %s -> %s' % (t.hat(at_prev), t.hat(at))
        pin_prev = w._pinAt(zf, at_prev)

    pin = w._pinAt(zf, at)

    if at_prev != at and at_prev is not None:
        print('\n%s\n%s' % (pinstr(pin_prev), pinstr(pin)))

    # turn pin into the difference pin_prev -> pin, i.e. into what wcfs has to
    # send to move the watch from @at_prev to @at.
    for blk in set(pin_prev.keys()).union(pin.keys()):
        # blk ∉ pin_prev,   blk ∉ pin       -> cannot happen
        assert (blk in pin_prev) or (blk in pin)

        # blk ∉ pin_prev,   blk ∈ pin       -> cannot happen when adjusting
        # an existing watch (at_prev <= at); for a watch set up from scratch
        # (at_prev is None) pin_prev is empty and pin[blk] is used as is.
        if blk not in pin_prev and blk in pin:
            if at_prev is not None:
                # in this branch blk is pinned for at and not for at_prev
                assert False, '#%d pinned %s; not pinned %s' % (blk, t.hat(at), t.hat(at_prev))

        # blk ∈ pin_prev,   blk ∉ pin       -> unpin to head
        elif blk in pin_prev and blk not in pin:
            pin[blk] = None     # XXX = head

        # blk ∈ pin_prev,   blk ∈ pin       -> if rev different: use pin
        elif blk in pin_prev and blk in pin:
            assert pin_prev[blk] <= pin[blk]
            if pin_prev[blk] == pin[blk]:
                del pin[blk]    # would need to pin to what it is already pinned

    #print('-> %s' % pinstr(pin))

    # {} blk -> at that have to be pinned
    # XXX also check that head/file[blk] is in cache - else no need to pin
    pinok = pin
    print('#  pinok: %s' % pinstr(pinok))

    # send watch request and check that we receive pins for in-cache blocks
    # changed > at.
    # XXX use timeout to detect wcfs replying less pins.
    ctx, cancel = context.with_cancel(context.background())
    wg = sync.WorkGroup(ctx)

    # receiver: expect exactly pinok, acknowledge every pin, then make sure
    # nothing else arrives until the context is canceled.
    def _(ctx):
        pinv = w._expectPin(ctx, zf, pinok)
        for p in pinv:
            p.reply(b"ack")     # XXX -> return to caller?
        # check that we don't get extra pins before "ok" reply to "watch"
        try:
            req = w.recvReq(ctx)
        except Exception as e:
            if e is context.canceled:
                return # cancel is expected after seeing "ok"
            reraise(e, None, e.__traceback__)

        assert False, "extra pin message received: %r" % req.msg
    wg.go(_)

    # sender: issue the watch request and wait for "ok".
    def _(ctx):
        assert w.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at))) == "ok"
        # cancel _expectPin waiting upon receiving "ok" from wcfs
        # -> error that missed pins were not received.
        cancel()
    wg.go(_)

    wg.wait()
    w._watching[zf] = at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
708

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734
# _expectPin asserts that wcfs sends expected pin messages.
#
# expect is {} blk -> at
# returns [] of received pin requests.
#
# Requests are received via w.recvReq(ctx) until every expected message was
# seen; the order in which they arrive does not matter. The received requests
# are returned without being replied to. Any error from recvReq is reported
# as RuntimeError that also lists the pins still pending.
@func(tWatch)
def _expectPin(w, ctx, zf, expect):
    expected = set()    # of expected pin messages
    for blk, at in expect.items():
        msg = b"pin %s #%d @%s" % (h(zf._p_oid), blk, h(at))
        assert msg not in expected
        expected.add(msg)

    reqv = []   # of received requests
    while expected:
        try:
            req = w.recvReq(ctx)
        except Exception as e:
            raise RuntimeError("%s\nnot all pin messages received - pending:\n%s" % (e, expected))
        # recvReq returns None on EOF
        assert req is not None, \
                "EOF received while pins are still pending:\n%s" % (expected,)
        assert req.msg in expected, \
                "unexpected message received: %r\npending:\n%s" % (req.msg, expected)
        expected.remove(req.msg)
        reqv.append(req)

    return reqv


Kirill Smelkov's avatar
.  
Kirill Smelkov committed
735 736

# test_wcfs exercises wcfs functionality.
#
# It commits several revisions of one ZBigFile, checks data and cache state of
# the file as seen through wcfs both @head and at earlier revisions, and then
# sets up watches for every file at every revision from t.dFtail.
@func
def test_wcfs():
    t = tDB()
    defer(t.close)

    t.root['!file'] = nonfile = Persistent()
    t.root['zfile'] = zf = ZBigFile(blksize)

    at0  = t.commit()

    # >>> lookup non-BigFile -> must be rejected
    with raises(OSError) as exc:
        t.stat(nonfile)
    assert exc.value.errno == EINVAL

    # >>> file initially empty
    f = t.open(zf)
    f.assertCache([])
    f.assertData ([], mtime=at0)

    # >>> (@at1) commit data -> we can see it on wcfs
    t.change(zf, {2: b'alpha'})
    at1 = t.commit()

    f.assertCache([0,0,0])  # initially not cached
    f.assertData ([b'',b'',b'alpha'], mtime=t.head)

    # >>> (@at2) commit again -> we can see both latest and snapshotted states
    t.change(zf, {2: b'beta', 3: b'gamma'})
    at2 = t.commit()

    # f @head
    f.assertCache([1,1,0,0])
    f.assertData ([b'',b'', b'beta', b'gamma'], mtime=t.head)

    # f @at1
    f1 = t.open(zf, at=at1)
    f1.assertCache([0,0,1])
    f1.assertData ([b'',b'',b'alpha'])  # XXX + mtime=at1?


    # >>> (@at3) commit again without changing zf size
    f2 = t.open(zf, at=at2)
    t.change(zf, {2: b'kitty'})
    at3 = t.commit()

    f.assertCache([1,1,0,1])

    # f @head is opened again -> cache must not be lost
    f_ = t.open(zf)
    f_.assertCache([1,1,0,1])
    f_.close()
    f.assertCache([1,1,0,1])

    # f @head
    f.assertCache([1,1,0,1])
    f.assertData ([b'',b'',b'kitty',b'gamma'], mtime=t.head)

    # f @at2
    f2.assertCache([0,0,1,0])
    f2.assertData ([b'',b'',b'beta',b'gamma'])   # XXX mtime=at2

    # f @at1
    f1.assertCache([1,1,1])
    f1.assertData ([b'',b'',b'alpha'])  # XXX + mtime=at1?


    # >>> f close / open again -> cache must not be lost
    # XXX a bit flaky since OS can evict whole f cache under pressure
    f.assertCache([1,1,1,1])
    f.close()
    f = t.open(zf)
    assert f.cached() != [0,0,0,0]


    # >>> XXX commit data to not yet accessed f part - nothing happens

    # >>> invalidation protocol
    print('\n\n inv. protocol \n\n')

    for zf in t.zfiles():
        # watch from scratch
        for dF in t.dFtail:
            w = t.openwatch()
            w.watch(zf, dF.rev)
            w.close()

        # watch going at_i -> at_j
        for dF_from in t.dFtail:
            for dF_to in t.dFtail:
                if not (dF_from.rev <= dF_to.rev):
                    continue    # FIXME TODO test all directions

                print('\n--------')
                w = t.openwatch()
                w.watch(zf, dF_from.rev)
                w.watch(zf, dF_to.rev)
                w.close()

        # watch going at1 -> at2 -> at3
        print('\n--------')

    print()


    # XXX both from scratch and going e.g. at1 -> at2 -> at3
    # XXX going not only up, but also down at1 <- at2 <- at3    ?

    # XXX 2 (or more) opened watch for 1 file at the same time
    # XXX watch for 2 files via single watch open

    # XXX watch with @at > head - must wait for head to become >= at
    # XXX watch with @at < δtail.tail -> rejected
    # XXX watch with at="-" -> stop watching

    # XXX drop file[blk] from cache, access again -> no pin message sent the second time

    # XXX mmap f; change f[blk] on pin message while under pagefault - should get changed page

    # XXX access to block not previously accessed but invalidated in ZODB

    # XXX new watch request while previous watch request is in progress (over the same /head/watch handle)

    # XXX invalid requests -> wcfs replies with error

    # XXX blk not initially covered by f.δtail (blk never accessed - f.δtail
    # not updated on invalidation). then blk is accessed - what happens with
    # watch that should be triggering for this blk?

    # XXX similar to ^^^ but with two changes to blk not covered by f.δtail. To
    # which @rev blk is pinned on watch? (δtail is missing both and if it will
    # be another blk rev < rev1,rev2 - it will be incorrect)

    # XXX pin message when blk data only first appeared after > w.at - pin
    # needs to pin to zero (or at0 ?).

    # XXX watch @at when file did not exist -> error

    # XXX ZBlk copied from blk1 -> blk2 ; for the same file and for file1 -> file2
    # XXX ZBlk moved  from blk1 -> blk2 ; for the same file and for file1 -> file2

    # XXX read file[blk]=hole; then file[blk]=zblk - must be invalidated and
    # setupWatch must send pins.

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
881

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902
def test_wcfs_invproto():
    """Placeholder test: nothing is checked here yet (XXX)."""
    # XXX
    return None


# ---- misc ---

# readfile returns the whole content of the file located at path.
#
# The file is opened with open() defaults and is closed before returning.
def readfile(path):
    with open(path) as src:
        data = src.read()
    return data

# tidtime returns transaction commit time for tid, rounded to 6 decimal digits.
def tidtime(tid):
    # Rounding rationale:
    # ZODB/py vs ZODB/go time resolution is not better than 1µs
    # see e.g. https://lab.nexedi.com/kirr/neo/commit/9112f21e
    #
    # NOTE pytest.approx supports only ==, not e.g. <, so we use plain round.
    return round(TimeStamp(tid).timeTime(), 6)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
903

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
904 905
# verify that tidtime is precise enough to show difference in between transactions.
@func
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
906 907 908 909
def test_tidtime_notrough():
    t = tDB()
    defer(t.close)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
910 911 912 913
    atprev = t.commit()
    for i in range(10):
        at = t.commit()
        assert tidtime(at) > tidtime(atprev)