Commit e6ca0260 authored by Vincent Pelletier's avatar Vincent Pelletier

Split handler class into 2:

 - One to handle events expected as answers only, and to treat in their own thread.
 - One to handle all other events, and to tread them in dispatcher thread.
Update "expected answer" caller (app.py's _waitMessage).
Update test.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@318 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f604e270
......@@ -31,7 +31,7 @@ from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, INVALID_SERIAL
from neo.client.handler import ClientEventHandler
from neo.client.handler import ClientEventHandler, ClientAnswerEventHandler
from neo.client.exception import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError
from neo.util import makeChecksum, dump
......@@ -198,6 +198,7 @@ class Application(object):
self.ptid = None
self.num_replicas = 0
self.num_partitions = 0
self.answer_handler = ClientAnswerEventHandler(self, dispatcher)
# Transaction specific variable
self.tid = None
self.txn = None
......@@ -256,7 +257,7 @@ class Application(object):
else:
continue
conn.handler.dispatch(conn, packet)
self.answer_handler.dispatch(conn, packet)
if target_conn is conn and msg_id == packet.getId() \
and packet.getType() & 0x8000:
......
......@@ -32,14 +32,27 @@ from neo.util import dump
from ZODB.TimeStamp import TimeStamp
from ZODB.utils import p64
class ClientEventHandler(EventHandler):
"""This class deals with events for a master."""
class BaseClientEventHandler(EventHandler):
"""Base class for client-side EventHandler implementations."""
def __init__(self, app, dispatcher):
self.app = app
self.dispatcher = dispatcher
super(ClientEventHandler, self).__init__()
super(BaseClientEventHandler, self).__init__()
def dispatch(self, conn, packet):
# Before calling superclass's dispatch method, lock the connection.
# This covers the case where handler sends a response to received
# packet.
conn.lock()
try:
super(BaseClientEventHandler, self).dispatch(conn, packet)
finally:
conn.release()
class ClientEventHandler(BaseClientEventHandler):
"""This class discriminates answers from non-answers, queues answer to
their requester and handles non-answers directly."""
def packetReceived(self, conn, packet):
"""Redirect all received packet to dispatcher thread."""
......@@ -51,16 +64,6 @@ class ClientEventHandler(EventHandler):
else:
queue.put((conn, packet))
def dispatch(self, conn, packet):
# Before calling superclass's dispatch method, lock the connection.
# This covers the case where handler sends a response to received
# packet.
conn.lock()
try:
super(ClientEventHandler, self).dispatch(conn, packet)
finally:
conn.release()
def _dealWithStorageFailure(self, conn, node, state):
app = self.app
......@@ -167,97 +170,20 @@ class ClientEventHandler(EventHandler):
super(ClientEventHandler, self).peerBroken(conn)
def handleNotReady(self, conn, packet, message):
app = self.app
app.local_var.node_not_ready = 1
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# It can be eiter a master node or a storage node
if node_type == CLIENT_NODE_TYPE:
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1],
ip_address, port)
app.nm.remove(node)
conn.close()
return
conn.setUUID(uuid)
node.setUUID(uuid)
if node_type == MASTER_NODE_TYPE:
# Create partition table if necessary
if app.pt is None:
app.pt = PartitionTable(num_partitions, num_replicas)
app.num_partitions = num_partitions
app.num_replicas = num_replicas
# Ask a primary master.
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
p.askPrimaryMaster(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, app.getQueue())
finally:
conn.unlock()
elif node_type == STORAGE_NODE_TYPE:
app.storage_node = node
# Master node handler
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
if primary_node.getUUID() == primary_uuid:
# Whatever the situation is, I trust this master.
app.primary_master_node = primary_node
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
# XXX: This handler should be in ClientAnswerEventHandler, since this
# basicaly is an answer to askPrimaryMaster.
# Extract from P-NEO-Protocol.Description:
# Connection to primary master node (PMN in service state)
# CN -> PMN : askPrimaryMaster
# PMN -> CN : answerPrimaryMaster containing primary uuid and no
# known master list
# PMN -> CN : notifyNodeInformation containing list of all nodes
# PMN -> CN : sendPartitionTable containing partition table id and
# list of rows
# notifyNodeInformation is valid as asynchrounous event, but
# sendPartitionTable is only triggered after askPrimaryMaster.
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
......@@ -359,17 +285,6 @@ class ClientEventHandler(EventHandler):
pt.setCell(offset, node, state)
def handleAnswerNewTID(self, conn, packet, tid):
app = self.app
app.tid = tid
def handleNotifyTransactionFinished(self, conn, packet, tid):
app = self.app
if tid != app.tid:
app.txn_finished = -1
else:
app.txn_finished = 1
def handleInvalidateObjects(self, conn, packet, oid_list, tid):
app = self.app
app._cache_lock_acquire()
......@@ -387,13 +302,117 @@ class ClientEventHandler(EventHandler):
finally:
app._cache_lock_release()
def handleStopOperation(self, conn, packet):
logging.critical("master node ask to stop operation")
class ClientAnswerEventHandler(BaseClientEventHandler):
"""This class handles events only expected as answers to requests."""
def handleNotReady(self, conn, packet, message):
app = self.app
app.local_var.node_not_ready = 1
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# It can be eiter a master node or a storage node
if node_type == CLIENT_NODE_TYPE:
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1],
ip_address, port)
app.nm.remove(node)
conn.close()
return
conn.setUUID(uuid)
node.setUUID(uuid)
if node_type == MASTER_NODE_TYPE:
# Create partition table if necessary
if app.pt is None:
app.pt = PartitionTable(num_partitions, num_replicas)
app.num_partitions = num_partitions
app.num_replicas = num_replicas
# Ask a primary master.
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
p.askPrimaryMaster(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, app.getQueue())
finally:
conn.unlock()
elif node_type == STORAGE_NODE_TYPE:
app.storage_node = node
# Master node handler
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
if primary_node.getUUID() == primary_uuid:
# Whatever the situation is, I trust this master.
app.primary_master_node = primary_node
def handleAnswerNewTID(self, conn, packet, tid):
app = self.app
app.tid = tid
def handleAnswerNewOIDs(self, conn, packet, oid_list):
app = self.app
app.new_oid_list = oid_list
app.new_oid_list.reverse()
def handleStopOperation(self, conn, packet):
logging.critical("master node ask to stop operation")
def handleNotifyTransactionFinished(self, conn, packet, tid):
app = self.app
if tid != app.tid:
app.txn_finished = -1
else:
app.txn_finished = 1
# Storage node handler
......
......@@ -43,7 +43,7 @@ from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIF
UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, DISCARDED_STATE
from neo.exception import ElectionFailure
from neo.client.handler import ClientEventHandler
from neo.client.handler import ClientEventHandler, ClientAnswerEventHandler
from neo.node import StorageNode
MARKER = []
......@@ -227,7 +227,7 @@ class ClientEventHandlerTest(unittest.TestCase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleNotReady(conn, None, None)
self.assertEquals(app.local_var.node_not_ready, 1)
......@@ -240,7 +240,7 @@ class ClientEventHandlerTest(unittest.TestCase):
pt = None
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
uuid = self.getUUID()
client_handler.handleAcceptNodeIdentification(conn, None, CLIENT_NODE_TYPE,
......@@ -261,7 +261,7 @@ class ClientEventHandlerTest(unittest.TestCase):
return None
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
uuid = self.getUUID()
client_handler.handleAcceptNodeIdentification(conn, None, MASTER_NODE_TYPE,
......@@ -286,7 +286,7 @@ class ClientEventHandlerTest(unittest.TestCase):
return None
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
uuid = self.getUUID()
client_handler.handleAcceptNodeIdentification(conn, None, STORAGE_NODE_TYPE,
......@@ -319,7 +319,7 @@ class ClientEventHandlerTest(unittest.TestCase):
# Master node handler
def test_initialAnswerPrimaryMaster(self):
client_handler = ClientEventHandler(None, self.getDispatcher())
client_handler = ClientAnswerEventHandler(None, self.getDispatcher())
conn = Mock({'getUUID': None})
call_list = self._testHandleUnexpectedPacketCalledWithMedhod(
client_handler, client_handler.handleAnswerPrimaryMaster,
......@@ -334,7 +334,7 @@ class ClientEventHandlerTest(unittest.TestCase):
class App:
nm = Mock({'getNodeByUUID': node, 'getNodeByServer': None, 'add': None})
app = App()
client_handler = ClientEventHandler(app, self.getDispatcher())
client_handler = ClientAnswerEventHandler(app, self.getDispatcher())
conn = self.getConnection()
client_handler.handleAnswerPrimaryMaster(conn, None, 0, [])
# Check that nothing happened
......@@ -348,7 +348,7 @@ class ClientEventHandlerTest(unittest.TestCase):
# TODO: add an expectation on getNodeByUUID parameter: must be conn.getUUID()
primary_master_node = None
app = App()
client_handler = ClientEventHandler(app, self.getDispatcher())
client_handler = ClientAnswerEventHandler(app, self.getDispatcher())
conn = self.getConnection()
test_master_list = [('127.0.0.1', 10010, self.getUUID())]
client_handler.handleAnswerPrimaryMaster(conn, None, INVALID_UUID, test_master_list)
......@@ -375,7 +375,7 @@ class ClientEventHandlerTest(unittest.TestCase):
# TODO: add an expectation on getNodeByServer parameter: must be (ip_address, port)
primary_master_node = None
app = App()
client_handler = ClientEventHandler(app, self.getDispatcher())
client_handler = ClientAnswerEventHandler(app, self.getDispatcher())
conn = self.getConnection()
test_node_uuid = self.getUUID()
test_master_list = [('127.0.0.1', 10010, test_node_uuid)]
......@@ -402,7 +402,7 @@ class ClientEventHandlerTest(unittest.TestCase):
# TODO: add an expectation on getNodeByServer parameter: must be (ip_address, port)
primary_master_node = None
app = App()
client_handler = ClientEventHandler(app, self.getDispatcher())
client_handler = ClientAnswerEventHandler(app, self.getDispatcher())
conn = self.getConnection()
test_master_list = [('127.0.0.1', 10010, test_node_uuid)]
client_handler.handleAnswerPrimaryMaster(conn, None, INVALID_UUID, test_master_list)
......@@ -437,7 +437,7 @@ class ClientEventHandlerTest(unittest.TestCase):
# TODO: add an expectation on getNodeByServer parameter: must be (ip_address, port)
primary_master_node = test_primary_master_node
app = App()
client_handler = ClientEventHandler(app, self.getDispatcher())
client_handler = ClientAnswerEventHandler(app, self.getDispatcher())
conn = self.getConnection()
# If primary master is already set *and* is not given primary master
# handle call raises.
......@@ -452,7 +452,7 @@ class ClientEventHandlerTest(unittest.TestCase):
nm = Mock({'getNodeByUUID': node, 'getNodeByServer': node, 'add': None})
primary_master_node = node
app = App()
client_handler = ClientEventHandler(app, self.getDispatcher())
client_handler = ClientAnswerEventHandler(app, self.getDispatcher())
conn = self.getConnection()
client_handler.handleAnswerPrimaryMaster(conn, None, test_node_uuid, [])
# Check that primary node is (still) node.
......@@ -469,7 +469,7 @@ class ClientEventHandlerTest(unittest.TestCase):
# TODO: add an expectation on getNodeByUUID parameter: must be conn.getUUID(), then test_primary_node_uuid
primary_master_node = None
app = App()
client_handler = ClientEventHandler(app, self.getDispatcher())
client_handler = ClientAnswerEventHandler(app, self.getDispatcher())
conn = self.getConnection()
client_handler.handleAnswerPrimaryMaster(conn, None, test_primary_node_uuid, [])
# Check that primary node was not updated.
......@@ -484,7 +484,7 @@ class ClientEventHandlerTest(unittest.TestCase):
# TODO: add an expectation on getNodeByServer parameter: must be (ip_address, port)
primary_master_node = None
app = App()
client_handler = ClientEventHandler(app, self.getDispatcher())
client_handler = ClientAnswerEventHandler(app, self.getDispatcher())
conn = self.getConnection()
test_master_list = [('127.0.0.1', 10010, test_node_uuid)]
client_handler.handleAnswerPrimaryMaster(conn, None, test_node_uuid, test_master_list)
......@@ -822,7 +822,7 @@ class ClientEventHandlerTest(unittest.TestCase):
tid = None
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
test_tid = 1
client_handler.handleAnswerNewTID(conn, None, test_tid)
......@@ -834,7 +834,7 @@ class ClientEventHandlerTest(unittest.TestCase):
txn_finished = None
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleNotifyTransactionFinished(conn, None, 1)
self.assertEquals(app.txn_finished, 1)
......@@ -878,7 +878,7 @@ class ClientEventHandlerTest(unittest.TestCase):
new_oid_list = []
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
test_oid_list = ['\x00\x00\x00\x00\x00\x00\x00\x01', '\x00\x00\x00\x00\x00\x00\x00\x02']
client_handler.handleAnswerNewOIDs(conn, None, test_oid_list[:])
......@@ -896,7 +896,7 @@ class ClientEventHandlerTest(unittest.TestCase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
# TODO: use realistic values
test_object_data = ('\x00\x00\x00\x00\x00\x00\x00\x01', 0, 0, 0, 0, 'test')
......@@ -905,7 +905,7 @@ class ClientEventHandlerTest(unittest.TestCase):
def _testAnswerStoreObject(self, app, conflicting, oid, serial):
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleAnswerStoreObject(conn, None, conflicting, oid, serial)
......@@ -933,7 +933,7 @@ class ClientEventHandlerTest(unittest.TestCase):
txn_voted = 0
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
test_tid = 10
client_handler.handleAnswerStoreTransaction(conn, None, test_tid)
......@@ -947,7 +947,7 @@ class ClientEventHandlerTest(unittest.TestCase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
tid = '\x00\x00\x00\x00\x00\x00\x00\x01' # TODO: use a more realistic tid
user = 'bar'
......@@ -969,7 +969,7 @@ class ClientEventHandlerTest(unittest.TestCase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
test_oid = '\x00\x00\x00\x00\x00\x00\x00\x01'
# TODO: use realistic values
......@@ -988,7 +988,7 @@ class ClientEventHandlerTest(unittest.TestCase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleOidNotFound(conn, None, None)
self.assertEquals(app.local_var.asked_object, -1)
......@@ -1001,7 +1001,7 @@ class ClientEventHandlerTest(unittest.TestCase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
client_handler.handleTidNotFound(conn, None, None)
self.assertEquals(app.local_var.txn_info, -1)
......@@ -1013,7 +1013,7 @@ class ClientEventHandlerTest(unittest.TestCase):
local_var = FakeLocal()
app = App()
dispatcher = self.getDispatcher()
client_handler = ClientEventHandler(app, dispatcher)
client_handler = ClientAnswerEventHandler(app, dispatcher)
conn = self.getConnection()
test_tid_list = ['\x00\x00\x00\x00\x00\x00\x00\x01', '\x00\x00\x00\x00\x00\x00\x00\x02']
client_handler.handleAnswerTIDs(conn, None, test_tid_list[:])
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment