Commit ab435b28 authored by Julien Muchembled's avatar Julien Muchembled

client: also vote to nodes that only check serials

Not doing so was an incorrect optimization. Checking serials does take
write-locks and they must not be released when a client-storage connection
breaks between vote and lock, otherwise a concurrent transaction modifying such
serials may finish before.
parent d7245ee9
...@@ -557,7 +557,7 @@ class Application(ThreadedApplication): ...@@ -557,7 +557,7 @@ class Application(ThreadedApplication):
trans_nodes = txn_context.write(self, packet, ttid) trans_nodes = txn_context.write(self, packet, ttid)
packet = Packets.AskVoteTransaction(ttid) packet = Packets.AskVoteTransaction(ttid)
for uuid, status in involved_nodes.iteritems(): for uuid, status in involved_nodes.iteritems():
if status == 1 and uuid not in trans_nodes: if status < 2 and uuid not in trans_nodes:
self._askStorageForWrite(txn_context, uuid, packet) self._askStorageForWrite(txn_context, uuid, packet)
self.waitStoreResponses(txn_context) self.waitStoreResponses(txn_context)
if 2 in involved_nodes.itervalues(): # unlikely if 2 in involved_nodes.itervalues(): # unlikely
......
...@@ -308,6 +308,8 @@ class TransactionManager(EventQueue): ...@@ -308,6 +308,8 @@ class TransactionManager(EventQueue):
dict.fromkeys(transaction.store_dict, ttid)) dict.fromkeys(transaction.store_dict, ttid))
if transaction.voted == 2: if transaction.voted == 2:
self._app.dm.lockTransaction(tid, ttid) self._app.dm.lockTransaction(tid, ttid)
else:
assert transaction.voted
def unlock(self, ttid): def unlock(self, ttid):
""" """
......
...@@ -978,6 +978,7 @@ class Test(NEOThreadedTest): ...@@ -978,6 +978,7 @@ class Test(NEOThreadedTest):
@with_cluster(storage_count=2, partitions=2) @with_cluster(storage_count=2, partitions=2)
def testReadVerifyingStorage(self, cluster): def testReadVerifyingStorage(self, cluster):
s1, s2 = cluster.sortStorageList()
t1, c1 = cluster.getTransaction() t1, c1 = cluster.getTransaction()
c1.root()['x'] = x = PCounter() c1.root()['x'] = x = PCounter()
t1.commit() t1.commit()
...@@ -985,7 +986,7 @@ class Test(NEOThreadedTest): ...@@ -985,7 +986,7 @@ class Test(NEOThreadedTest):
with cluster.newClient(1) as db: with cluster.newClient(1) as db:
t2, c2 = cluster.getTransaction(db) t2, c2 = cluster.getTransaction(db)
r = c2.root() r = c2.root()
r['y'] = None r._p_changed = 1
self.readCurrent(r['x']) self.readCurrent(r['x'])
# Force the new tid to be even, like the modified oid and unlike # Force the new tid to be even, like the modified oid and unlike
# the oid on which we used readCurrent. Thus we check that the node # the oid on which we used readCurrent. Thus we check that the node
...@@ -1000,11 +1001,35 @@ class Test(NEOThreadedTest): ...@@ -1000,11 +1001,35 @@ class Test(NEOThreadedTest):
# transaction before the last one, and clearing the cache before # transaction before the last one, and clearing the cache before
# reloading x. # reloading x.
c1._storage.load(x._p_oid) c1._storage.load(x._p_oid)
t0, t1, t2 = c1.db().storage.iterator() # In particular, check oid 1 is listed in the last transaction.
self.assertEqual(map(u64, t0.oid_list), [0]) self.assertEqual([[0], [0, 1], [0, 1]],
self.assertEqual(map(u64, t1.oid_list), [0, 1]) [map(u64, t.oid_list) for t in c1.db().storage.iterator()])
# Check oid 1 is part of transaction metadata.
self.assertEqual(t2.oid_list, t1.oid_list) # Another test, this time to check we also vote to storage nodes
# that are only involved in checking current serial.
t1.begin()
s2c2, = s2.getConnectionList(db.storage.app)
def t1_vote(txn):
# Check that the storage hasn't answered to the store,
# which means that a lock is still taken for r['x'] by t2.
self.tic()
txn_context = cluster.client._txn_container.get(txn)
empty = txn_context.queue.empty()
ll()
self.assertTrue(empty)
def t2_vote(_):
s2c2.close()
commit1.start()
ll()
TransactionalResource(t1, 0, tpc_vote=t1_vote)
x.value += 1
TransactionalResource(t2, 1, tpc_vote=t2_vote)
r._p_changed = 1
self.readCurrent(r['x'])
with cluster.moduloTID(0), LockLock() as ll:
commit1 = self.newPausedThread(t1.commit)
t2.commit()
commit1.join()
@with_cluster() @with_cluster()
def testClientReconnection(self, cluster): def testClientReconnection(self, cluster):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment