Commit 08e0c9fb authored by Kirill Smelkov's avatar Kirill Smelkov

*: tests: don't hang on exception in non-main thread

Previously if an assert or something failed in spawned thread, the main
thread was usually spinning indefinitely = tests hang. -> Switch all
threading places to use sync.WorkGroup and this way if a thread fails,
all other threads are canceled and the exception is reported back to
wg.wait in main thread.

Since we start to go this route, NotifyChannel is reworked to fully use
channels instead of busy-waiting.
parent 5f28b72c
# Wendeling.core.bigarray | Tests for ZBigArray
# Copyright (C) 2014-2019 Nexedi SA and Contributors.
# Copyright (C) 2014-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -28,8 +28,8 @@ import transaction
from transaction import TransactionManager
from ZODB.POSException import ConflictError
from numpy import dtype, uint8, all, array_equal, arange
from golang import defer, func
from threading import Thread
from golang import defer, func, chan
from golang import context, sync
from six.moves import _thread
from pytest import raises
......@@ -255,7 +255,9 @@ def test_zbigarray_vs_conn_migration():
c21_1 = NotifyChannel() # T21 -> T11
# open, modify, commit, close, open, commit
def T11():
T11ident = [None] # [0] = gettid(T11)
def T11(ctx):
T11ident[0] = _thread.get_ident()
tell, wait = c12_1.tell, c21_1.wait
conn11_1 = db.open()
......@@ -273,8 +275,8 @@ def test_zbigarray_vs_conn_migration():
# close conn, wait till T21 reopens it
del a11, root11_1
conn11_1.close()
tell('T1-conn11_1-closed')
wait('T2-conn21-opened')
tell(ctx, 'T1-conn11_1-closed')
wait(ctx, 'T2-conn21-opened')
# open nother connection. it must be different
# (see appropriate place in zfile test about why)
......@@ -282,28 +284,31 @@ def test_zbigarray_vs_conn_migration():
assert conn11_2 is not conn11_1
root11_2 = conn11_2.root()
wait('T2-zarray2-modified')
wait(ctx, 'T2-zarray2-modified')
transaction.commit() # should be nothing
tell('T1-txn12-committed')
tell(ctx, 'T1-txn12-committed')
wait('T2-conn21-closed')
wait(ctx, 'T2-conn21-closed')
del root11_2
conn11_2.close()
# hold on this thread until main driver tells us
wait('T11-exit-command')
wait(ctx, 'T11-exit-command')
# open, modify, abort
def T21():
T21done = chan()
@func
def T21(ctx):
defer(T21done.close)
tell, wait = c21_1.tell, c12_1.wait
# wait until T1 finish setting up initial data and get its connection
# (see appropriate place in zfile tests for details)
wait('T1-conn11_1-closed')
wait(ctx, 'T1-conn11_1-closed')
conn21 = db.open()
assert conn21 is conn01
tell('T2-conn21-opened')
tell(ctx, 'T2-conn21-opened')
# modify zarray and arrange timings so that T1 commits after zarray is
# modified, but before we commit/abort.
......@@ -312,21 +317,21 @@ def test_zbigarray_vs_conn_migration():
a21[0:1] = [21] # XXX -> [0] = 21 after BigArray can
tell('T2-zarray2-modified')
wait('T1-txn12-committed')
tell(ctx, 'T2-zarray2-modified')
wait(ctx, 'T1-txn12-committed')
# abort - zarray2 should stay unchanged
transaction.abort()
del a21, root21
conn21.close()
tell('T2-conn21-closed')
tell(ctx, 'T2-conn21-closed')
t11, t21 = Thread(target=T11), Thread(target=T21)
t11.start(); t21.start()
t11_ident = t11.ident
t21.join() # NOTE not joining t11 yet
wg = sync.WorkGroup(context.background())
wg.go(T11)
wg.go(T21)
T21done.recv() # NOTE not joining t11 yet
# now verify that zarray2 stays at 11 state, i.e. T21 was really aborted
conn02 = db.open()
......@@ -346,62 +351,69 @@ def test_zbigarray_vs_conn_migration():
c21_2 = NotifyChannel() # T22 -> T12
# open, abort
T12done = chan()
@func
def T12():
def T12(ctx):
defer(T12done.close)
tell, wait = c12_2.tell, c21_2.wait
wait('T2-conn22-opened')
wait(ctx, 'T2-conn22-opened')
conn12 = db.open()
defer(conn12.close)
tell('T1-conn12-opened')
wait('T2-zarray2-modified')
tell(ctx, 'T1-conn12-opened')
wait(ctx, 'T2-zarray2-modified')
transaction.abort()
tell('T1-txn-aborted')
wait('T2-txn-committed')
tell(ctx, 'T1-txn-aborted')
wait(ctx, 'T2-txn-committed')
# open, modify, commit
T22done = chan()
@func
def T22():
def T22(ctx):
defer(T22done.close)
tell, wait = c21_2.tell, c12_2.wait
# make sure we are not the same thread which ran T11
# (should be so because we cared not to stop T11 yet)
assert _thread.get_ident() != t11_ident
assert _thread.get_ident() != T11ident[0]
conn22 = db.open()
defer(conn22.close)
assert conn22 is conn01
tell('T2-conn22-opened')
tell(ctx, 'T2-conn22-opened')
# modify zarray and arrange timings so that T1 does abort after we
# modify, but before we commit
wait('T1-conn12-opened')
wait(ctx, 'T1-conn12-opened')
root22 = conn22.root()
a22 = root22['zarray2']
a22[0:1] = [22] # XXX -> [0] = 22 after BigArray can
tell('T2-zarray2-modified')
wait('T1-txn-aborted')
tell(ctx, 'T2-zarray2-modified')
wait(ctx, 'T1-txn-aborted')
# commit - changes should propagate to zarray
transaction.commit()
tell('T2-txn-committed')
tell(ctx, 'T2-txn-committed')
t12, t22 = Thread(target=T12), Thread(target=T22)
t12.start(); t22.start()
t12.join(); t22.join()
wg.go(T12)
wg.go(T22)
T12done.recv()
T22done.recv()
# tell T11 to stop also
c21_1.tell('T11-exit-command')
t11.join()
def _(ctx):
c21_1.tell(ctx, 'T11-exit-command')
wg.go(_)
wg.wait()
# now verify that zarray2 changed to 22 state, i.e. T22 was really committed
conn03 = db.open()
......
# Wendelin.core.bigfile | Tests for ZODB BigFile backend
# Copyright (C) 2014-2019 Nexedi SA and Contributors.
# Copyright (C) 2014-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -28,8 +28,8 @@ import transaction
from transaction import TransactionManager
from ZODB.POSException import ConflictError
from numpy import ndarray, array_equal, uint32, zeros, arange
from golang import defer, func
from threading import Thread
from golang import defer, func, chan
from golang import context, sync
from six.moves import _thread
from six import b
import struct
......@@ -223,7 +223,9 @@ def test_bigfile_filezodb_vs_conn_migration():
c21_1 = NotifyChannel() # T21 -> T11
# open, modify, commit, close, open, commit
def T11():
T11ident = [None] # [0] = gettid(T11)
def T11(ctx):
T11ident[0] = _thread.get_ident()
tell, wait = c12_1.tell, c21_1.wait
conn11_1 = db.open()
......@@ -247,8 +249,8 @@ def test_bigfile_filezodb_vs_conn_migration():
# close conn, wait till T21 reopens it
del vma11, fh11, a11, f11, root11_1
conn11_1.close()
tell('T1-conn11_1-closed')
wait('T2-conn21-opened')
tell(ctx, 'T1-conn11_1-closed')
wait(ctx, 'T2-conn21-opened')
# open another connection (e.g. for handling next request) which does
# not touch zfile at all, and arrange timings so that T2 modifies
......@@ -257,33 +259,36 @@ def test_bigfile_filezodb_vs_conn_migration():
assert conn11_2 is not conn11_1
root11_2 = conn11_2.root()
wait('T2-zfile2-modified')
wait(ctx, 'T2-zfile2-modified')
# XXX do we want to also modify some other objects?
# (but this have side effect for joining conn11_2 to txn)
transaction.commit() # should be nothing
tell('T1-txn12-committed')
tell(ctx, 'T1-txn12-committed')
wait('T2-conn21-closed')
wait(ctx, 'T2-conn21-closed')
del root11_2
conn11_2.close()
# hold on this thread until main driver tells us
wait('T11-exit-command')
wait(ctx, 'T11-exit-command')
# open, modify, abort
def T21():
T21done = chan()
@func
def T21(ctx):
defer(T21done.close)
tell, wait = c21_1.tell, c12_1.wait
# - wait until T1 finish setting up initial data for zfile and closes connection.
# - open that connection before T1 is asleep - because ZODB organizes
# connection pool as stack (with correction for #active objects),
# we should get exactly the same connection T1 had.
wait('T1-conn11_1-closed')
wait(ctx, 'T1-conn11_1-closed')
conn21 = db.open()
assert conn21 is conn01
tell('T2-conn21-opened')
tell(ctx, 'T2-conn21-opened')
# modify zfile and arrange timings so that T1 commits after zfile is
# modified, but before we commit/abort.
......@@ -295,21 +300,21 @@ def test_bigfile_filezodb_vs_conn_migration():
Blk(vma21, 0)[0] = 21
tell('T2-zfile2-modified')
wait('T1-txn12-committed')
tell(ctx, 'T2-zfile2-modified')
wait(ctx, 'T1-txn12-committed')
# abort - zfile2 should stay unchanged
transaction.abort()
del vma21, fh21, a21, root21
conn21.close()
tell('T2-conn21-closed')
tell(ctx, 'T2-conn21-closed')
t11, t21 = Thread(target=T11), Thread(target=T21)
t11.start(); t21.start()
t11_ident = t11.ident
t21.join() # NOTE not joining t11 yet
wg = sync.WorkGroup(context.background())
wg.go(T11)
wg.go(T21)
T21done.recv() # NOTE not joining t11 yet
# now verify that zfile2 stays at 11 state, i.e. T21 was really aborted
conn02 = db.open()
......@@ -334,39 +339,45 @@ def test_bigfile_filezodb_vs_conn_migration():
c21_2 = NotifyChannel() # T22 -> T12
# open, abort
def T12():
T12done = chan()
@func
def T12(ctx):
defer(T12done.close)
tell, wait = c12_2.tell, c21_2.wait
wait('T2-conn22-opened')
wait(ctx, 'T2-conn22-opened')
conn12 = db.open()
tell('T1-conn12-opened')
wait('T2-zfile2-modified')
tell(ctx, 'T1-conn12-opened')
wait(ctx, 'T2-zfile2-modified')
transaction.abort()
tell('T1-txn-aborted')
wait('T2-txn-committed')
tell(ctx, 'T1-txn-aborted')
wait(ctx, 'T2-txn-committed')
conn12.close()
# open, modify, commit
def T22():
T22done = chan()
@func
def T22(ctx):
defer(T22done.close)
tell, wait = c21_2.tell, c12_2.wait
# make sure we are not the same thread which ran T11
# (should be so because we cared not to stop T11 yet)
assert _thread.get_ident() != t11_ident
assert _thread.get_ident() != T11ident[0]
conn22 = db.open()
assert conn22 is conn01
tell('T2-conn22-opened')
tell(ctx, 'T2-conn22-opened')
# modify zfile and arrange timings so that T1 does abort after we
# modify, but before we commit
wait('T1-conn12-opened')
wait(ctx, 'T1-conn12-opened')
root22 = conn22.root()
a22 = root22['zarray2']
......@@ -375,24 +386,27 @@ def test_bigfile_filezodb_vs_conn_migration():
Blk(vma22, 0)[0] = 22
tell('T2-zfile2-modified')
wait('T1-txn-aborted')
tell(ctx, 'T2-zfile2-modified')
wait(ctx, 'T1-txn-aborted')
# commit - changes should propagate to zfile
transaction.commit()
tell('T2-txn-committed')
tell(ctx, 'T2-txn-committed')
conn22.close()
t12, t22 = Thread(target=T12), Thread(target=T22)
t12.start(); t22.start()
t12.join(); t22.join()
wg.go(T12)
wg.go(T22)
T12done.recv()
T22done.recv()
# tell T11 to stop also
c21_1.tell('T11-exit-command')
t11.join()
def _(ctx):
c21_1.tell(ctx, 'T11-exit-command')
wg.go(_)
wg.wait()
# now verify that zfile2 changed to 22 state, i.e. T22 was really committed
conn03 = db.open()
......
# Wendelin.core.bigfile | Threading tests
# Copyright (C) 2014-2015 Nexedi SA and Contributors.
# Copyright (C) 2014-2021 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -18,37 +18,50 @@
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from wendelin.bigfile import BigFile, WRITEOUT_STORE
from threading import Thread, Lock
from threading import Lock
from time import sleep
from wendelin.bigfile.tests.test_basic import bord_py3
from six.moves import _thread
from golang import chan, select
from golang import context, sync
# Notify channel for
# - one thread to .wait('condition'), until
# - other thread does .tell('condition')
# - one thread to .wait('condition') for
# - other thread to .tell('condition')
class NotifyChannel:
def __init__(self):
self.state = None
self._ch = chan()
def tell(self, condition):
def tell(self, ctx, condition):
#print >>sys.stderr, ' tell %s\tthread_id: %s\n' \
# % (condition, _thread.get_ident()),
# wait until other thread reads previous tell
while self.state is not None:
pass
self.state = condition
_, _rx = select(
ctx.done().recv, # 0
(self._ch.send, condition), # 1
)
if _ == 0:
raise ctx.err()
#print >>sys.stderr, ' told %s\tthread_id: %s\n' \
# % (condition, _thread.get_ident()),
def wait(self, condition):
def wait(self, ctx, condition):
#print >>sys.stderr, ' wait %s\tthread_id: %s\n' \
# % (condition, _thread.get_ident()),
while self.state != condition:
pass
_, _rx = select(
ctx.done().recv, # 0
self._ch.recv, # 1
)
if _ == 0:
raise ctx.err()
got = _rx
if got != condition:
raise RuntimeError('expected %s; got %s' % (condition, got))
#print >>sys.stderr, ' have %s\tthread_id: %s\n' \
# % (condition, _thread.get_ident()),
self.state = None
......@@ -87,16 +100,18 @@ def test_thread_lock_vs_virtmem_lock():
c21 = NotifyChannel() # T2 -> T1
class ZLockBigFile(BigFile):
# .t1ctx - set by T1
def __new__(cls, blksize):
obj = BigFile.__new__(cls, blksize)
return obj
def Zsync_and_lockunlock(self):
tell, wait = c12.tell, c21.wait
ctx = self.t1ctx
# synchronize with invalidate in T2
tell('T1-V-under')
wait('T2-Z-taken')
tell(ctx, 'T1-V-under')
wait(ctx, 'T2-Z-taken')
# this will deadlock, if V is plain lock and calling from under-virtmem
# is done with V held
......@@ -115,34 +130,37 @@ def test_thread_lock_vs_virtmem_lock():
vma = fh.mmap(0, 1)
m = memoryview(vma)
def T1():
def T1(ctx):
f.t1ctx = ctx
m[0] # calls ZLockBigFile.loadblk()
tell, wait = c12.tell, c21.wait
wait('T2-Z-released')
wait(ctx, 'T2-Z-released: 0')
m[0] = bord_py3(b'1') # make page dirty
fh.dirty_writeout(WRITEOUT_STORE) # calls ZLockBigFile.storeblk()
wait(ctx, 'T2-Z-released: 1')
def T2():
def T2(ctx):
tell, wait = c21.tell, c12.wait
# cycle 0: vs loadblk in T0
# cycle 1: vs storeblk in T0
for _ in range(2):
wait('T1-V-under')
wait(ctx, 'T1-V-under')
Z.acquire()
tell('T2-Z-taken')
tell(ctx, 'T2-Z-taken')
fh2.invalidate_page(0) # NOTE invalidating page _not_ of fh
Z.release()
tell('T2-Z-released')
tell(ctx, 'T2-Z-released: %d' % _)
t1, t2 = Thread(target=T1), Thread(target=T2)
t1.start(); t2.start()
t1.join(); t2.join()
wg = sync.WorkGroup(context.background())
wg.go(T1)
wg.go(T2)
wg.wait()
# multiple access from several threads to the same page - block loaded only once
......@@ -162,12 +180,13 @@ def test_thread_multiaccess_sameblk():
vma = fh.mmap(0, 1)
m = memoryview(vma)
def T():
def T(ctx):
m[0] # calls CountBigFile.loadblk()
t1, t2 = Thread(target=T), Thread(target=T)
t1.start(); t2.start()
t1.join(); t2.join()
wg = sync.WorkGroup(context.background())
wg.go(T)
wg.go(T)
wg.wait()
assert d[0] == 1
......@@ -175,14 +194,21 @@ def test_thread_multiaccess_sameblk():
def test_thread_multiaccess_parallel():
# tid -> (T0 -> T<tid>, T<tid> -> T0)
channels = {}
# [0] = channels<T1>
# [1] = channels<T2>
channelv = [None, None]
# tid -> ctx in T<tid>
tidctx = {}
class SyncBigFile(BigFile):
def loadblk(self, blk, buf):
# tell driver we are in loadblk and wait untill it says us to go
cin, cout = channels[_thread.get_ident()]
cout.tell('ready')
cin.wait('go')
tid = _thread.get_ident()
ctx = tidctx[tid]
cin, cout = channels[tid]
cout.tell(ctx, 'ready')
cin.wait(ctx, 'go')
f = SyncBigFile(PS)
fh = f.fileh_open()
......@@ -190,30 +216,39 @@ def test_thread_multiaccess_parallel():
m = memoryview(vma)
def T1():
channels[_thread.get_ident()] = (NotifyChannel(), NotifyChannel())
def T1(ctx):
tid = _thread.get_ident()
tidctx[tid] = ctx
channelv[0] = channels[tid] = (NotifyChannel(), NotifyChannel())
m[0*PS]
def T2():
channels[_thread.get_ident()] = (NotifyChannel(), NotifyChannel())
def T2(ctx):
tid = _thread.get_ident()
tidctx[tid] = ctx
channelv[1] = channels[tid] = (NotifyChannel(), NotifyChannel())
m[1*PS]
t1, t2 = Thread(target=T1), Thread(target=T2)
t1.start(); t2.start()
wg = sync.WorkGroup(context.background())
wg.go(T1)
wg.go(T2)
while len(channels) != 2:
pass
c01, c10 = channels[t1.ident]
c02, c20 = channels[t2.ident]
c01, c10 = channelv[0]
c02, c20 = channelv[1]
def _(ctx):
c10.wait(ctx, 'ready'); c20.wait(ctx, 'ready')
c01.tell(ctx, 'go'); c02.tell(ctx, 'go')
wg.go(_)
c10.wait('ready'); c20.wait('ready')
c01.tell('go'); c02.tell('go')
t1.join(); t2.join()
wg.wait()
# loading vs invalidate of same page in another thread
def test_thread_load_vs_invalidate():
c12 = NotifyChannel() # T1 -> T2
c21 = NotifyChannel() # T2 -> T1
tidctx = {} # tid -> ctx used in T<n>
class RetryBigFile(BigFile):
def __new__(cls, blksize):
......@@ -222,13 +257,14 @@ def test_thread_load_vs_invalidate():
return obj
def loadblk(self, blk, buf):
ctx = tidctx[_thread.get_ident()]
tell, wait = c12.tell, c21.wait
bufmem = memoryview(buf)
# on the first cycle we synchronize with invalidate in T2
if self.cycle == 0:
tell('T1-loadblk0-ready')
wait('T1-loadblk0-go')
tell(ctx, 'T1-loadblk0-ready')
wait(ctx, 'T1-loadblk0-go')
# here we know request to invalidate this page came in and this
# '1' should be ignored by virtmem
bufmem[0] = bord_py3(b'1')
......@@ -246,16 +282,19 @@ def test_thread_load_vs_invalidate():
vma = fh.mmap(0, 1)
m = memoryview(vma)
def T1():
def T1(ctx):
tidctx[_thread.get_ident()] = ctx
assert m[0] == bord_py3(b'2')
def T2():
def T2(ctx):
tidctx[_thread.get_ident()] = ctx
tell, wait = c21.tell, c12.wait
wait('T1-loadblk0-ready')
wait(ctx, 'T1-loadblk0-ready')
fh.invalidate_page(0)
tell('T1-loadblk0-go')
tell(ctx, 'T1-loadblk0-go')
t1, t2 = Thread(target=T1), Thread(target=T2)
t1.start(); t2.start()
t1.join(); t2.join()
wg = sync.WorkGroup(context.background())
wg.go(T1)
wg.go(T2)
wg.wait()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment