Commit 4b4ed209 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Split test classes for storage handlers, for client, master and storage remote

nodes during operation state. 


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@811 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent bf8d963a
......@@ -28,6 +28,13 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler):
def connectionCompleted(self, conn):
BaseClientAndStorageOperationHandler.connectionCompleted(self, conn)
def handleAskLastIDs(self, conn, packet):
app = self.app
oid = app.dm.getLastOID() or INVALID_OID
tid = app.dm.getLastTID() or INVALID_TID
p = protocol.answerLastIDs(oid, tid, app.ptid)
conn.answer(p, packet)
def handleAskOIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return OIDs only
# about usable partitions assigned to me.
......
from neo.storage.tests.testStorageApp import StorageAppTests
from neo.storage.tests.testStorageBootstrapHandler import StorageBootstrapTests
from neo.storage.tests.testStorageMySQLdb import StorageMySQSLdbTests
from neo.storage.tests.testStorageOperation import StorageOperationTests
from neo.storage.tests.testStorageVerificationHandler import StorageVerificationTests
from neo.storage.tests.testClientHandler import StorageClientHandlerTests
from neo.storage.tests.testMasterHandler import StorageMasterHandlerTests
from neo.storage.tests.testStorageHandler import StorageStorageHandlerTests
__all__ = [
'StorageAppTests',
'StorageBootstrapTests',
'StorageMySQSLdbTests',
'StorageOperationTests',
'StorageVerificationTests',
'StorageClientHandlerTests',
'StorageMasterHandlerTests',
'StorageStorageHandlerTests',
]
#
# Copyright (C) 2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import os
import unittest
import logging
from struct import pack, unpack
from mock import Mock
from collections import deque
from neo.tests.base import NeoTestBase
from neo.master.app import MasterNode
from neo.storage.app import Application, StorageNode
from neo.storage.handlers import MasterOperationHandler
from neo.exception import PrimaryFailure, OperationFailure
from neo.pt import PartitionTable
from neo import protocol
from neo.protocol import *
class StorageMasterHandlerTests(NeoTestBase):
def checkHandleUnexpectedPacket(self, _call, _msg_type, _listening=True, **kwargs):
conn = Mock({
"getAddress" : ("127.0.0.1", self.master_port),
"isServerConnection": _listening,
})
packet = Packet(msg_type=_msg_type)
# hook
self.operation.peerBroken = lambda c: c.peerBrokendCalled()
self.checkUnexpectedPacketRaised(_call, conn=conn, packet=packet, **kwargs)
def setUp(self):
logging.basicConfig(level = logging.ERROR)
self.prepareDatabase(number=1)
# create an application object
config = self.getConfigFile(master_number=1)
self.app = Application(config, "storage1")
self.app.num_partitions = 1
self.app.num_replicas = 1
self.app.transaction_dict = {}
self.app.store_lock_dict = {}
self.app.load_lock_dict = {}
self.app.event_queue = deque()
for server in self.app.master_node_list:
master = MasterNode(server = server)
self.app.nm.add(master)
# handler
self.operation = MasterOperationHandler(self.app)
# set pmn
self.master_uuid = self.getNewUUID()
pmn = self.app.nm.getMasterNodeList()[0]
pmn.setUUID(self.master_uuid)
self.app.primary_master_node = pmn
self.master_port = 10010
def tearDown(self):
NeoTestBase.tearDown(self)
def test_06_timeoutExpired(self):
# client connection
conn = Mock({
"getUUID": self.master_uuid,
"getAddress" : ("127.0.0.1", self.master_port),
})
self.assertRaises(PrimaryFailure, self.operation.timeoutExpired, conn)
self.checkNoPacketSent(conn)
def test_07_connectionClosed2(self):
# primary has closed the connection
conn = Mock({
"getUUID": self.master_uuid,
"getAddress" : ("127.0.0.1", self.master_port),
})
self.assertRaises(PrimaryFailure, self.operation.connectionClosed, conn)
self.checkNoPacketSent(conn)
def test_08_peerBroken(self):
# client connection
conn = Mock({
"getUUID": self.master_uuid,
"getAddress" : ("127.0.0.1", self.master_port),
})
self.assertRaises(PrimaryFailure, self.operation.peerBroken, conn)
self.checkNoPacketSent(conn)
def test_14_handleNotifyPartitionChanges1(self):
# old partition change -> do nothing
app = self.app
conn = Mock({
"isServerConnection": False,
"getAddress" : ("127.0.0.1", self.master_port),
})
app.replicator = Mock({})
packet = Packet(msg_type=NOTIFY_PARTITION_CHANGES)
self.app.ptid = 1
count = len(self.app.nm.getNodeList())
self.operation.handleNotifyPartitionChanges(conn, packet, 0, ())
self.assertEquals(self.app.ptid, 1)
self.assertEquals(len(self.app.nm.getNodeList()), count)
calls = self.app.replicator.mockGetNamedCalls('removePartition')
self.assertEquals(len(calls), 0)
calls = self.app.replicator.mockGetNamedCalls('addPartition')
self.assertEquals(len(calls), 0)
def test_14_handleNotifyPartitionChanges2(self):
# cases :
uuid = self.getNewUUID()
cells = (
(0, uuid, UP_TO_DATE_STATE),
(1, self.app.uuid, DISCARDED_STATE),
(2, self.app.uuid, OUT_OF_DATE_STATE),
)
# context
conn = Mock({
"isServerConnection": False,
"getAddress" : ("127.0.0.1", self.master_port),
})
packet = Packet(msg_type=NOTIFY_PARTITION_CHANGES)
app = self.app
ptid1, ptid2 = self.getTwoIDs()
self.assertNotEquals(ptid1, ptid2)
app.ptid = ptid1
app.pt = PartitionTable(3, 1)
app.pt = Mock({ })
app.dm = Mock({ })
app.replicator = Mock({})
count = len(app.nm.getNodeList())
self.operation.handleNotifyPartitionChanges(conn, packet, ptid2, cells)
# ptid set
self.assertEquals(app.ptid, ptid2)
# two nodes added
self.assertEquals(len(app.nm.getNodeList()), count + 2)
# uuid != app.uuid -> TEMPORARILY_DOWN_STATE
self.assertEquals(app.nm.getNodeByUUID(uuid).getState(), TEMPORARILY_DOWN_STATE)
# pt calls
calls = self.app.pt.mockGetNamedCalls('setCell')
self.assertEquals(len(calls), 3)
calls[0].checkArgs(0, app.nm.getNodeByUUID(uuid), UP_TO_DATE_STATE)
calls[1].checkArgs(1, app.nm.getNodeByUUID(app.uuid), DISCARDED_STATE)
calls[2].checkArgs(2, app.nm.getNodeByUUID(app.uuid), OUT_OF_DATE_STATE)
# replicator calls
calls = self.app.replicator.mockGetNamedCalls('removePartition')
self.assertEquals(len(calls), 1)
calls[0].checkArgs(1)
calls = self.app.replicator.mockGetNamedCalls('addPartition')
# dm call
calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
self.assertEquals(len(calls), 1)
calls[0].checkArgs(ptid2, cells)
def test_16_handleStopOperation1(self):
# OperationFailure
conn = Mock({ 'isServerConnection': False })
packet = Packet(msg_type=STOP_OPERATION)
self.assertRaises(OperationFailure, self.operation.handleStopOperation, conn, packet)
def test_22_handleLockInformation2(self):
# load transaction informations
conn = Mock({ 'isServerConnection': False, })
self.app.dm = Mock({ })
packet = Packet(msg_type=LOCK_INFORMATION)
transaction = Mock({ 'getObjectList': ((0, ), ), })
self.app.transaction_dict[INVALID_TID] = transaction
self.operation.handleLockInformation(conn, packet, INVALID_TID)
self.assertEquals(self.app.load_lock_dict[0], INVALID_TID)
calls = self.app.dm.mockGetNamedCalls('storeTransaction')
self.assertEquals(len(calls), 1)
self.checkNotifyInformationLocked(conn, answered_packet=packet)
# transaction not in transaction_dict -> KeyError
transaction = Mock({ 'getObjectList': ((0, ), ), })
conn = Mock({ 'isServerConnection': False, })
self.operation.handleLockInformation(conn, packet, '\x01' * 8)
self.checkNotifyInformationLocked(conn, answered_packet=packet)
def test_23_handleUnlockInformation2(self):
# delete transaction informations
conn = Mock({ 'isServerConnection': False, })
self.app.dm = Mock({ })
packet = Packet(msg_type=LOCK_INFORMATION)
transaction = Mock({ 'getObjectList': ((0, ), ), })
self.app.transaction_dict[INVALID_TID] = transaction
self.app.load_lock_dict[0] = transaction
self.app.store_lock_dict[0] = transaction
self.operation.handleUnlockInformation(conn, packet, INVALID_TID)
self.assertEquals(len(self.app.load_lock_dict), 0)
self.assertEquals(len(self.app.store_lock_dict), 0)
self.assertEquals(len(self.app.store_lock_dict), 0)
calls = self.app.dm.mockGetNamedCalls('finishTransaction')
self.assertEquals(len(calls), 1)
calls[0].checkArgs(INVALID_TID)
# transaction not in transaction_dict -> KeyError
transaction = Mock({ 'getObjectList': ((0, ), ), })
conn = Mock({ 'isServerConnection': False, })
self.operation.handleLockInformation(conn, packet, '\x01' * 8)
self.checkNotifyInformationLocked(conn, answered_packet=packet)
def test_30_handleAnswerLastIDs(self):
# set critical TID on replicator
conn = Mock()
packet = Packet(msg_type=ANSWER_LAST_IDS)
self.app.replicator = Mock()
self.operation.handleAnswerLastIDs(
conn=conn,
packet=packet,
loid=INVALID_OID,
ltid=INVALID_TID,
lptid=INVALID_TID,
)
calls = self.app.replicator.mockGetNamedCalls('setCriticalTID')
self.assertEquals(len(calls), 1)
calls[0].checkArgs(packet, INVALID_TID)
def test_31_handleAnswerUnfinishedTransactions(self):
# set unfinished TID on replicator
conn = Mock()
packet = Packet(msg_type=ANSWER_UNFINISHED_TRANSACTIONS)
self.app.replicator = Mock()
self.operation.handleAnswerUnfinishedTransactions(
conn=conn,
packet=packet,
tid_list=(INVALID_TID, ),
)
calls = self.app.replicator.mockGetNamedCalls('setUnfinishedTIDList')
self.assertEquals(len(calls), 1)
calls[0].checkArgs((INVALID_TID, ))
if __name__ == "__main__":
unittest.main()
......@@ -41,7 +41,7 @@ class StorageBootstrapTests(NeoTestBase):
config = self.getConfigFile()
self.app = Application(config, "master1")
for server in self.app.master_node_list:
self.app.nm.add(MasterNode(server = server))
self.app.nm.add(MasterNode(server=server))
self.trying_master_node = self.app.nm.getMasterNodeList()[0]
self.bootstrap = BootstrapHandler(self.app)
# define some variable to simulate client and storage node
......
#
# Copyright (C) 2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import os
import unittest
import logging
from struct import pack, unpack
from mock import Mock
from collections import deque
from neo.tests.base import NeoTestBase
from neo.master.app import MasterNode
from neo.storage.app import Application, StorageNode
from neo.storage.handlers import StorageOperationHandler
from neo import protocol
from neo.protocol import *
class StorageStorageHandlerTests(NeoTestBase):
def checkHandleUnexpectedPacket(self, _call, _msg_type, _listening=True, **kwargs):
conn = Mock({
"getAddress" : ("127.0.0.1", self.master_port),
"isServerConnection": _listening,
})
packet = Packet(msg_type=_msg_type)
# hook
self.operation.peerBroken = lambda c: c.peerBrokendCalled()
self.checkUnexpectedPacketRaised(_call, conn=conn, packet=packet, **kwargs)
def setUp(self):
logging.basicConfig(level = logging.ERROR)
self.prepareDatabase(number=1)
# create an application object
config = self.getConfigFile(master_number=1)
self.app = Application(config, "storage1")
self.app.num_partitions = 1
self.app.num_replicas = 1
self.app.transaction_dict = {}
self.app.store_lock_dict = {}
self.app.load_lock_dict = {}
self.app.event_queue = deque()
for server in self.app.master_node_list:
master = MasterNode(server = server)
self.app.nm.add(master)
# handler
self.operation = StorageOperationHandler(self.app)
# set pmn
self.master_uuid = self.getNewUUID()
pmn = self.app.nm.getMasterNodeList()[0]
pmn.setUUID(self.master_uuid)
self.app.primary_master_node = pmn
self.master_port = 10010
def tearDown(self):
NeoTestBase.tearDown(self)
def test_18_handleAskTransactionInformation1(self):
# transaction does not exists
conn = Mock({ })
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
self.operation.handleAskTransactionInformation(conn, packet, INVALID_TID)
self.checkErrorPacket(conn)
def test_18_handleAskTransactionInformation2(self):
# answer
conn = Mock({ })
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
dm = Mock({ "getTransaction": (INVALID_TID, 'user', 'desc', '', ), })
self.app.dm = dm
self.operation.handleAskTransactionInformation(conn, packet, INVALID_TID)
self.checkAnswerTransactionInformation(conn)
def test_24_handleAskObject1(self):
# delayed response
conn = Mock({})
self.app.dm = Mock()
packet = Packet(msg_type=ASK_OBJECT)
self.app.load_lock_dict[INVALID_OID] = object()
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
oid=INVALID_OID,
serial=INVALID_SERIAL,
tid=INVALID_TID)
self.assertEquals(len(self.app.event_queue), 1)
self.checkNoPacketSent(conn)
self.assertEquals(len(self.app.dm.mockGetNamedCalls('getObject')), 0)
def test_24_handleAskObject2(self):
# invalid serial / tid / packet not found
self.app.dm = Mock({'getObject': None})
conn = Mock({})
packet = Packet(msg_type=ASK_OBJECT)
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
oid=INVALID_OID,
serial=INVALID_SERIAL,
tid=INVALID_TID)
calls = self.app.dm.mockGetNamedCalls('getObject')
self.assertEquals(len(self.app.event_queue), 0)
self.assertEquals(len(calls), 1)
calls[0].checkArgs(INVALID_OID, None, None)
self.checkErrorPacket(conn)
def test_24_handleAskObject3(self):
# object found => answer
self.app.dm = Mock({'getObject': ('', '', 0, 0, '', )})
conn = Mock({})
packet = Packet(msg_type=ASK_OBJECT)
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
oid=INVALID_OID,
serial=INVALID_SERIAL,
tid=INVALID_TID)
self.assertEquals(len(self.app.event_queue), 0)
self.checkAnswerObject(conn)
def test_25_handleAskTIDs1(self):
# invalid offsets => error
app = self.app
app.pt = Mock()
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_type=ASK_TIDS)
self.checkProtocolErrorRaised(self.operation.handleAskTIDs, conn, packet, 1, 1, None)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
self.assertEquals(len(app.dm.mockGetNamedCalls('getTIDList')), 0)
def test_25_handleAskTIDs2(self):
# well case => answer
conn = Mock({})
packet = Packet(msg_type=ASK_TIDS)
self.app.num_partitions = 1
self.app.dm = Mock({'getTIDList': (INVALID_TID, )})
self.operation.handleAskTIDs(conn, packet, 1, 2, 1)
calls = self.app.dm.mockGetNamedCalls('getTIDList')
self.assertEquals(len(calls), 1)
calls[0].checkArgs(1, 1, 1, [1, ])
self.checkAnswerTids(conn)
def test_25_handleAskTIDs3(self):
# invalid partition => answer usable partitions
conn = Mock({})
packet = Packet(msg_type=ASK_TIDS)
self.app.num_partitions = 1
cell = Mock({'getUUID':self.app.uuid})
self.app.dm = Mock({'getTIDList': (INVALID_TID, )})
self.app.pt = Mock({'getCellList': (cell, )})
self.operation.handleAskTIDs(conn, packet, 1, 2, INVALID_PARTITION)
self.assertEquals(len(self.app.pt.mockGetNamedCalls('getCellList')), 1)
calls = self.app.dm.mockGetNamedCalls('getTIDList')
self.assertEquals(len(calls), 1)
calls[0].checkArgs(1, 1, 1, [0, ])
self.checkAnswerTids(conn)
def test_26_handleAskObjectHistory1(self):
# invalid offsets => error
app = self.app
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_type=ASK_OBJECT_HISTORY)
self.checkProtocolErrorRaised(self.operation.handleAskObjectHistory, conn, packet, 1, 1, None)
self.assertEquals(len(app.dm.mockGetNamedCalls('getObjectHistory')), 0)
def test_26_handleAskObjectHistory2(self):
# first case: empty history
packet = Packet(msg_type=ASK_OBJECT_HISTORY)
conn = Mock({})
self.app.dm = Mock({'getObjectHistory': None})
self.operation.handleAskObjectHistory(conn, packet, INVALID_OID, 1, 2)
self.checkAnswerObjectHistory(conn)
# second case: not empty history
conn = Mock({})
self.app.dm = Mock({'getObjectHistory': [('', 0, ), ]})
self.operation.handleAskObjectHistory(conn, packet, INVALID_OID, 1, 2)
self.checkAnswerObjectHistory(conn)
def test_25_handleAskOIDs1(self):
# invalid offsets => error
app = self.app
app.pt = Mock()
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_type=ASK_OIDS)
self.checkProtocolErrorRaised(self.operation.handleAskOIDs, conn, packet, 1, 1, None)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
self.assertEquals(len(app.dm.mockGetNamedCalls('getOIDList')), 0)
def test_25_handleAskOIDs2(self):
# well case > answer OIDs
conn = Mock({})
packet = Packet(msg_type=ASK_OIDS)
self.app.num_partitions = 1
self.app.dm = Mock({'getOIDList': (INVALID_OID, )})
self.operation.handleAskOIDs(conn, packet, 1, 2, 1)
calls = self.app.dm.mockGetNamedCalls('getOIDList')
self.assertEquals(len(calls), 1)
calls[0].checkArgs(1, 1, 1, [1, ])
self.checkAnswerOids(conn)
def test_25_handleAskOIDs3(self):
# invalid partition => answer usable partitions
conn = Mock({})
packet = Packet(msg_type=ASK_OIDS)
self.app.num_partitions = 1
cell = Mock({'getUUID':self.app.uuid})
self.app.dm = Mock({'getOIDList': (INVALID_OID, )})
self.app.pt = Mock({'getCellList': (cell, )})
self.operation.handleAskOIDs(conn, packet, 1, 2, INVALID_PARTITION)
self.assertEquals(len(self.app.pt.mockGetNamedCalls('getCellList')), 1)
calls = self.app.dm.mockGetNamedCalls('getOIDList')
self.assertEquals(len(calls), 1)
calls[0].checkArgs(1, 1, 1, [0, ])
self.checkAnswerOids(conn)
if __name__ == "__main__":
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment