Commit 5b4dd5f7 authored by Kirill Smelkov's avatar Kirill Smelkov

tests: Add test for open vs invalidation race

Add test that exercises open vs invalidation race condition that, if
happen, leads to data corruption. We are seeing such race happening on
storage level in ZEO (https://github.com/zopefoundation/ZEO/issues/166),
and previously we've seen it also to happen on Connection level
(https://github.com/zopefoundation/ZODB/issues/290). By adding this test
to be exercised wrt all storages we make sure that all storages stay
free from this race.

And it payed out. Besides catching original problems from
https://github.com/zopefoundation/ZODB/issues/290 and
https://github.com/zopefoundation/ZEO/issues/166 , this test also
discovered a concurrency bug in MVCCMappingStorage:

    Failure in test check_race_open_vs_invalidate (ZODB.tests.testMVCCMappingStorage.MVCCMappingStorageTests)
    Traceback (most recent call last):
      File "/usr/lib/python2.7/unittest/case.py", line 329, in run
        testMethod()
      File "/home/kirr/src/wendelin/z/ZODB/src/ZODB/tests/BasicStorage.py", line 492, in check_race_open_vs_invalidate
        self.fail(failure[0])
      File "/usr/lib/python2.7/unittest/case.py", line 410, in fail
        raise self.failureException(msg)
    AssertionError: T1: obj1.value (24)  !=  obj2.value (23)

The problem with MVCCMappingStorage was that instance.poll_invalidations
was correctly taking main_lock with intention to make sure main data is
not mutated during analysis, but instance.tpc_finish and
instance.tpc_abort did _not_ taken main lock, which was leading to
committed data to be propagating into main storage in non-atomic way.

This bug was also observable if both obj1 and obj2 in the added test
were always loaded from the storage (added obj2._p_invalidate after
obj1._p_invalidate).

-> Fix MVCCMappingStorage by correctly locking main MVCCMappingStorage
instance when processing transaction completion.

/cc @d-maurer, @jamadden, @jmuchemb
/reviewed-on https://github.com/zopefoundation/ZODB/pull/345
parent 0963193f
......@@ -18,7 +18,8 @@ http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storag
All storages should be able to pass these tests.
"""
from ZODB import POSException
import transaction
from ZODB import DB, POSException
from ZODB.Connection import TransactionMetaData
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
......@@ -385,3 +386,97 @@ class BasicStorage(object):
self.assertEqual(results.pop('lastTransaction'), tids[1])
for m, tid in results.items():
self.assertEqual(tid, tids[1])
# verify storage/Connection for race in between load/open and local invalidations.
# https://github.com/zopefoundation/ZEO/issues/166
# https://github.com/zopefoundation/ZODB/issues/290
def check_race_loadopen_vs_local_invalidate(self):
db = DB(self._storage)
# init initializes the database with two integer objects - obj1/obj2
# that are set to 0.
def init():
transaction.begin()
zconn = db.open()
root = zconn.root()
root['obj1'] = MinPO(0)
root['obj2'] = MinPO(0)
transaction.commit()
zconn.close()
# verify accesses obj1/obj2 and verifies that obj1.value == obj2.value
#
# access to obj1 is organized to always trigger loading from zstor.
# access to obj2 goes through zconn cache and so verifies whether the
# cache is not stale.
failed = threading.Event()
failure = [None]
def verify():
transaction.begin()
zconn = db.open()
root = zconn.root()
obj1 = root['obj1']
obj2 = root['obj2']
# obj1 - reload it from zstor
# obj2 - get it from zconn cache
obj1._p_invalidate()
# both objects must have the same values
v1 = obj1.value
v2 = obj2.value
if v1 != v2:
failure[0] = "verify: obj1.value (%d) != obj2.value (%d)" % (v1, v2)
failed.set()
transaction.abort() # we did not changed anything; also fails with commit
zconn.close()
# modify changes obj1/obj2 by doing `objX.value += 1`.
#
# Since both objects start from 0, the invariant that
# `obj1.value == obj2.value` is always preserved.
def modify():
transaction.begin()
zconn = db.open()
root = zconn.root()
obj1 = root['obj1']
obj2 = root['obj2']
obj1.value += 1
obj2.value += 1
assert obj1.value == obj2.value
transaction.commit()
zconn.close()
# xrun runs f in a loop until either N iterations, or until failed is set.
def xrun(f, N):
try:
for i in range(N):
#print('%s.%d' % (f.__name__, i))
f()
if failed.is_set():
break
except:
failed.set()
raise
# loop verify and modify concurrently.
init()
N = 500
tverify = threading.Thread(name='Tverify', target=xrun, args=(verify, N))
tmodify = threading.Thread(name='Tmodify', target=xrun, args=(modify, N))
tverify.start()
tmodify.start()
tverify.join(60)
tmodify.join(60)
if failed.is_set():
self.fail(failure[0])
......@@ -112,11 +112,13 @@ class MVCCMappingStorage(MappingStorage):
def tpc_finish(self, transaction, func = lambda tid: None):
self._data_snapshot = None
return MappingStorage.tpc_finish(self, transaction, func)
with self._main_lock:
return MappingStorage.tpc_finish(self, transaction, func)
def tpc_abort(self, transaction):
self._data_snapshot = None
MappingStorage.tpc_abort(self, transaction)
with self._main_lock:
MappingStorage.tpc_abort(self, transaction)
def pack(self, t, referencesf, gc=True):
# prevent all concurrent commits during packing
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment