Commit 5a55c98d authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add more checks on storage/operation.py :

* that nothing is put in queue when should not
* that there is no change with a 'pass' of 'return'
* that UUIDs matches with Identification packets
And some tests are grouped


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@379 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent e5669276
......@@ -20,6 +20,7 @@ import unittest
import logging
import MySQLdb
from tempfile import mkstemp
from struct import pack, unpack
from mock import Mock
from collections import deque
from neo.master.app import MasterNode
......@@ -316,6 +317,7 @@ server: 127.0.0.1:10020
def test_09_handleRequestNodeIdentification1(self):
# reject client connection
count = len(self.app.nm.getNodeList())
self.checkHandleUnexpectedPacket(
_call=self.operation.handleRequestNodeIdentification,
_listening=False,
......@@ -325,6 +327,7 @@ server: 127.0.0.1:10020
ip_address='127.0.0.1',
port=self.master_port,
name=self.app.name)
self.assertEquals(len(self.app.nm.getNodeList()), count)
def test_09_handleRequestNodeIdentification2(self):
# bad app name
......@@ -335,6 +338,7 @@ server: 127.0.0.1:10020
"isListeningConnection": True,
"getAddress" : ("127.0.0.1", self.master_port),
})
count = len(self.app.nm.getNodeList())
self.operation.handleRequestNodeIdentification(
conn=conn,
packet=packet,
......@@ -344,6 +348,7 @@ server: 127.0.0.1:10020
port=self.master_port,
name='INVALID_NAME')
self.checkCalledAbort(conn)
self.assertEquals(len(self.app.nm.getNodeList()), count)
def test_09_handleRequestNodeIdentification3(self):
# broken node
......@@ -355,6 +360,7 @@ server: 127.0.0.1:10020
"isListeningConnection": True,
"getAddress" : ("127.0.0.1", self.master_port),
})
count = len(self.app.nm.getNodeList())
self.operation.handleRequestNodeIdentification(
conn=conn,
packet=packet,
......@@ -364,6 +370,7 @@ server: 127.0.0.1:10020
port=self.master_port,
name=self.app.name)
self.checkPacket(conn, packet_type=ERROR)
self.assertEquals(len(self.app.nm.getNodeList()), count)
def test_09_handleRequestNodeIdentification4(self):
# new non-master, rejected
......@@ -372,6 +379,7 @@ server: 127.0.0.1:10020
"isListeningConnection": True,
"getAddress" : ("127.0.0.1", self.master_port),
})
count = len(self.app.nm.getNodeList())
self.operation.handleRequestNodeIdentification(
conn=conn,
uuid=self.getNewUUID(),
......@@ -381,6 +389,7 @@ server: 127.0.0.1:10020
ip_address='192.168.1.1',
name=self.app.name,)
self.checkCalledAbort(conn)
self.assertEquals(len(self.app.nm.getNodeList()), count)
def test_09_handleRequestNodeIdentification5(self):
# new master, accepted
......@@ -401,6 +410,9 @@ server: 127.0.0.1:10020
name=self.app.name,)
# node added
self.assertEquals(len(self.app.nm.getNodeList()), count + 1)
n = self.app.nm.getNodeByServer(('192.168.1.1', self.master_port))
self.assertNotEquals(n, None)
self.assertEquals(n.getUUID(), uuid)
self.checkPacket(conn, packet_type=ACCEPT_NODE_IDENTIFICATION)
# uuid
self.assertEquals(len(conn.mockGetNamedCalls("setUUID")), 1)
......@@ -416,10 +428,13 @@ server: 127.0.0.1:10020
"isListeningConnection": True,
"getAddress" : ("127.0.0.1", self.master_port),
})
mn = self.app.nm.getNodeByServer(('127.0.0.1', self.master_port))
uuid = self.getNewUUID()
mn.setUUID(uuid)
count = len(self.app.nm.getNodeList())
self.operation.handleRequestNodeIdentification(
conn=conn,
uuid=self.master_uuid,
uuid=self.uuid,
packet=packet,
port=self.master_port,
node_type=STORAGE_NODE_TYPE,
......@@ -431,7 +446,8 @@ server: 127.0.0.1:10020
# uuid
self.assertEquals(len(conn.mockGetNamedCalls("setUUID")), 1)
call = conn.mockGetNamedCalls("setUUID")[0]
self.assertEquals(call.getParam(0), self.master_uuid)
self.assertEquals(call.getParam(0), self.uuid)
self.assertEquals(mn.getUUID(), self.uuid)
# abort
self.assertEquals(len(conn.mockGetNamedCalls('abort')), 0)
......@@ -508,80 +524,77 @@ server: 127.0.0.1:10020
ptid=0,
cell_list=()
)
def test_14_handleNotifyPartitionChanges2(self):
# old partition change
# old partition change -> do nothing
app = self.app
conn = Mock({
"isListeningConnection": False,
"getAddress" : ("127.0.0.1", self.master_port),
})
app.replicator = Mock({})
packet = Packet(msg_id=1, msg_type=NOTIFY_PARTITION_CHANGES)
self.app.ptid = 1
count = len(self.app.nm.getNodeList())
self.operation.handleNotifyPartitionChanges(conn, packet, 0, ())
self.assertEquals(self.app.ptid, 1)
def test_14_handleNotifyPartitionChanges3(self):
# new node
conn = Mock({
"isListeningConnection": False,
"getAddress" : ("127.0.0.1", self.master_port),
})
packet = Packet(msg_id=1, msg_type=NOTIFY_PARTITION_CHANGES)
cell = (0, self.getNewUUID(), UP_TO_DATE_STATE)
count = len(self.app.nm.getNodeList())
self.app.pt = PartitionTable(1, 1)
self.app.dm = Mock({ })
ptid, self.ptid = self.getTwoIDs()
# pt updated
self.operation.handleNotifyPartitionChanges(conn, packet, ptid, (cell, ))
self.assertEquals(len(self.app.nm.getNodeList()), count + 1)
# check db update
calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
self.assertEquals(len(calls), 1)
self.assertEquals(calls[0].getParam(0), ptid)
self.assertEquals(calls[0].getParam(1), (cell, ))
def test_14_handleNotifyPartitionChanges4(self):
# check with DISCARDED_STATE
conn = Mock({
"isListeningConnection": False,
"getAddress" : ("127.0.0.1", self.master_port),
})
packet = Packet(msg_id=1, msg_type=NOTIFY_PARTITION_CHANGES)
self.app.uuid = self.getNewUUID()
self.app.pt = PartitionTable(1, 1)
self.app.dm = Mock({ })
self.app.replicator = Mock({})
ptid, self.ptids = self.getTwoIDs()
cell = (0, self.app.uuid, DISCARDED_STATE)
self.app.partition_list = [object()]
self.operation.handleNotifyPartitionChanges(conn, packet, ptid, (cell,))
self.assertEquals(len(self.app.nm.getNodeList()), count)
calls = self.app.replicator.mockGetNamedCalls('removePartition')
self.assertEquals(len(calls), 1)
self.assertEquals(calls[0].getParam(0), 0)
self.assertEquals(len(calls), 0)
calls = self.app.replicator.mockGetNamedCalls('addPartition')
self.assertEquals(len(calls), 0)
def test_14_handleNotifyPartitionChanges4(self):
# check with OUT_OF_DATE_STATE
def test_14_handleNotifyPartitionChanges2(self):
# cases :
uuid = self.getNewUUID()
cells = (
(0, uuid, UP_TO_DATE_STATE),
(1, self.app.uuid, DISCARDED_STATE),
(2, self.app.uuid, OUT_OF_DATE_STATE),
)
# context
conn = Mock({
"isListeningConnection": False,
"getAddress" : ("127.0.0.1", self.master_port),
})
packet = Packet(msg_id=1, msg_type=NOTIFY_PARTITION_CHANGES)
self.app.uuid = self.getNewUUID()
self.app.pt = PartitionTable(1, 1)
self.app.dm = Mock({ })
self.app.replicator = Mock({})
ptid, self.ptid = self.getTwoIDs()
cell = (0, self.app.uuid, OUT_OF_DATE_STATE)
self.app.partition_list = [object()]
self.operation.handleNotifyPartitionChanges(conn, packet, ptid, (cell,))
calls = self.app.replicator.mockGetNamedCalls('addPartition')
self.assertEquals(len(calls), 1)
app = self.app
ptid1, ptid2 = self.getTwoIDs()
self.assertNotEquals(ptid1, ptid2)
app.ptid = ptid1
app.pt = PartitionTable(3, 1)
app.pt = Mock({ })
app.dm = Mock({ })
app.replicator = Mock({})
count = len(app.nm.getNodeList())
self.operation.handleNotifyPartitionChanges(conn, packet, ptid2, cells)
# ptid set
self.assertEquals(app.ptid, ptid2)
# two nodes added
self.assertEquals(len(app.nm.getNodeList()), count + 2)
# uuid != app.uuid -> TEMPORARILY_DOWN_STATE
self.assertEquals(app.nm.getNodeByUUID(uuid).getState(), TEMPORARILY_DOWN_STATE)
# pt calls
calls = self.app.pt.mockGetNamedCalls('setCell')
self.assertEquals(len(calls), 3)
self.assertEquals(calls[0].getParam(0), 0)
self.assertEquals(calls[1].getParam(0), 1)
self.assertEquals(calls[2].getParam(0), 2)
self.assertEquals(calls[0].getParam(1), app.nm.getNodeByUUID(uuid))
self.assertEquals(calls[1].getParam(1), app.nm.getNodeByUUID(app.uuid))
self.assertEquals(calls[2].getParam(1), app.nm.getNodeByUUID(app.uuid))
self.assertEquals(calls[0].getParam(2), UP_TO_DATE_STATE)
self.assertEquals(calls[1].getParam(2), DISCARDED_STATE)
self.assertEquals(calls[2].getParam(2), OUT_OF_DATE_STATE)
# replicator calls
calls = self.app.replicator.mockGetNamedCalls('removePartition')
self.assertEquals(len(calls), 0)
self.assertEquals(len(calls), 1)
self.assertEquals(calls[0].getParam(0), 1)
calls = self.app.replicator.mockGetNamedCalls('addPartition')
# dm call
calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
self.assertEquals(len(calls), 1)
self.assertEquals(calls[0].getParam(0), ptid2)
self.assertEquals(calls[0].getParam(1), cells)
self.assertEquals(len(calls), 1)
def test_15_handleStartOperation(self):
# unexpected packet
......@@ -653,12 +666,14 @@ server: 127.0.0.1:10020
def test_22_handleLockInformation1(self):
# reject server connection
self.app.dm = Mock()
self.checkHandleUnexpectedPacket(
_call=self.operation.handleLockInformation,
_msg_type=LOCK_INFORMATION,
_listening=True,
tid=INVALID_TID,
)
self.assertEquals(len(self.app.dm.mockGetNamedCalls('storeTransaction')), 0)
def test_22_handleLockInformation2(self):
# load transaction informations
......@@ -672,15 +687,22 @@ server: 127.0.0.1:10020
calls = self.app.dm.mockGetNamedCalls('storeTransaction')
self.assertEquals(len(calls), 1)
self.checkPacket(conn, packet_type=NOTIFY_INFORMATION_LOCKED)
# transaction not in transaction_dict -> KeyError
transaction = Mock({ 'getObjectList': ((0, ), ), })
conn = Mock({ 'isListeningConnection': False, })
self.operation.handleLockInformation(conn, packet, '\x01' * 8)
self.checkPacket(conn, packet_type=NOTIFY_INFORMATION_LOCKED)
def test_23_handleUnlockInformation1(self):
# reject server connection
self.app.dm = Mock()
self.checkHandleUnexpectedPacket(
_call=self.operation.handleUnlockInformation,
_msg_type=UNLOCK_INFORMATION,
_listening=True,
tid=INVALID_TID,
)
self.assertEquals(len(self.app.dm.mockGetNamedCalls('storeTransaction')), 0)
def test_23_handleUnlockInformation2(self):
# delete transaction informations
......@@ -698,10 +720,16 @@ server: 127.0.0.1:10020
calls = self.app.dm.mockGetNamedCalls('finishTransaction')
self.assertEquals(len(calls), 1)
self.assertEquals(calls[0].getParam(0), INVALID_TID)
# transaction not in transaction_dict -> KeyError
transaction = Mock({ 'getObjectList': ((0, ), ), })
conn = Mock({ 'isListeningConnection': False, })
self.operation.handleLockInformation(conn, packet, '\x01' * 8)
self.checkPacket(conn, packet_type=NOTIFY_INFORMATION_LOCKED)
def test_24_handleAskObject1(self):
# delayed response
conn = Mock({})
self.app.dm = Mock()
packet = Packet(msg_id=1, msg_type=ASK_OBJECT)
self.app.load_lock_dict[INVALID_OID] = object()
self.assertEquals(len(self.app.event_queue), 0)
......@@ -710,40 +738,52 @@ server: 127.0.0.1:10020
serial=INVALID_SERIAL,
tid=INVALID_TID)
self.assertEquals(len(self.app.event_queue), 1)
self.assertEquals(len(conn.mockGetNamedCalls('addPacket')), 0)
self.assertEquals(len(self.app.dm.mockGetNamedCalls('getObject')), 0)
def test_24_handleAskObject1(self):
def test_24_handleAskObject2(self):
# invalid serial / tid / packet not found
self.app.dm = Mock({'getObject': None})
conn = Mock({})
packet = Packet(msg_id=1, msg_type=ASK_OBJECT)
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
oid=INVALID_OID,
serial=INVALID_SERIAL,
tid=INVALID_TID)
calls = self.app.dm.mockGetNamedCalls('getObject')
self.assertEquals(len(self.app.event_queue), 0)
self.assertEquals(len(calls), 1)
self.assertEquals(calls[0].getParam(0), INVALID_OID)
self.assertEquals(calls[0].getParam(1), None)
self.assertEquals(calls[0].getParam(2), None)
self.checkPacket(conn, packet_type=ERROR)
def test_24_handleAskObject2(self):
def test_24_handleAskObject3(self):
# object found => answer
self.app.dm = Mock({'getObject': ('', '', 0, 0, '', )})
conn = Mock({})
packet = Packet(msg_id=1, msg_type=ASK_OBJECT)
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
oid=INVALID_OID,
serial=INVALID_SERIAL,
tid=INVALID_TID)
self.assertEquals(len(self.app.event_queue), 0)
self.checkPacket(conn, packet_type=ANSWER_OBJECT)
def test_25_handleAskTIDs1(self):
# invalid offsets => error
app = self.app
app.pt = Mock()
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_id=1, msg_type=ASK_TIDS)
self.operation.handleAskTIDs(conn, packet, 1, 1, None)
self.checkPacket(conn, packet_type=ERROR)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
self.assertEquals(len(app.dm.mockGetNamedCalls('getTIDList')), 0)
def test_25_handleAskTIDs2(self):
# well case => answer
......@@ -780,10 +820,13 @@ server: 127.0.0.1:10020
def test_26_handleAskObjectHistory1(self):
# invalid offsets => error
app = self.app
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_id=1, msg_type=ASK_OBJECT_HISTORY)
self.operation.handleAskObjectHistory(conn, packet, 1, 1, None)
self.checkPacket(conn, packet_type=ERROR)
self.assertEquals(len(app.dm.mockGetNamedCalls('getObjectHistory')), 0)
def test_26_handleAskObjectHistory2(self):
# first case: empty history
......@@ -800,6 +843,7 @@ server: 127.0.0.1:10020
def test_27_handleAskStoreTransaction1(self):
# no uuid => abort
before = self.app.transaction_dict.items()[:]
self.checkHandleUnexpectedPacket(
_call=self.operation.handleAskStoreTransaction,
_msg_type=ASK_STORE_TRANSACTION,
......@@ -809,6 +853,8 @@ server: 127.0.0.1:10020
ext='',
oid_list=()
)
after = self.app.transaction_dict.items()
self.assertEquals(before, after)
def test_27_handleAskStoreTransaction2(self):
# add transaction entry
......@@ -824,26 +870,45 @@ server: 127.0.0.1:10020
def test_28_handleAskStoreObject1(self):
# no uuid => abort
app = self.app
oid = '\x01' * 8
app.dm = Mock()
l_before = len(app.event_queue)
t_before = self.app.transaction_dict.items()[:]
self.assertTrue(oid not in app.store_lock_dict)
self.checkHandleUnexpectedPacket(
_call=self.operation.handleAskStoreObject,
_msg_type=ASK_STORE_OBJECT,
oid=INVALID_OID,
oid=oid,
serial=INVALID_SERIAL,
compression=0,
checksum='',
data='',
tid=INVALID_TID
)
l_after = len(app.event_queue)
self.assertEquals(l_before, l_after)
self.assertEquals(len(app.dm.mockGetNamedCalls('getObjectHistory')), 0)
t_after = self.app.transaction_dict.items()
self.assertEquals(t_before, t_after)
self.assertTrue(oid not in app.store_lock_dict)
def test_28_handleAskStoreObject2(self):
# locked => delayed response
packet = Packet(msg_id=1, msg_type=ASK_STORE_OBJECT)
conn = Mock({'getUUID': self.app.uuid})
oid = '\x02' * 8
tid1, tid2 = self.getTwoIDs()
self.app.store_lock_dict[INVALID_OID] = tid1
self.operation.handleAskStoreObject(conn, packet, INVALID_OID,
self.app.store_lock_dict[oid] = tid1
self.assertTrue(oid in self.app.store_lock_dict)
t_before = self.app.transaction_dict.items()[:]
self.operation.handleAskStoreObject(conn, packet, oid,
INVALID_SERIAL, 0, 0, '', tid2)
self.assertEquals(len(self.app.event_queue), 1)
t_after = self.app.transaction_dict.items()[:]
self.assertEquals(t_before, t_after)
self.assertEquals(len(conn.mockGetNamedCalls('addPacket')), 0)
self.assertTrue(oid in self.app.store_lock_dict)
def test_28_handleAskStoreObject3(self):
# locked => unresolvable conflict => answer
......@@ -854,15 +919,24 @@ server: 127.0.0.1:10020
self.operation.handleAskStoreObject(conn, packet, INVALID_OID,
INVALID_SERIAL, 0, 0, '', tid1)
self.checkPacket(conn, packet_type=ANSWER_STORE_OBJECT)
self.assertEquals(self.app.store_lock_dict[INVALID_OID], tid2)
# conflicting
packet = conn.mockGetNamedCalls('addPacket')[0].getParam(0)
self.assertTrue(unpack('!B8s8s', packet._body)[0])
def test_28_handleAskStoreObject4(self):
# resolvable conflict => answer
packet = Packet(msg_id=1, msg_type=ASK_STORE_OBJECT)
conn = Mock({'getUUID': self.app.uuid})
self.app.dm = Mock({'getObjectHistory':((self.getNewUUID(), ), )})
self.assertEquals(self.app.store_lock_dict.get(INVALID_OID, None), None)
self.operation.handleAskStoreObject(conn, packet, INVALID_OID,
INVALID_SERIAL, 0, 0, '', INVALID_TID)
self.checkPacket(conn, packet_type=ANSWER_STORE_OBJECT)
self.assertEquals(self.app.store_lock_dict.get(INVALID_OID, None), None)
# conflicting
packet = conn.mockGetNamedCalls('addPacket')[0].getParam(0)
self.assertTrue(unpack('!B8s8s', packet._body)[0])
def test_28_handleAskStoreObject5(self):
# no conflict => answer
......@@ -876,29 +950,38 @@ server: 127.0.0.1:10020
object = t.getObjectList()[0]
self.assertEquals(object, (INVALID_OID, 0, 0, ''))
self.checkPacket(conn, packet_type=ANSWER_STORE_OBJECT)
# no conflict
packet = conn.mockGetNamedCalls('addPacket')[0].getParam(0)
self.assertFalse(unpack('!B8s8s', packet._body)[0])
def test_29_handleAbortTransaction1(self):
def test_29_handleAbortTransaction(self):
# no uuid => abort
before = self.app.transaction_dict.items()[:]
self.checkHandleUnexpectedPacket(
_call=self.operation.handleAbortTransaction,
_msg_type=ABORT_TRANSACTION,
tid=INVALID_TID
)
def test_29_handleAbortTransaction2(self):
after = self.app.transaction_dict.items()
self.assertEquals(before, after)
# remove transaction
packet = Packet(msg_id=1, msg_type=ABORT_TRANSACTION)
conn = Mock({'getUUID': self.app.uuid})
transaction = Mock({ 'getObjectList': ((0, ), ), })
self.called = False
def called():
self.called = True
self.app.executeQueuedEvents = called
self.app.load_lock_dict[0] = object()
self.app.store_lock_dict[0] = object()
self.app.transaction_dict[INVALID_TID] = transaction
self.operation.handleAbortTransaction(conn, packet, INVALID_TID)
self.assertTrue(self.called)
self.assertEquals(len(self.app.load_lock_dict), 0)
self.assertEquals(len(self.app.store_lock_dict), 0)
self.assertEquals(len(self.app.store_lock_dict), 0)
def test_30_handleAnswerLastIDs1(self):
def test_30_handleAnswerLastIDs(self):
# listening connection => unexpected packet
self.checkHandleUnexpectedPacket(
_call=self.operation.handleAnswerLastIDs,
......@@ -908,8 +991,6 @@ server: 127.0.0.1:10020
ltid=INVALID_TID,
lptid=INVALID_TID,
)
def test_30_handleAnswerLastIDs2(self):
# set critical TID on replicator
conn = Mock()
packet = Packet(msg_id=1, msg_type=ANSWER_LAST_IDS)
......@@ -926,7 +1007,7 @@ server: 127.0.0.1:10020
self.assertEquals(calls[0].getParam(0), packet)
self.assertEquals(calls[0].getParam(1), INVALID_TID)
def test_31_handleAnswerUnfinishedTransactions1(self):
def test_31_handleAnswerUnfinishedTransactions(self):
# unexpected packet
self.checkHandleUnexpectedPacket(
_call=self.operation.handleAnswerUnfinishedTransactions,
......@@ -934,8 +1015,6 @@ server: 127.0.0.1:10020
_listening=True,
tid_list=(INVALID_TID, ),
)
def test_31_handleAnswerUnfinishedTransactions2(self):
# set unfinished TID on replicator
conn = Mock()
packet = Packet(msg_id=1, msg_type=ANSWER_UNFINISHED_TRANSACTIONS)
......@@ -951,10 +1030,15 @@ server: 127.0.0.1:10020
def test_25_handleAskOIDs1(self):
# invalid offsets => error
app = self.app
app.pt = Mock()
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_id=1, msg_type=ASK_OIDS)
self.operation.handleAskOIDs(conn, packet, 1, 1, None)
self.checkPacket(conn, packet_type=ERROR)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
self.assertEquals(len(app.dm.mockGetNamedCalls('getOIDList')), 0)
def test_25_handleAskOIDs2(self):
# well case > answer OIDs
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment