Commit 59698faa authored by Julien Muchembled's avatar Julien Muchembled

master: notify replicating nodes of aborted watched transactions

This fixes stuck replication when a client loses connection to the master
during a commit.
parent edf58ece
...@@ -596,3 +596,12 @@ class Application(BaseApplication): ...@@ -596,3 +596,12 @@ class Application(BaseApplication):
def getStorageReadySet(self, readiness=float('inf')): def getStorageReadySet(self, readiness=float('inf')):
return {k for k, v in self.storage_ready_dict.iteritems() return {k for k, v in self.storage_ready_dict.iteritems()
if v <= readiness} if v <= readiness}
def notifyTransactionAborted(self, ttid, uuids):
uuid_set = self.getStorageReadySet()
uuid_set.intersection_update(uuids)
if uuid_set:
p = Packets.AbortTransaction(ttid, ())
getByUUID = self.nm.getByUUID
for uuid in uuid_set:
getByUUID(uuid).send(p)
...@@ -27,7 +27,8 @@ class ClientServiceHandler(MasterHandler): ...@@ -27,7 +27,8 @@ class ClientServiceHandler(MasterHandler):
app = self.app app = self.app
node = app.nm.getByUUID(conn.getUUID()) node = app.nm.getByUUID(conn.getUUID())
assert node is not None, conn assert node is not None, conn
app.tm.clientLost(node) for x in app.tm.clientLost(node):
app.notifyTransactionAborted(*x)
node.setUnknown() node.setUnknown()
app.broadcastNodesInformation([node]) app.broadcastNodesInformation([node])
...@@ -118,12 +119,7 @@ class ClientServiceHandler(MasterHandler): ...@@ -118,12 +119,7 @@ class ClientServiceHandler(MasterHandler):
app = self.app app = self.app
involved = app.tm.abort(tid, conn.getUUID()) involved = app.tm.abort(tid, conn.getUUID())
involved.update(uuid_list) involved.update(uuid_list)
involved.intersection_update(app.getStorageReadySet()) app.notifyTransactionAborted(tid, involved)
if involved:
p = Packets.AbortTransaction(tid, ())
getByUUID = app.nm.getByUUID
for involved in involved:
getByUUID(involved).send(p)
# like ClientServiceHandler but read-only & only for tid <= backup_tid # like ClientServiceHandler but read-only & only for tid <= backup_tid
......
...@@ -444,7 +444,9 @@ class TransactionManager(EventQueue): ...@@ -444,7 +444,9 @@ class TransactionManager(EventQueue):
def clientLost(self, node): def clientLost(self, node):
for txn in self._ttid_dict.values(): for txn in self._ttid_dict.values():
if txn.clientLost(node): if txn.clientLost(node):
del self[txn.getTTID()] tid = txn.getTTID()
del self[tid]
yield tid, txn.getNotificationUUIDList()
def log(self): def log(self):
logging.info('Transactions:') logging.info('Transactions:')
......
...@@ -61,8 +61,8 @@ class testTransactionManager(NeoUnitTestBase): ...@@ -61,8 +61,8 @@ class testTransactionManager(NeoUnitTestBase):
tid1 = self.getNextTID() tid1 = self.getNextTID()
tid2 = self.getNextTID() tid2 = self.getNextTID()
tm.begin(node1, 0, tid1) tm.begin(node1, 0, tid1)
tm.clientLost(node1) self.assertEqual(1, len(list(tm.clientLost(node1))))
self.assertTrue(tid1 not in tm) self.assertNotIn(tid1, tm)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment