diff --git a/neo/master/tests/testMasterService.py b/neo/master/tests/testMasterService.py new file mode 100644 index 0000000000000000000000000000000000000000..edae3d2e2542c55bd9fe7008cef122224a741ef4 --- /dev/null +++ b/neo/master/tests/testMasterService.py @@ -0,0 +1,1041 @@ +import os +import unittest +import logging +from tempfile import mkstemp +from mock import Mock +from struct import pack, unpack +from neo.protocol import Packet, INVALID_UUID +from neo.master.service import ServiceEventHandler +from neo.master.app import Application +from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \ + PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \ + REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \ + STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \ + ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \ + ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \ + ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \ + DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_NEW_TID, ANSWER_NEW_TID, \ + FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \ + NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \ + ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \ + ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \ + ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \ + ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \ + ASK_OIDS, ANSWER_OIDS, \ + NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \ + PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \ + INTERNAL_ERROR_CODE, \ + STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, MASTER_NODE_TYPE, \ + RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ + UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, DISCARDED_STATE +from neo.exception import OperationFailure, ElectionFailure +from neo.node import MasterNode, StorageNode + +class MasterServiceTests(unittest.TestCase): + + def setUp(self): + logging.basicConfig(level = logging.DEBUG) + # create an application object + config_file_text = """# Default parameters. +[DEFAULT] +# The list of master nodes. +master_nodes: 127.0.0.1:10010 +# The number of replicas. +replicas: 2 +# The number of partitions. +partitions: 1009 +# The name of this cluster. +name: main +# The user name for the database. +user: neo +# The password for the database. +password: neo + +# The first master. +[mastertest] +server: 127.0.0.1:10010 + +# The first storage. +[storage1] +database: neotest1 +server: 127.0.0.1:10020 + +# The second storage. +[storage2] +database: neotest2 +server: 127.0.0.1:10021 + +# The third storage. +[storage3] +database: neotest3 +server: 127.0.0.1:10022 + +# The fourth storage. +[storage4] +database: neotest4 +server: 127.0.0.1:10023 +""" + tmp_id, self.tmp_path = mkstemp() + tmp_file = os.fdopen(tmp_id, "w+b") + tmp_file.write(config_file_text) + tmp_file.close() + self.app = Application(self.tmp_path, "mastertest") + self.app.pt.clear() + self.app.finishing_transaction_dict = {} + for server in self.app.master_node_list: + self.app.nm.add(MasterNode(server = server)) + self.service = ServiceEventHandler(self.app) + # define some variable to simulate client and storage node + self.client_port = 11022 + self.storage_port = 10021 + self.master_port = 10010 + + def tearDown(self): + # Delete tmp file + os.remove(self.tmp_path) + + # Method to test the kind of packet returned in answer + def checkCalledAbort(self, conn, packet_number=0): + """Check the abort method has been called and an error packet has been sent""" + self.assertEquals(len(conn.mockGetNamedCalls("addPacket")), 1) # XXX required here ???? + self.assertEquals(len(conn.mockGetNamedCalls("abort")), 1) + self.assertEquals(len(conn.mockGetNamedCalls("expectMessage")), 0) + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), ERROR) + + def checkCalledAcceptNodeIdentification(self, conn, packet_number=0): + """ Check Accept Node Identification has been send""" + self.assertEquals(len(conn.mockGetNamedCalls("addPacket")), 1) + self.assertEquals(len(conn.mockGetNamedCalls("abort")), 0) + self.assertEquals(len(conn.mockGetNamedCalls("expectMessage")), 1) + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), ACCEPT_NODE_IDENTIFICATION) + + def checkCalledNotifyNodeInformation(self, conn, packet_number=0): + """ Check Notify Node Information message has been send""" + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), NOTIFY_NODE_INFORMATION) + + def checkCalledAnswerPrimaryMaster(self, conn, packet_number=0): + """ Check Answer primaty master message has been send""" + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), ANSWER_PRIMARY_MASTER) + + def checkCalledSendPartitionTable(self, conn, packet_number=0): + """ Check partition table has been send""" + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), SEND_PARTITION_TABLE) + + def checkCalledStartOperation(self, conn, packet_number=0): + """ Check start operation message has been send""" + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), START_OPERATION) + + def checkCalledLockInformation(self, conn, packet_number=0): + """ Check start operation message has been send""" + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), LOCK_INFORMATION) + + def checkCalledUnlockInformation(self, conn, packet_number=0): + """ Check start operation message has been send""" + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), UNLOCK_INFORMATION) + + def checkCalledNotifyTransactionFinished(self, conn, packet_number=0): + """ Check start operation message has been send""" + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), NOTIFY_TRANSACTION_FINISHED) + + def checkCalledAnswerLastIDs(self, conn, packet_number=0): + """ Check start operation message has been send""" + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), ANSWER_LAST_IDS) + return packet._decodeAnswerLastIDs() + + def checkCalledAnswerUnfinishedTransactions(self, conn, packet_number=0): + """ Check start operation message has been send""" + call = conn.mockGetNamedCalls("addPacket")[packet_number] + packet = call.getParam(0) + self.assertTrue(isinstance(packet, Packet)) + self.assertEquals(packet.getType(), ANSWER_UNFINISHED_TRANSACTIONS) + return packet._decodeAnswerUnfinishedTransactions() + + # Common methods + def getNewUUID(self): + uuid = INVALID_UUID + while uuid == INVALID_UUID: + uuid = os.urandom(16) + self.uuid = uuid + return uuid + + def getLastUUID(self): + return self.uuid + + def identifyToMasterNode(self, node_type=STORAGE_NODE_TYPE, ip="127.0.0.1", + port=10021): + """Do first step of identification to MN + """ + packet = Packet(msg_id=1) + uuid = self.getNewUUID() + # test alien cluster + conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None}) + self.service.handleRequestNodeIdentification(conn, + packet=packet, + node_type=node_type, + uuid=uuid, + ip_address=ip, + port=port, + name=self.app.name,) + self.checkCalledAcceptNodeIdentification(conn) + return uuid + + # Tests + def test_01_handleRequestNodeIdentification(self): + service = self.service + packet = Packet(msg_id=1) + uuid = self.getNewUUID() + # test alien cluster + conn = Mock({"addPacket" : None, "abort" : None}) + ptid = self.app.lptid + service.handleRequestNodeIdentification(conn, + packet=packet, + node_type=STORAGE_NODE_TYPE, + uuid=uuid, + ip_address='127.0.0.1', + port=self.storage_port, + name="INVALID_NAME",) + self.checkCalledAbort(conn) + self.assertEquals(len(self.app.nm.getStorageNodeList()), 0) + self.assertEquals(self.app.lptid, ptid) + + # test connection of a storage node + conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None}) + ptid = self.app.lptid + service.handleRequestNodeIdentification(conn, + packet=packet, + node_type=STORAGE_NODE_TYPE, + uuid=uuid, + ip_address='127.0.0.1', + port=self.storage_port, + name=self.app.name,) + self.checkCalledAcceptNodeIdentification(conn) + self.assertEquals(len(self.app.nm.getStorageNodeList()), 1) + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getServer(), ('127.0.0.1', self.storage_port)) + self.assertEquals(sn.getUUID(), uuid) + self.assertEquals(sn.getState(), RUNNING_STATE) + self.failUnless(self.app.lptid > ptid) + + # send message again for the same storage node, MN must recognize it + conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None}) + ptid = self.app.lptid + service.handleRequestNodeIdentification(conn, + packet=packet, + node_type=STORAGE_NODE_TYPE, + uuid=uuid, + ip_address='127.0.0.1', + port=self.storage_port, + name=self.app.name,) + self.checkCalledAcceptNodeIdentification(conn) + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getServer(), ('127.0.0.1', self.storage_port)) + self.assertEquals(sn.getUUID(), uuid) + self.assertEquals(sn.getState(), RUNNING_STATE) + # No change of partition table + self.assertEquals(self.app.lptid, ptid) + + # send message again for the same storage node but different uuid + # must be rejected as SN is considered as running + conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None}) + ptid = self.app.lptid + new_uuid = self.getNewUUID() + + service.handleRequestNodeIdentification(conn, + packet=packet, + node_type=STORAGE_NODE_TYPE, + uuid=new_uuid, + ip_address='127.0.0.1', + port=self.storage_port, + name=self.app.name,) + self.checkCalledAbort(conn) + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getServer(), ('127.0.0.1', self.storage_port)) + self.assertEquals(sn.getUUID(), uuid) + self.assertEquals(sn.getState(), RUNNING_STATE) + # No change of partition table + self.assertEquals(self.app.lptid, ptid) + + # same test, but set SN as not running before + # this new node must replaced the old one + conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None}) + ptid = self.app.lptid + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getState(), RUNNING_STATE) + sn.setState(TEMPORARILY_DOWN_STATE) + self.assertEquals(sn.getState(), TEMPORARILY_DOWN_STATE) + + service.handleRequestNodeIdentification(conn, + packet=packet, + node_type=STORAGE_NODE_TYPE, + uuid=new_uuid, + ip_address='127.0.0.1', + port=self.storage_port, + name=self.app.name,) + self.checkCalledAcceptNodeIdentification(conn) + self.assertEquals(len(self.app.nm.getStorageNodeList()), 1) + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getServer(), ('127.0.0.1', self.storage_port)) + self.assertEquals(sn.getUUID(), new_uuid) + self.assertEquals(sn.getState(), RUNNING_STATE) + # Partition table changed + self.failUnless(self.app.lptid > ptid) + uuid = new_uuid + + # send message again for the same storage node but different address + # must be rejected as SN is considered as running + conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None}) + ptid = self.app.lptid + service.handleRequestNodeIdentification(conn, + packet=packet, + node_type=STORAGE_NODE_TYPE, + uuid=uuid, + ip_address='127.0.0.1', + port=10022, + name=self.app.name,) + self.checkCalledAbort(conn) + self.assertEquals(len(self.app.nm.getStorageNodeList()), 1) + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getServer(), ('127.0.0.1', self.storage_port)) + self.assertEquals(sn.getUUID(), uuid) + self.assertEquals(sn.getState(), RUNNING_STATE) + # No change of partition table + self.assertEquals(self.app.lptid, ptid) + + # same test, but set SN as not running before + # this new node must replaced the old one + conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None}) + ptid = self.app.lptid + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getState(), RUNNING_STATE) + sn.setState(TEMPORARILY_DOWN_STATE) + self.assertEquals(sn.getState(), TEMPORARILY_DOWN_STATE) + service.handleRequestNodeIdentification(conn, + packet=packet, + node_type=STORAGE_NODE_TYPE, + uuid=uuid, + ip_address='127.0.0.1', + port=10022, + name=self.app.name,) + self.checkCalledAcceptNodeIdentification(conn) + self.assertEquals(len(self.app.nm.getStorageNodeList()), 1) + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getServer(), ('127.0.0.1', 10022)) + self.assertEquals(sn.getUUID(), new_uuid) + self.assertEquals(sn.getState(), RUNNING_STATE) + # Partition table changed + self.failUnless(self.app.lptid > ptid) + + # mark the node as broken and request identification, this must be forbidden + conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None}) + ptid = self.app.lptid + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getState(), RUNNING_STATE) + sn.setState(BROKEN_STATE) + self.assertEquals(sn.getState(), BROKEN_STATE) + + service.handleRequestNodeIdentification(conn, + packet=packet, + node_type=STORAGE_NODE_TYPE, + uuid=uuid, + ip_address='127.0.0.1', + port=10022, + name=self.app.name,) + self.checkCalledAbort(conn) + self.assertEquals(len(self.app.nm.getStorageNodeList()), 1) + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getServer(), ('127.0.0.1', 10022)) + self.assertEquals(sn.getUUID(), new_uuid) + self.assertEquals(sn.getState(), BROKEN_STATE) + # No change of partition table + self.assertEqual(self.app.lptid, ptid) + + + def test_02_handleAskPrimaryMaster(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=2) + # test answer to a storage node + conn = Mock({"addPacket" : None, + "answerPrimaryMaster" : None, + "notifyNodeInformation" : None, + "sendPartitionTable" : None, + "getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + service.handleAskPrimaryMaster(conn, packet) + self.assertEquals(len(conn.mockGetNamedCalls("addPacket")), 5) + self.assertEquals(len(conn.mockGetNamedCalls("abort")), 0) + self.checkCalledAnswerPrimaryMaster(conn, 0) + self.checkCalledNotifyNodeInformation(conn, 1) + self.checkCalledSendPartitionTable(conn, 2) + self.checkCalledSendPartitionTable(conn, 3) + self.checkCalledStartOperation(conn, 4) + + # Same but identify as a client node, must not get start operation message + uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=11021) + packet = Packet(msg_id=2) + conn = Mock({"addPacket" : None, "abort" : None, "answerPrimaryMaster" : None, + "notifyNodeInformation" : None, "sendPartitionTable" : None, + "getUUID" : uuid, "getAddress" : ("127.0.0.1", 11021)}) + service.handleAskPrimaryMaster(conn, packet) + self.assertEquals(len(conn.mockGetNamedCalls("addPacket")), 4) + self.assertEquals(len(conn.mockGetNamedCalls("abort")), 0) + self.checkCalledAnswerPrimaryMaster(conn, 0) + self.checkCalledNotifyNodeInformation(conn, 1) + self.checkCalledSendPartitionTable(conn, 2) + self.checkCalledSendPartitionTable(conn, 3) + + def test_03_handleAnnouncePrimaryMaster(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=3, msg_type=ANNOUNCE_PRIMARY_MASTER) + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + # must do a relection + self.assertRaises(ElectionFailure, service.handleAnnouncePrimaryMaster, conn, packet) + # if no uuid in conn, no reelection done + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + service.handleAnnouncePrimaryMaster(conn, packet) + self.checkCalledAbort(conn) + + + def test_04_handleReelectPrimaryMaster(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=4, msg_type=REELECT_PRIMARY_MASTER) + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + # must do a relection + self.assertRaises(ElectionFailure, service.handleReelectPrimaryMaster, conn, packet) + + + def test_05_handleNotifyNodeInformation(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=5, msg_type=NOTIFY_NODE_INFORMATION) + # do not answer if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + service.handleNotifyNodeInformation(conn, packet, node_list) + self.checkCalledAbort(conn) + # tell the master node that is not running any longer, it must raises + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [(MASTER_NODE_TYPE, '127.0.0.1', self.master_port, self.app.uuid, DOWN_STATE),] + self.assertRaises(RuntimeError, service.handleNotifyNodeInformation, conn, packet, node_list) + # tell the master node that it's running, nothing change + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [(MASTER_NODE_TYPE, '127.0.0.1', self.master_port, self.app.uuid, RUNNING_STATE),] + service.handleNotifyNodeInformation(conn, packet, node_list) + for call in conn.mockGetAllCalls(): + self.assertEquals(call.getName(), "getUUID") + # notify about a client node, don't care + new_uuid = self.getNewUUID() + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [(CLIENT_NODE_TYPE, '127.0.0.1', self.client_port, new_uuid, BROKEN_STATE),] + service.handleNotifyNodeInformation(conn, packet, node_list) + for call in conn.mockGetAllCalls(): + self.assertEquals(call.getName(), "getUUID") + # notify about an unknown node, don't care + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [(STORAGE_NODE_TYPE, '127.0.0.1', 11010, new_uuid, BROKEN_STATE),] + service.handleNotifyNodeInformation(conn, packet, node_list) + for call in conn.mockGetAllCalls(): + self.assertEquals(call.getName(), "getUUID") + # notify about a known node but with bad address, don't care + self.app.nm.add(StorageNode(("127.0.0.1", 11011), self.getNewUUID())) + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [(STORAGE_NODE_TYPE, '127.0.0.1', 11012, uuid, BROKEN_STATE),] + service.handleNotifyNodeInformation(conn, packet, node_list) + for call in conn.mockGetAllCalls(): + self.assertEquals(call.getName(), "getUUID") + # notify node is running, as PMN already know it, nothing is done + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [(STORAGE_NODE_TYPE, '127.0.0.1', self.storage_port, uuid, RUNNING_STATE),] + service.handleNotifyNodeInformation(conn, packet, node_list) + for call in conn.mockGetAllCalls(): + self.assertEquals(call.getName(), "getUUID") + # notify node is temp down, must be taken into account + ptid = self.app.lptid + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [(STORAGE_NODE_TYPE, '127.0.0.1', self.storage_port, uuid, TEMPORARILY_DOWN_STATE),] + service.handleNotifyNodeInformation(conn, packet, node_list) + for call in conn.mockGetAllCalls(): + self.assertEquals(call.getName(), "getUUID") + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getState(), TEMPORARILY_DOWN_STATE) + self.assertEquals(ptid, self.app.lptid) + # notify node is broken, must be taken into account and partition must changed + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [(STORAGE_NODE_TYPE, '127.0.0.1', self.storage_port, uuid, BROKEN_STATE),] + service.handleNotifyNodeInformation(conn, packet, node_list) + for call in conn.mockGetAllCalls(): + self.assertEquals(call.getName(), "getUUID") + sn = self.app.nm.getStorageNodeList()[0] + self.assertEquals(sn.getState(), BROKEN_STATE) + self.failUnless(ptid < self.app.lptid) + + def test_06_handleAnswerLastIDs(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=5, msg_type=ANSWER_LAST_IDS) + loid = self.app.loid + ltid = self.app.ltid + lptid = self.app.lptid + # do not answer if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + service.handleAnswerLastIDs(conn, packet, None, None, None) + self.checkCalledAbort(conn) + self.assertEquals(loid, self.app.loid) + self.assertEquals(ltid, self.app.ltid) + self.assertEquals(lptid, self.app.lptid) + # do not care if client node call it + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + node_list = [] + service.handleAnswerLastIDs(conn, packet, None, None, None) + self.checkCalledAbort(conn) + self.assertEquals(loid, self.app.loid) + self.assertEquals(ltid, self.app.ltid) + self.assertEquals(lptid, self.app.lptid) + # send information which are later to what PMN knows, this must raise + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + new_ptid = unpack('!Q', lptid)[0] + new_ptid = pack('!Q', new_ptid + 1) + self.failUnless(new_ptid > self.app.lptid) + self.assertRaises(OperationFailure, service.handleAnswerLastIDs, conn, packet, None, None, new_ptid) + self.assertEquals(loid, self.app.loid) + self.assertEquals(ltid, self.app.ltid) + self.assertEquals(lptid, self.app.lptid) + + + def test_07_handleAskNewTID(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=5, msg_type=ASK_NEW_TID) + ltid = self.app.ltid + # do not answer if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + service.handleAskNewTID(conn, packet) + self.checkCalledAbort(conn) + self.assertEquals(ltid, self.app.ltid) + # do not care if storage node call it + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + self.assertRaises(OperationFailure, service.handleAskNewTID, conn, packet) + self.checkCalledAbort(conn) + self.assertEquals(ltid, self.app.ltid) + # client call it + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + service.handleAskNewTID(conn, packet) + self.failUnless(ltid < self.app.ltid) + self.assertEquals(len(self.app.finishing_transaction_dict), 1) + tid = self.app.finishing_transaction_dict.keys()[0] + self.assertEquals(tid, self.app.ltid) + + + def test_08_handleAskNewOIDs(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=5, msg_type=ASK_NEW_OIDS) + loid = self.app.loid + # do not answer if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + service.handleAskNewOIDs(conn, packet, 1) + self.checkCalledAbort(conn) + self.assertEquals(loid, self.app.loid) + # do not care if storage node call it + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + self.assertRaises(OperationFailure, service.handleAskNewOIDs, conn, packet, 1) + self.checkCalledAbort(conn) + self.assertEquals(loid, self.app.loid) + # client call it + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + service.handleAskNewOIDs(conn, packet, 1) + self.failUnless(loid < self.app.loid) + + def test_09_handleFinishTransaction(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=9, msg_type=FINISH_TRANSACTION) + # do not answer if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + service.handleFinishTransaction(conn, packet, [], None, ) + self.checkCalledAbort(conn) + # do not care if storage node call it + storage_conn = conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port), + "getSockect" : uuid}) + node_list = [] + self.assertRaises(OperationFailure, service.handleFinishTransaction, conn, packet, [], None) + self.checkCalledAbort(conn) + # give an older tid than the PMN known, must abort + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + oid_list = [] + upper, lower = unpack('!LL', self.app.ltid) + new_tid = pack('!LL', upper, lower + 10) + service.handleFinishTransaction(conn, packet, oid_list, new_tid) + self.checkCalledAbort(conn) + # do the right job + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) + storage_uuid = self.identifyToMasterNode() + storage_conn = Mock({"getUUID" : storage_uuid, + "getAddress" : ("127.0.0.1", self.storage_port), + "getSockect" : 1, + "getDescriptor" : 1}) + self.app.em.register(storage_conn) + self.assertNotEquals(uuid, client_uuid) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + service.handleAskNewTID(conn, packet) + oid_list = [] + tid = self.app.ltid + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + service.handleFinishTransaction(conn, packet, oid_list, tid) + self.checkCalledLockInformation(storage_conn) + self.assertEquals(len(storage_conn.mockGetNamedCalls("expectMessage")), 1) + self.assertEquals(len(self.app.finishing_transaction_dict), 1) + apptid = self.app.finishing_transaction_dict.keys()[0] + self.assertEquals(tid, apptid) + txn = self.app.finishing_transaction_dict.values()[0] + self.assertEquals(len(txn.getOIDList()), 0) + self.assertEquals(len(txn.getUUIDSet()), 1) + self.assertEquals(txn.getMessageId(), 9) + + def test_10_handleNotifyInformationLocked(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=10, msg_type=NOTIFY_INFORMATION_LOCKED) + # do not answer if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + service.handleNotifyInformationLocked(conn, packet, None, ) + self.checkCalledAbort(conn) + # do not care if client node call it + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=11021) + client = conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", 11021), + "getSockect" : client_uuid}) + service.handleNotifyInformationLocked(conn, packet, None) + self.checkCalledAbort(conn) + # give an older tid than the PMN known, must abort + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + oid_list = [] + upper, lower = unpack('!LL', self.app.ltid) + new_tid = pack('!LL', upper, lower + 10) + self.assertRaises(OperationFailure, service.handleNotifyInformationLocked, conn, packet, new_tid) + self.checkCalledAbort(conn) + + # do the right job + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) + storage_uuid_1 = self.identifyToMasterNode() + storage_uuid_2 = self.identifyToMasterNode(port=10022) + storage_conn_1 = Mock({"getUUID" : storage_uuid_1, + "getAddress" : ("127.0.0.1", self.storage_port), + "getSockect" : 1, + "getDescriptor" : 1}) + self.app.em.register(storage_conn_1) + storage_conn_2 = Mock({"getUUID" : storage_uuid_2, + "getAddress" : ("127.0.0.1", 10022), + "getSockect" : 2, + "getDescriptor" : 2}) + self.app.em.register(storage_conn_2) + + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port), + "getSockect" : 3, + "getDescriptor" : 3}) + self.app.em.register(conn) + service.handleAskNewTID(conn, packet) + oid_list = [] + tid = self.app.ltid + service.handleFinishTransaction(conn, packet, oid_list, tid) + self.checkCalledLockInformation(storage_conn_1) + self.checkCalledLockInformation(storage_conn_2) + self.assertFalse(self.app.finishing_transaction_dict.values()[0].allLocked()) + self.assertEquals(len(storage_conn_1.mockGetNamedCalls("addPacket")), 1) + service.handleNotifyInformationLocked(storage_conn_1, packet, tid) + self.assertEquals(len(storage_conn_1.mockGetNamedCalls("addPacket")), 1) + self.assertFalse(self.app.finishing_transaction_dict.values()[0].allLocked()) + service.handleNotifyInformationLocked(storage_conn_2, packet, tid) + self.assertEquals(len(storage_conn_1.mockGetNamedCalls("addPacket")), 2) + self.assertEquals(len(storage_conn_2.mockGetNamedCalls("addPacket")), 2) + self.checkCalledLockInformation(storage_conn_1) + self.checkCalledLockInformation(storage_conn_2) + self.checkCalledNotifyTransactionFinished(conn, 1) + + + def test_11_handleAbortTransaction(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=11, msg_type=ABORT_TRANSACTION) + # do not answer if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + service.handleAbortTransaction(conn, packet, None, ) + self.checkCalledAbort(conn) + # do not answer if not a client + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + node_list = [] + self.assertRaises(OperationFailure, service.handleAbortTransaction, conn, packet, None) + self.checkCalledAbort(conn) + # give a bad tid, must not failed, just ignored it + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + + self.assertEqual(len(self.app.finishing_transaction_dict.keys()), 0) + service.handleAbortTransaction(conn, packet, None) + self.assertEqual(len(self.app.finishing_transaction_dict.keys()), 0) + # give a known tid + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + tid = self.app.ltid + self.app.finishing_transaction_dict[tid] = None + self.assertEqual(len(self.app.finishing_transaction_dict.keys()), 1) + service.handleAbortTransaction(conn, packet, tid) + self.assertEqual(len(self.app.finishing_transaction_dict.keys()), 0) + + + def test_12_handleAskLastIDs(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=11, msg_type=ASK_LAST_IDS) + # do not answer if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + service.handleAskLastIDs(conn, packet ) + self.checkCalledAbort(conn) + # give a uuid + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + ptid = self.app.lptid + tid = self.app.ltid + oid = self.app.loid + service.handleAskLastIDs(conn, packet) + loid, ltid, lptid = self.checkCalledAnswerLastIDs(conn) + self.assertEqual(loid, oid) + self.assertEqual(ltid, tid) + self.assertEqual(lptid, ptid) + + + def test_13_handleAskUnfinishedTransactions(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=11, msg_type=ASK_UNFINISHED_TRANSACTIONS) + # do not answer if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + service.handleAskUnfinishedTransactions(conn, packet) + self.checkCalledAbort(conn) + # give a uuid + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + service.handleAskUnfinishedTransactions(conn, packet) + tid_list = self.checkCalledAnswerUnfinishedTransactions(conn)[0] + self.assertEqual(len(tid_list), 0) + # create some transaction + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, + port=self.client_port) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + service.handleAskNewTID(conn, packet) + service.handleAskNewTID(conn, packet) + service.handleAskNewTID(conn, packet) + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + service.handleAskUnfinishedTransactions(conn, packet) + tid_list = self.checkCalledAnswerUnfinishedTransactions(conn)[0] + self.assertEqual(len(tid_list), 3) + + + def test_14_handleNotifyPartitionChanges(self): + service = self.service + uuid = self.identifyToMasterNode() + packet = Packet(msg_id=11, msg_type=NOTIFY_PARTITION_CHANGES) + # do not answer if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + service.handleNotifyPartitionChanges(conn, packet, None, None) + self.checkCalledAbort(conn) + # do not answer if not a storage node + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, + port=self.client_port) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + service.handleNotifyPartitionChanges(conn, packet, None, None) + + # send a bad state, must not be take into account + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + storage_uuid = self.identifyToMasterNode(port=self.storage_port+1) + offset = 1 + cell_list = [(offset, uuid, FEEDING_STATE),] + cells = self.app.pt.getRow(offset) + for cell, state in cells: + self.assertEquals(state, OUT_OF_DATE_STATE) + service.handleNotifyPartitionChanges(conn, packet, self.app.lptid, cell_list) + cells = self.app.pt.getRow(offset) + for cell, state in cells: + self.assertEquals(state, OUT_OF_DATE_STATE) + + # send for another node, must not be take into account + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + offset = 1 + cell_list = [(offset, storage_uuid, UP_TO_DATE_STATE),] + cells = self.app.pt.getRow(offset) + for cell, state in cells: + self.assertEquals(state, OUT_OF_DATE_STATE) + service.handleNotifyPartitionChanges(conn, packet, self.app.lptid, cell_list) + cells = self.app.pt.getRow(offset) + for cell, state in cells: + self.assertEquals(state, OUT_OF_DATE_STATE) + + # send for itself, must be taken into account + # and the feeding node must be removed + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + cell_list = [(offset, uuid, UP_TO_DATE_STATE),] + cells = self.app.pt.getRow(offset) + for cell, state in cells: + self.assertEquals(state, OUT_OF_DATE_STATE) + # mark the second storage node as feeding and say we are up to date + # second node must go to discarded state and first one to up to date state + self.app.pt.setCell(offset, self.app.nm.getNodeByUUID(storage_uuid), FEEDING_STATE) + cell_list = [(offset, uuid, UP_TO_DATE_STATE),] + cells = self.app.pt.getRow(offset) + for cell, state in cells: + if cell == storage_uuid: + self.assertEquals(state, FEEDING_STATE) + else: + self.assertEquals(state, OUT_OF_DATE_STATE) + lptid = self.app.lptid + service.handleNotifyPartitionChanges(conn, packet, self.app.lptid, cell_list) + self.failUnless(lptid < self.app.lptid) + cells = self.app.pt.getRow(offset) + for cell, state in cells: + if cell == uuid: + self.assertEquals(state, UP_TO_DATE_STATE) + else: + self.assertEquals(state, DISCARDED_STATE) + + + def test_15_peerBroken(self): + service = self.service + uuid = self.identifyToMasterNode() + # do nothing if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), RUNNING_STATE) + service.peerBroken(conn) + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), RUNNING_STATE) + # add a second storage node and then declare it as broken + self.identifyToMasterNode(port = self.storage_port+2) + storage_uuid = self.identifyToMasterNode(port = self.storage_port+1) + # filled the pt + self.app.pt.make(self.app.nm.getStorageNodeList()) + self.assertTrue(self.app.pt.filled()) + self.assertTrue(self.app.pt.operational()) + conn = Mock({"getUUID" : storage_uuid, + "getAddress" : ("127.0.0.1", self.storage_port+1)}) + lptid = self.app.lptid + self.assertEquals(self.app.nm.getNodeByUUID(storage_uuid).getState(), RUNNING_STATE) + service.peerBroken(conn) + self.assertEquals(self.app.nm.getNodeByUUID(storage_uuid).getState(), BROKEN_STATE) + self.failUnless(lptid < self.app.lptid) + # give an uuid, must raise as no other storage node available + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + lptid = self.app.lptid + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), RUNNING_STATE) + self.assertRaises(OperationFailure, service.peerBroken, conn) + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), BROKEN_STATE) + self.failUnless(lptid < self.app.lptid) + # give a client uuid which have unfinished transactions + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, + port = self.client_port) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + lptid = self.app.lptid + packet = Packet(msg_id=15, msg_type=ASK_NEW_TID) + service.handleAskNewTID(conn, packet) + service.handleAskNewTID(conn, packet) + service.handleAskNewTID(conn, packet) + self.assertEquals(self.app.nm.getNodeByUUID(client_uuid).getState(), RUNNING_STATE) + self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 3) + service.peerBroken(conn) + # node must be have been remove, and no more transaction must remains + self.assertEquals(self.app.nm.getNodeByUUID(client_uuid), None) + self.assertEquals(lptid, self.app.lptid) + self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 0) + + + def test_16_timeoutExpired(self): + service = self.service + uuid = self.identifyToMasterNode() + # do nothing if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), RUNNING_STATE) + service.timeoutExpired(conn) + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), RUNNING_STATE) + # add a second storage node and then declare it as temp down + self.identifyToMasterNode(port = self.storage_port+2) + storage_uuid = self.identifyToMasterNode(port = self.storage_port+1) + # filled the pt + self.app.pt.make(self.app.nm.getStorageNodeList()) + self.assertTrue(self.app.pt.filled()) + self.assertTrue(self.app.pt.operational()) + conn = Mock({"getUUID" : storage_uuid, + "getAddress" : ("127.0.0.1", self.storage_port+1)}) + lptid = self.app.lptid + self.assertEquals(self.app.nm.getNodeByUUID(storage_uuid).getState(), RUNNING_STATE) + service.timeoutExpired(conn) + self.assertEquals(self.app.nm.getNodeByUUID(storage_uuid).getState(), TEMPORARILY_DOWN_STATE) + self.assertEquals(lptid, self.app.lptid) + # give an uuid, must raise as no other storage node available + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + lptid = self.app.lptid + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), RUNNING_STATE) + self.assertRaises(OperationFailure, service.timeoutExpired, conn) + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), TEMPORARILY_DOWN_STATE) + self.assertEquals(lptid, self.app.lptid) + # give a client uuid which have unfinished transactions + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, + port = self.client_port) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + lptid = self.app.lptid + packet = Packet(msg_id=15, msg_type=ASK_NEW_TID) + service.handleAskNewTID(conn, packet) + service.handleAskNewTID(conn, packet) + service.handleAskNewTID(conn, packet) + self.assertEquals(self.app.nm.getNodeByUUID(client_uuid).getState(), RUNNING_STATE) + self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 3) + service.timeoutExpired(conn) + # node must be have been remove, and no more transaction must remains + self.assertEquals(self.app.nm.getNodeByUUID(client_uuid), None) + self.assertEquals(lptid, self.app.lptid) + self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 0) + + + def test_17_connectionClosed(self): + service = self.service + uuid = self.identifyToMasterNode() + # do nothing if no uuid + conn = Mock({"getUUID" : None, + "getAddress" : ("127.0.0.1", self.storage_port)}) + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), RUNNING_STATE) + service.connectionClosed(conn) + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), RUNNING_STATE) + # add a second storage node and then declare it as temp down + self.identifyToMasterNode(port = self.storage_port+2) + storage_uuid = self.identifyToMasterNode(port = self.storage_port+1) + # filled the pt + self.app.pt.make(self.app.nm.getStorageNodeList()) + self.assertTrue(self.app.pt.filled()) + self.assertTrue(self.app.pt.operational()) + conn = Mock({"getUUID" : storage_uuid, + "getAddress" : ("127.0.0.1", self.storage_port+1)}) + lptid = self.app.lptid + self.assertEquals(self.app.nm.getNodeByUUID(storage_uuid).getState(), RUNNING_STATE) + service.connectionClosed(conn) + self.assertEquals(self.app.nm.getNodeByUUID(storage_uuid).getState(), TEMPORARILY_DOWN_STATE) + self.assertEquals(lptid, self.app.lptid) + # give an uuid, must raise as no other storage node available + conn = Mock({"getUUID" : uuid, + "getAddress" : ("127.0.0.1", self.storage_port)}) + lptid = self.app.lptid + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), RUNNING_STATE) + self.assertRaises(OperationFailure, service.connectionClosed, conn) + self.assertEquals(self.app.nm.getNodeByUUID(uuid).getState(), TEMPORARILY_DOWN_STATE) + self.assertEquals(lptid, self.app.lptid) + # give a client uuid which have unfinished transactions + client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, + port = self.client_port) + conn = Mock({"getUUID" : client_uuid, + "getAddress" : ("127.0.0.1", self.client_port)}) + lptid = self.app.lptid + packet = Packet(msg_id=15, msg_type=ASK_NEW_TID) + service.handleAskNewTID(conn, packet) + service.handleAskNewTID(conn, packet) + service.handleAskNewTID(conn, packet) + self.assertEquals(self.app.nm.getNodeByUUID(client_uuid).getState(), RUNNING_STATE) + self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 3) + service.connectionClosed(conn) + # node must be have been remove, and no more transaction must remains + self.assertEquals(self.app.nm.getNodeByUUID(client_uuid), None) + self.assertEquals(lptid, self.app.lptid) + self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 0) + + + + +if __name__ == '__main__': + unittest.main() +