Commit 1578ad9f authored by Julien Muchembled's avatar Julien Muchembled

Fix IStorage.iterator potentially skipping recent transactions

This fixes a critical bug in the storage node that ignored read locks,
so transactions that have just been committed could be missing in the
results.

Because read locks are released by order of tid, a single storage node
never returned a list with holes (i.e. the missing transactions could
only be the last ones). However, holes could happen when merging
results from multiple storage nodes.

The change in the client node avoids extra RPC at the end:
it should speed up in most cases and it makes the new test clearer.
parent 13ef5f15
......@@ -71,4 +71,6 @@ def iterator(app, start=None, stop=None):
break # nothing more
for txn in chunk:
yield Transaction(app, txn)
if stop == max_tid:
break
start = add64(max_tid, 1)
......@@ -157,6 +157,9 @@ class ClientOperationHandler(BaseHandler):
conn.answer(Packets.AnswerRelockObject(conflict))
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition):
app = self.app
if app.tm.isLockedTid(max_tid):
raise DelayEvent
conn.answer(Packets.AnswerTIDsFrom(self.app.dm.getReplicationTIDList(
min_tid, max_tid, length, partition)))
......
......@@ -515,7 +515,7 @@ class Test(NEOThreadedTest):
self.assertIn(get_ident(), conflicts)
@with_cluster()
def testDelayedLoad(self, cluster):
def testReadLockVsAskObject(self, cluster):
"""
Check that a storage node delays reads from the database,
when the requested data may still be in a temporary place.
......@@ -549,6 +549,47 @@ class Test(NEOThreadedTest):
self.assertEqual(idle, [1, 0])
self.assertIn('', r)
@with_cluster(partitions=2, storage_count=2)
def testReadLockVsAskTIDsFrom(self, cluster):
"""
Similar to testReadLockVsAskObject, here to check IStorage.iterator
"""
l = threading.Lock()
l.acquire()
idle = []
def askTIDsFrom(orig, *args):
try:
orig(*args)
finally:
idle.append(s1.em.isIdle())
l.release()
s0, s1 = cluster.sortStorageList()
t, c = cluster.getTransaction()
t0 = cluster.last_tid
r = c.root()
r[''] = ''
with Patch(ClientOperationHandler(s1), askTIDsFrom=askTIDsFrom):
with cluster.master.filterConnection(s1) as m2s1:
m2s1.delayNotifyUnlockInformation()
with cluster.moduloTID(1):
t.commit()
t1 = cluster.last_tid
r._p_changed = 1
with cluster.moduloTID(0):
t.commit()
t2 = cluster.last_tid
tid_list = []
cluster.client._cache.clear()
@self.newThread
def iterTrans():
for txn in c.db().storage.iterator():
tid_list.append(txn.tid)
l.acquire()
l.acquire()
iterTrans.join()
self.assertEqual(idle, [1, 0])
self.assertEqual(tid_list, [t0, t1, t2])
@with_cluster(replicas=1)
def test_notifyNodeInformation(self, cluster):
# translated from MasterNotificationsHandlerTests
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment