Commit 2623b3e6 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ec199348
...@@ -122,6 +122,9 @@ class tDB: ...@@ -122,6 +122,9 @@ class tDB:
t.root = testdb.dbopen() t.root = testdb.dbopen()
t.wc = wcfs.join(testzurl, autostart=True) t.wc = wcfs.join(testzurl, autostart=True)
# ZBigFile(s) scheduled for commit
t._changed = {} # ZBigFile -> {} blk -> data
# commited: head + head history # commited: head + head history
t.head = None t.head = None
t._headv = [] t._headv = []
...@@ -131,23 +134,43 @@ class tDB: ...@@ -131,23 +134,43 @@ class tDB:
t._wc_zheadv = [] t._wc_zheadv = []
# tracked tFiles # tracked tFiles
t.tracked = set() t._tracked = set()
def close(t): def close(t):
for tf in t.tracked.copy(): for tf in t._tracked.copy():
tf.close() tf.close()
assert len(t.tracked) == 0 assert len(t._tracked) == 0
t._wc_zheadfh.close() t._wc_zheadfh.close()
t.wc.close() t.wc.close()
dbclose(t.root) dbclose(t.root)
# change schedules zf to be changed according changeDelta at commit.
#
# changeDelta is {} blk -> data.
def change(t, zf, changeDelta):
assert isinstance(zf, ZBigFile)
zfDelta = t._changed.setdefault(zf, {})
for blk, data in changeDelta.iteritems():
assert len(data) <= zf.blksize
zfDelta[blk] = data
# commit commits transaction and remembers/returns committed transaction ID. # commit commits transaction and remembers/returns committed transaction ID.
def commit(t): def commit(t):
# perform modifications scheduled by change.
# use !wcfs mode so that we prepare data independently of wcfs code paths.
for zf, zfDelta in t._changed.iteritems():
zfh = zf.fileh_open(_use_wcfs=False)
for blk, data in zfDelta.iteritems():
data += b'\0'*(zf.blksize - len(data)) # trailing \0
vma = zfh.mmap(blk, 1)
memcpy(vma, data)
t._changed = {}
# NOTE there is no clean way to retrieve tid of just committed transaction # NOTE there is no clean way to retrieve tid of just committed transaction
# we are using last._p_serial as workaround. # we are using last._p_serial as workaround.
t.root['_last'] = last = Persistent() t.root['_last'] = last = Persistent()
last._p_changed = 1 last._p_changed = 1
transaction.commit() transaction.commit()
head = last._p_serial head = last._p_serial
#print('commit -> %s' % h(head)) #print('commit -> %s' % h(head))
...@@ -205,6 +228,7 @@ class tDB: ...@@ -205,6 +228,7 @@ class tDB:
def open(t, zf, at=None): def open(t, zf, at=None):
return tFile(t, zf, at=at) return tFile(t, zf, at=at)
# tFile is testing environment for one bigfile on wcfs. # tFile is testing environment for one bigfile on wcfs.
class tFile: class tFile:
# maximum number of pages we mmap for 1 file. # maximum number of pages we mmap for 1 file.
...@@ -225,10 +249,10 @@ class tFile: ...@@ -225,10 +249,10 @@ class tFile:
t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked*t.blksize) t.fmmap = mm.map_ro(t.f.fileno(), 0, t._max_tracked*t.blksize)
mm.lock(t.fmmap, mm.MLOCK_ONFAULT) mm.lock(t.fmmap, mm.MLOCK_ONFAULT)
tdb.tracked.add(t) tdb._tracked.add(t)
def close(t): def close(t):
t.tdb.tracked.remove(t) t.tdb._tracked.remove(t)
mm.unmap(t.fmmap) mm.unmap(t.fmmap)
t.f.close() t.f.close()
...@@ -346,6 +370,7 @@ def test_wcfs(): ...@@ -346,6 +370,7 @@ def test_wcfs():
assert tidtime(tid2) > tidtime(tid1) assert tidtime(tid2) > tidtime(tid1)
t.wcsync() t.wcsync()
# >>> lookup non-BigFile -> must be rejected # >>> lookup non-BigFile -> must be rejected
with raises(OSError) as exc: with raises(OSError) as exc:
t.stat(nonfile) t.stat(nonfile)
...@@ -353,15 +378,12 @@ def test_wcfs(): ...@@ -353,15 +378,12 @@ def test_wcfs():
f = t.open(zf) f = t.open(zf)
# * file initially empty # >>> file initially empty
f.assertCache([]) f.assertCache([])
f.assertData ([], mtime=tid1) f.assertData ([], mtime=tid1)
# >>> commit data to zf -> verify we can see it on wcfs # >>> commit data to zf -> verify we can see it on wcfs
# (use !wcfs mode so that we prepare data independently of wcfs code paths) t.change(zf, {2: b'alpha'})
zfh = zf.fileh_open(_use_wcfs=False)
vma = zfh.mmap(2, 1) # 1 page at offset=2
memcpy(vma, b'alpha')
t.commit() t.commit()
t.wcsync() t.wcsync()
...@@ -372,18 +394,21 @@ def test_wcfs(): ...@@ -372,18 +394,21 @@ def test_wcfs():
# >>> commit data again -> verify we can see both latest and snapshotted states. # >>> commit data again -> verify we can see both latest and snapshotted states.
at1 = t.head at1 = t.head
"""
zfh = zf.fileh_open(_use_wcfs=False) zfh = zf.fileh_open(_use_wcfs=False)
vma1 = zfh.mmap(2, 1) vma1 = zfh.mmap(2, 1)
vma2 = zfh.mmap(2+1, 1) vma2 = zfh.mmap(2+1, 1)
memcpy(vma1,b'beta') memcpy(vma1,b'beta')
memcpy(vma2,b'gamma') memcpy(vma2,b'gamma')
"""
t.change(zf, {2: b'beta', 3: b'gamma'})
t.commit() t.commit()
t.wcsync() t.wcsync()
# f @head # f @head
f.assertCache([1,1,0,0]) f.assertCache([1,1,0,0])
f.assertData ([b'',b'', b'betaa', b'gamma'], mtime=t.head) f.assertData ([b'',b'', b'beta', b'gamma'], mtime=t.head)
# f @at1 # f @at1
f1 = t.open(zf, at=at1) f1 = t.open(zf, at=at1)
...@@ -395,9 +420,12 @@ def test_wcfs(): ...@@ -395,9 +420,12 @@ def test_wcfs():
at2 = t.head at2 = t.head
f2 = t.open(zf, at=at2) f2 = t.open(zf, at=at2)
"""
zfh = zf.fileh_open(_use_wcfs=False) zfh = zf.fileh_open(_use_wcfs=False)
vma = zfh.mmap(2, 1) vma = zfh.mmap(2, 1)
memcpy(vma, b'kitty') memcpy(vma, b'kitty')
"""
t.change(zf, {2: b'kitty'})
t.commit() t.commit()
t.wcsync() t.wcsync()
...@@ -415,7 +443,7 @@ def test_wcfs(): ...@@ -415,7 +443,7 @@ def test_wcfs():
# f @at2 # f @at2
f2.assertCache([0,0,1,0]) f2.assertCache([0,0,1,0])
f2.assertData ([b'',b'',b'betaa',b'gamma']) # XXX mtime=at2 f2.assertData ([b'',b'',b'beta',b'gamma']) # XXX mtime=at2
# f @at1 # f @at1
f1.assertCache([1,1,1]) f1.assertCache([1,1,1])
...@@ -432,6 +460,8 @@ def test_wcfs(): ...@@ -432,6 +460,8 @@ def test_wcfs():
assert f.cached() != [0,0,0,0] assert f.cached() != [0,0,0,0]
# >>> XXX commit data to not yet accessed f part - nothing happens
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment