Commit fc20f3aa authored by Kirill Smelkov's avatar Kirill Smelkov

racetest: Add test for disconnect / invalidation race

This currently fails for ZEO (see Bug2 in
https://github.com/zopefoundation/ZEO/issues/209 for details):

    (z-dev) kirr@deca:~/src/wendelin/z/ZEO5$ ZEO_MTACCEPTOR=1 zope-testrunner -fvvvx --test-path=src -t check_race_external_invalidate_vs_disconnect
    /home/kirr/src/wendelin/venv/z-dev/bin/zope-testrunner traceio=True
    /home/kirr/src/wendelin/z/ZEO5/src/ZEO/StorageServer.py:51: DeprecationWarning: The mtacceptor module is deprecated and will be removed in ZEO version 6.
      'in ZEO version 6.', DeprecationWarning)
    Running tests at level 1
    Running .BlobAdaptedFileStorageTests tests:
      Set up .BlobAdaptedFileStorageTests in 0.000 seconds.
      Running:
     check_race_external_invalidate_vs_disconnect (ZEO.tests.testZEO.BlobAdaptedFileStorageTests) (1.889 s)

    Failure in test check_race_external_invalidate_vs_disconnect (ZEO.tests.testZEO.BlobAdaptedFileStorageTests)
    Traceback (most recent call last):
      File "/usr/lib/python2.7/unittest/case.py", line 329, in run
        testMethod()
      File "/home/kirr/src/wendelin/z/ZODB/src/ZODB/tests/racetest.py", line 357, in check_race_external_invalidate_vs_disconnect
        T2ObjectsInc2Phase())
      File "/home/kirr/src/wendelin/z/ZODB/src/ZODB/tests/util.py", line 400, in _
        return f(*argv, **kw)
      File "/home/kirr/src/wendelin/z/ZODB/src/ZODB/tests/racetest.py", line 446, in _check_race_xxx_vs_external_disconnect
        self.fail('\n\n'.join([_ for _ in failure if _]))
      File "/usr/lib/python2.7/unittest/case.py", line 410, in fail
        raise self.failureException(msg)
    AssertionError: T15: obj1 (6) - obj2(4) != phase (1)
    obj1._p_serial: 0x03ea4cc505486777  obj2._p_serial: 0x03ea4cc503413799  phase._p_serial: 0x03ea4cc505486777
    zconn_at: 0x03ea4cc505486777  # approximated as max(serials)
    zstor.loadBefore(obj1, @zconn.at)       ->  serial: 0x03ea4cc505486777  next_serial: None
    zstor.loadBefore(obj2, @zconn.at)       ->  serial: 0x03ea4cc504a74099  next_serial: None
    zstor.loadBefore(phase, @zconn.at)      ->  serial: 0x03ea4cc505486777  next_serial: None
    zstor._cache.clear()
    zstor.loadBefore(obj1, @zconn.at)       ->  serial: 0x03ea4cc505486777  next_serial: None
    zstor.loadBefore(obj2, @zconn.at)       ->  serial: 0x03ea4cc504a74099  next_serial: 0x03ea4cc506104155
    zstor.loadBefore(phase, @zconn.at)      ->  serial: 0x03ea4cc505486777  next_serial: 0x03ea4cc506104155

    T51: obj1 (6) - obj2(4) != phase (1)
    obj1._p_serial: 0x03ea4cc505486777  obj2._p_serial: 0x03ea4cc503413799  phase._p_serial: 0x03ea4cc505486777
    zconn_at: 0x03ea4cc505486777  # approximated as max(serials)
    zstor.loadBefore(obj1, @zconn.at)       ->  serial: 0x03ea4cc505486777  next_serial: None
    zstor.loadBefore(obj2, @zconn.at)       ->  serial: 0x03ea4cc503413799  next_serial: None
    zstor.loadBefore(phase, @zconn.at)      ->  serial: 0x03ea4cc505486777  next_serial: None
    zstor._cache.clear()
    zstor.loadBefore(obj1, @zconn.at)       ->  serial: 0x03ea4cc505486777  next_serial: None
    zstor.loadBefore(obj2, @zconn.at)       ->  serial: 0x03ea4cc504a74099  next_serial: None
    zstor.loadBefore(phase, @zconn.at)      ->  serial: 0x03ea4cc505486777  next_serial: None
parent fa844159
......@@ -93,6 +93,39 @@ class T2ObjectsInc(ISpec):
raise AssertionError("obj1 (%d) != obj2 (%d)" % (i1, i2))
class T2ObjectsInc2Phase(ISpec):
"""T2ObjectsInc2Phase is specification with behaviour where two objects
obj1 and obj2 are incremented in lock-step.
It is used in tests where bugs can be observed on the next transaction
after the race.
invariant: obj1 - obj2 == phase
"""
def init(_, root):
root['obj1'] = MinPO(0)
root['obj2'] = MinPO(0)
root['phase'] = MinPO(0)
def next(_, root):
phase = root['phase']
if phase.value == 0:
root['obj1'].value += 1
else:
root['obj2'].value += 1
phase.value += 1
phase.value %= 2
def assertStateOK(_, root):
i1 = root['obj1'].value
i2 = root['obj2'].value
p = root['phase'].value
if not (i1 - i2 == p):
raise AssertionError("obj1 (%d) - obj2(%d) != phase (%d)" %
(i1, i2, p))
class RaceTests(object):
# verify storage/Connection for race in between load/open and local
......@@ -307,6 +340,111 @@ class RaceTests(object):
if failed.is_set():
self.fail('\n\n'.join([_ for _ in failure if _]))
# verify storage for race in between client disconnect and external
# invalidations. https://github.com/zopefoundation/ZEO/issues/209
#
# This test is simlar to check_race_load_vs_external_invalidate, but
# increases the number of workers and also makes every worker to repeatedly
# reconnect to the storage, so that the probability of disconection is
# high. It also uses T2ObjectsInc2Phase instead of T2ObjectsInc because if
# an invalidation is skipped due to the disconnect/invalidation race,
# T2ObjectsInc won't catch the bug as both objects will be either in old
# state, or in new state after the next transaction. Contrary to that, with
# T2ObjectsInc2Phase the invariant will be detected to be broken on the
# next transaction.
def check_race_external_invalidate_vs_disconnect(self):
return self._check_race_xxx_vs_external_disconnect(
T2ObjectsInc2Phase())
@with_high_concurrency
def _check_race_xxx_vs_external_disconnect(self, spec):
assert isinstance(spec, ISpec)
# init initializes the database according to the spec.
def init():
db = self.dbopen()
_state_init(db, spec)
db.close()
nwork = 8*8 # nwork^2 from _check_race_load_vs_external_invalidate
# T is similar to T from _check_race_load_vs_external_invalidate but
# reconnects to the database often.
failed = threading.Event()
failure = [None] * nwork # [tx] is failure from T(tx)
def T(tx, N):
def t_():
def work1(db):
transaction.begin()
zconn = db.open()
root = zconn.root()
# reload some objects from zstor, while getting others from
# zconn cache
_state_invalidate_half1(root)
try:
spec.assertStateOK(root)
except AssertionError as e:
msg = "T%s: %s\n" % (tx, e)
msg += _state_details(root)
failure[tx] = msg
failed.set()
zconn.close()
transaction.abort()
return
# change objects once in a while
if randint(0, 4) == 0:
# print("T%s: modify" % tx)
spec.next(root)
spec.assertStateOK(root)
try:
transaction.commit()
except POSException.ConflictError:
# print('conflict -> ignore')
transaction.abort()
zconn.close()
db = self.dbopen()
try:
for i in range(4):
if failed.is_set():
break
work1(db)
finally:
db.close()
try:
for i in range(N):
# print('T%s.%d' % (tx, i))
if failed.is_set():
break
t_()
except: # noqa: E722 do not use bare 'except'
failed.set()
raise
# run the workers concurrently.
init()
N = 100 // (2*4) # N reduced to save time
tg = []
for x in range(nwork):
t = threading.Thread(name='T%d' % x, target=T, args=(x, N))
t.start()
tg.append(t)
for t in tg:
t.join(60)
if failed.is_set():
self.fail('\n\n'.join([_ for _ in failure if _]))
# _state_init initializes the database according to the spec.
def _state_init(db, spec):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment