Commit e85d3544 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Update all code for commit #1356.


git-svn-id: https://svn.erp5.org/repos/neo/trunk@1358 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent e3368849
......@@ -28,7 +28,7 @@ from neo.bootstrap import BootstrapManager
from neo.pt import PartitionTable
from neo import protocol
from neo.util import parseMasterList
from neo.protocol import NodeTypes, NodeStates
from neo.protocol import NodeTypes, NodeStates, Packets
class Dispatcher:
"""Dispatcher use to redirect master request to handler"""
......@@ -100,7 +100,7 @@ class Application(object):
# start the operation. This cycle will be executed permentnly,
# until the user explicitly requests a shutdown.
while 1:
self.connectToPrimaryMaster()
self.connectToPrimary()
try:
while 1:
self.em.poll(1)
......@@ -108,7 +108,7 @@ class Application(object):
logging.error('primary master is down')
def connectToPrimaryMaster(self):
def connectToPrimary(self):
"""Find a primary master node, and connect to it.
If a primary master node is not elected or ready, repeat
......@@ -145,8 +145,8 @@ class Application(object):
# passive handler
self.master_conn.setHandler(self.master_event_handler)
self.master_conn.ask(protocol.askNodeInformation())
self.master_conn.ask(protocol.askPartitionTable([]))
self.master_conn.ask(Packets.AskNodeInformation())
self.master_conn.ask(Packets.AskPartitionTable([]))
def sendPartitionTable(self, conn, min_offset, max_offset, uuid, msg_id):
# we have a pt
......@@ -170,5 +170,5 @@ class Application(object):
p = protocol.protocolError('invalid partition table offset')
conn.notify(p)
return
p = protocol.answerPartitionList(self.ptid, row_list)
p = Packets.AnswerPartitionList(self.ptid, row_list)
conn.answer(p, msg_id)
......@@ -19,6 +19,7 @@ from neo import logging
from neo.handler import EventHandler
from neo import protocol
from neo.protocol import Packets
from neo.exception import PrimaryFailure
from neo.util import dump
......@@ -32,7 +33,7 @@ class AdminEventHandler(EventHandler):
if app.pt is None:
if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.')
p = protocol.askPartitionTable([])
p = Packets.AskPartitionTable([])
msg_id = self.app.master_conn.ask(p)
app.dispatcher.register(msg_id, conn,
{'min_offset' : min_offset,
......@@ -49,7 +50,7 @@ class AdminEventHandler(EventHandler):
return n.getType() is node_type
node_list = self.app.nm.getList(node_filter)
node_information_list = [node.asTuple() for node in node_list ]
p = protocol.answerNodeList(node_information_list)
p = Packets.AnswerNodeList(node_information_list)
conn.answer(p, packet.getId())
def setNodeState(self, conn, packet, uuid, state, modify_partition_table):
......@@ -65,7 +66,7 @@ class AdminEventHandler(EventHandler):
# forward to primary master node
if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.')
p = protocol.setNodeState(uuid, state, modify_partition_table)
p = Packets.SetNodeState(uuid, state, modify_partition_table)
msg_id = self.app.master_conn.ask(p)
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()})
......@@ -73,7 +74,7 @@ class AdminEventHandler(EventHandler):
# forward to primary
if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.')
p = protocol.setClusterState(state)
p = Packets.SetClusterState(state)
msg_id = self.app.master_conn.ask(p)
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()})
......@@ -82,7 +83,7 @@ class AdminEventHandler(EventHandler):
raise protocol.NotReadyError('Not connected to a primary master.')
logging.info('Add nodes %s' % [dump(uuid) for uuid in uuid_list])
# forward the request to primary
msg_id = self.app.master_conn.ask(protocol.addPendingNodes(uuid_list))
msg_id = self.app.master_conn.ask(Packets.AddPendingNodes(uuid_list))
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()})
def askClusterState(self, conn, packet):
......@@ -90,17 +91,17 @@ class AdminEventHandler(EventHandler):
if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.')
# required it from PMN first
msg_id = self.app.master_conn.ask(protocol.askClusterState())
msg_id = self.app.master_conn.ask(Packets.AskClusterState())
self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()})
else:
conn.answer(protocol.answerClusterState(self.app.cluster_state),
conn.answer(Packets.AnswerClusterState(self.app.cluster_state),
packet.getId())
def askPrimaryMaster(self, conn, packet):
def askPrimary(self, conn, packet):
if self.app.master_conn is None:
raise protocol.NotReadyError('Not connected to a primary master.')
master_node = self.app.master_node
conn.answer(protocol.answerPrimaryMaster(master_node.getUUID(), []),
conn.answer(Packets.AnswerPrimary(master_node.getUUID(), []),
packet.getId())
class MasterEventHandler(EventHandler):
......@@ -135,7 +136,7 @@ class MasterEventHandler(EventHandler):
# unexpectexd answers and notifications
super(MasterEventHandler, self).dispatch(conn, packet)
def answerNodeInformation(self, conn, packet, node_list):
def answerNodeInformation(self, conn, packet):
# XXX: This will no more exists when the initialization module will be
# implemented for factorize code (as done for bootstrap)
logging.debug("answerNodeInformation")
......@@ -180,7 +181,7 @@ class MasterEventHandler(EventHandler):
# Re-ask partition table, in case node change filled it.
# XXX: we should only ask it if received states indicates it is
# possible (ignore TEMPORARILY_DOWN for example)
conn.ask(protocol.askPartitionTable([]))
conn.ask(Packets.AskPartitionTable([]))
class MasterRequestEventHandler(EventHandler):
""" This class handle all answer from primary master node"""
......@@ -193,12 +194,12 @@ class MasterRequestEventHandler(EventHandler):
logging.info("answerClusterState for a conn")
self.app.cluster_state = state
self.__answerNeoCTL(packet.getId(),
protocol.answerClusterState(state))
Packets.AnswerClusterState(state))
def answerNewNodes(self, conn, packet, uuid_list):
logging.info("answerNewNodes for a conn")
self.__answerNeoCTL(packet.getId(),
protocol.answerNewNodes(uuid_list))
Packets.AnswerNewNodes(uuid_list))
def answerPartitionTable(self, conn, packet, ptid, row_list):
logging.info("answerPartitionTable for a conn")
......@@ -208,7 +209,7 @@ class MasterRequestEventHandler(EventHandler):
def answerNodeState(self, conn, packet, uuid, state):
self.__answerNeoCTL(packet.getId(),
protocol.answerNodeState(uuid, state))
Packets.AnswerNodeState(uuid, state))
def noError(self, conn, packet, msg):
self.__answerNeoCTL(packet.getId(), protocol.noError(msg))
......
......@@ -19,7 +19,7 @@ from neo import logging
from time import sleep
from neo.handler import EventHandler
from neo import protocol
from neo.protocol import Packets
from neo.util import dump
from neo.connection import ClientConnection
......@@ -43,7 +43,7 @@ class BootstrapManager(EventHandler):
def connectionCompleted(self, conn):
EventHandler.connectionCompleted(self, conn)
conn.ask(protocol.askPrimaryMaster())
conn.ask(Packets.AskPrimary())
def connectionFailed(self, conn):
EventHandler.connectionFailed(self, conn)
......@@ -57,7 +57,7 @@ class BootstrapManager(EventHandler):
self.current = None
conn.close()
def answerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
def answerPrimary(self, conn, packet, primary_uuid, known_master_list):
nm = self.app.nm
# Register new master nodes.
......@@ -78,10 +78,10 @@ class BootstrapManager(EventHandler):
return
logging.info('connected to a primary master node')
conn.ask(protocol.requestNodeIdentification(self.node_type,
conn.ask(Packets.RequestIdentification(self.node_type,
self.uuid, self.server, self.name))
def acceptNodeIdentification(self, conn, packet, node_type,
def acceptIdentification(self, conn, packet, node_type,
uuid, address, num_partitions, num_replicas, your_uuid):
self.num_partitions = num_partitions
self.num_replicas = num_replicas
......
......@@ -29,7 +29,7 @@ setupLog('CLIENT', verbose=True)
from neo import logging
from neo import protocol
from neo.protocol import NodeTypes
from neo.protocol import NodeTypes, Packets
from neo.event import EventManager
from neo.util import makeChecksum, dump
from neo.locking import RLock, Lock
......@@ -87,7 +87,7 @@ class ConnectionPool(object):
logging.error('Connection to storage node %s failed', node)
return None
p = protocol.requestNodeIdentification(NodeTypes.CLIENT,
p = Packets.RequestIdentification(NodeTypes.CLIENT,
app.uuid, None, app.name)
msg_id = conn.ask(app.local_var.queue, p)
finally:
......@@ -327,7 +327,7 @@ class Application(object):
handler = self.primary_handler
else:
raise ValueError, 'Unknown node type: %r' % (
node_type, )
node.__class__, )
handler.dispatch(conn, packet)
if target_conn is conn and msg_id == packet.getId():
break
......@@ -360,7 +360,7 @@ class Application(object):
try:
if self.master_conn is None:
self.new_oid_list = []
self.master_conn = self._connectToPrimaryMasterNode()
self.master_conn = self._connectToPrimaryNode()
return self.master_conn
finally:
self._connecting_to_master_node_release()
......@@ -382,7 +382,7 @@ class Application(object):
pt = self._getPartitionTable()
return pt.getCellListForTID(tid, readable, writable)
def _connectToPrimaryMasterNode(self):
def _connectToPrimaryNode(self):
logging.debug('connecting to primary master...')
ready = False
nm = self.nm
......@@ -418,7 +418,7 @@ class Application(object):
logging.error('Connection to master node %s failed',
self.trying_master_node)
continue
msg_id = conn.ask(self.local_var.queue, protocol.askPrimaryMaster())
msg_id = conn.ask(self.local_var.queue, Packets.AskPrimary())
finally:
conn.unlock()
try:
......@@ -439,7 +439,7 @@ class Application(object):
self.trying_master_node)
self.primary_master_node = None
break
p = protocol.requestNodeIdentification(NodeTypes.CLIENT,
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name)
msg_id = conn.ask(self.local_var.queue, p)
finally:
......@@ -453,7 +453,7 @@ class Application(object):
# Node identification was refused by master.
# Sleep a bit an retry.
# XXX: This should be replaced by:
# - queuing requestNodeIdentification at master side
# - queuing RequestIdentification at master side
# - sending the acceptance from master when it becomes
# ready
# Thus removing the need to:
......@@ -465,14 +465,14 @@ class Application(object):
conn.lock()
try:
msg_id = conn.ask(self.local_var.queue,
protocol.askNodeInformation())
Packets.AskNodeInformation())
finally:
conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
conn.lock()
try:
msg_id = conn.ask(self.local_var.queue,
protocol.askPartitionTable([]))
Packets.AskPartitionTable([]))
finally:
conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
......@@ -496,7 +496,7 @@ class Application(object):
# we manage a list of oid here to prevent
# from asking too many time new oid one by one
# from master node
self._askPrimary(protocol.askNewOIDs(100))
self._askPrimary(Packets.AskNewOIDs(100))
if len(self.new_oid_list) <= 0:
raise NEOStorageError('new_oid failed')
self.last_oid = self.new_oid_list.pop()
......@@ -543,7 +543,7 @@ class Application(object):
continue
try:
self._askStorage(conn, protocol.askObject(oid, serial, tid))
self._askStorage(conn, Packets.AskObject(oid, serial, tid))
except ConnectionClosed:
continue
......@@ -644,7 +644,7 @@ class Application(object):
# the master will supply us one. Otherwise the requested tid will be
# used if possible.
self.local_var.tid = None
self._askPrimary(protocol.askBeginTransaction(tid))
self._askPrimary(Packets.AskBeginTransaction(tid))
if self.local_var.tid is None:
raise NEOStorageError('tpc_begin failed')
self.local_var.txn = transaction
......@@ -673,7 +673,7 @@ class Application(object):
continue
self.local_var.object_stored = 0
p = protocol.askStoreObject(oid, serial, 1,
p = Packets.AskStoreObject(oid, serial, 1,
checksum, compressed_data, self.local_var.tid)
try:
self._askStorage(conn, p)
......@@ -722,7 +722,7 @@ class Application(object):
continue
self.local_var.txn_voted = False
p = protocol.askStoreTransaction(self.local_var.tid,
p = Packets.AskStoreTransaction(self.local_var.tid,
user, desc, ext, oid_list)
try:
self._askStorage(conn, p)
......@@ -755,7 +755,7 @@ class Application(object):
if conn is None:
continue
try:
conn.notify(protocol.abortTransaction(self.local_var.tid))
conn.notify(Packets.AbortTransaction(self.local_var.tid))
finally:
conn.unlock()
......@@ -763,7 +763,7 @@ class Application(object):
conn = self._getMasterConnection()
conn.lock()
try:
conn.notify(protocol.abortTransaction(self.local_var.tid))
conn.notify(Packets.AbortTransaction(self.local_var.tid))
finally:
conn.unlock()
self.local_var.clear()
......@@ -780,7 +780,7 @@ class Application(object):
# Call finish on master
oid_list = self.local_var.data_dict.keys()
p = protocol.finishTransaction(oid_list, self.local_var.tid)
p = Packets.FinishTransaction(oid_list, self.local_var.tid)
self._askPrimary(p)
if not self.isTransactionFinished():
......@@ -814,7 +814,7 @@ class Application(object):
self.local_var.txn_info = 0
try:
self._askStorage(conn, protocol.askTransactionInformation(transaction_id))
self._askStorage(conn, Packets.AskTransactionInformation(transaction_id))
except ConnectionClosed:
continue
......@@ -881,7 +881,7 @@ class Application(object):
try:
conn.ask(self.local_var.queue,
protocol.askTIDs(first, last, protocol.INVALID_PARTITION))
Packets.AskTIDs(first, last, protocol.INVALID_PARTITION))
finally:
conn.unlock()
......@@ -912,7 +912,7 @@ class Application(object):
if conn is not None:
self.local_var.txn_info = 0
try:
self._askStorage(conn, protocol.askTransactionInformation(tid))
self._askStorage(conn, Packets.AskTransactionInformation(tid))
except ConnectionClosed:
continue
if isinstance(self.local_var.txn_info, dict):
......@@ -954,7 +954,7 @@ class Application(object):
self.local_var.history = None
try:
self._askStorage(conn, protocol.askObjectHistory(oid, 0, length))
self._askStorage(conn, Packets.AskObjectHistory(oid, 0, length))
except ConnectionClosed:
continue
......@@ -987,7 +987,7 @@ class Application(object):
# ask transaction information
self.local_var.txn_info = None
try:
self._askStorage(conn, protocol.askTransactionInformation(serial))
self._askStorage(conn, Packets.AskTransactionInformation(serial))
except ConnectionClosed:
continue
......
......@@ -30,7 +30,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
app.trying_master_node = None
app.setNodeNotReady()
def acceptNodeIdentification(self, conn, packet, node_type,
def acceptIdentification(self, conn, packet, node_type,
uuid, address, num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getByAddress(conn.getAddress())
......@@ -57,7 +57,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
# Always create partition table
app.pt = PartitionTable(num_partitions, num_replicas)
def answerPrimaryMaster(self, conn, packet, primary_uuid,
def answerPrimary(self, conn, packet, primary_uuid,
known_master_list):
app = self.app
# Register new master nodes.
......@@ -95,7 +95,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
def answerPartitionTable(self, conn, packet, ptid, row_list):
pass
def answerNodeInformation(self, conn, packet, node_list):
def answerNodeInformation(self, conn, packet):
pass
class PrimaryNotificationsHandler(BaseHandler):
......
......@@ -52,7 +52,7 @@ class StorageBootstrapHandler(AnswerBaseHandler):
app = self.app
app.setNodeNotReady()
def acceptNodeIdentification(self, conn, packet, node_type,
def acceptIdentification(self, conn, packet, node_type,
uuid, address, num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getByAddress(conn.getAddress())
......
......@@ -19,7 +19,7 @@ from neo import logging
from neo.locking import RLock
from neo import protocol
from neo.protocol import PacketMalformedError, PacketTypes
from neo.protocol import PacketMalformedError, Packets
from neo.event import IdleEvent
from neo.connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException, \
......@@ -239,7 +239,7 @@ class Connection(BaseConnection):
try:
packet = protocol.parse(self.read_buf)
except PacketMalformedError, msg:
self.handler.packetMalformed(self, packet, msg)
self.handler._packetMalformed(self, packet, msg)
return
if packet is None:
......@@ -256,10 +256,10 @@ class Connection(BaseConnection):
try:
packet_type = packet.getType()
if packet_type == PacketTypes.PING:
if packet_type == Packets.Ping:
# Send a pong notification
self.answer(protocol.pong(), packet.getId())
elif packet_type != PacketTypes.PONG:
self.answer(Packets.Pong(), packet.getId())
elif packet_type != Packets.Pong:
# Skip PONG packets, its only purpose is to drop IdleEvent
# generated upong ping.
self._queue.append(packet)
......@@ -357,7 +357,7 @@ class Connection(BaseConnection):
PACKET_LOGGER.log(self, packet, ' to ')
try:
self.write_buf += packet.encode()
self.write_buf += str(packet)
except PacketMalformedError, m:
logging.critical('trying to send a too big message')
raise
......@@ -418,7 +418,7 @@ class Connection(BaseConnection):
def ping(self, timeout=5):
""" Send a ping and expect to receive a pong notification """
packet = protocol.ping()
packet = Packets.Ping()
msg_id = self._getNextId()
packet.setId(msg_id)
self.expectMessage(msg_id, timeout, 0)
......
......@@ -17,7 +17,7 @@
from neo import logging
from neo import protocol
from neo.protocol import NodeStates, ErrorCodes, PacketTypes
from neo.protocol import NodeStates, ErrorCodes, Packets
from neo.protocol import PacketMalformedError, UnexpectedPacketError, \
BrokenNodeDisallowedError, NotReadyError, ProtocolError
......@@ -30,7 +30,7 @@ class EventHandler(object):
self.packet_dispatch_table = self.__initPacketDispatchTable()
self.error_dispatch_table = self.__initErrorDispatchTable()
def __packetMalformed(self, conn, packet, message='', *args):
def _packetMalformed(self, conn, packet, message='', *args):
"""Called when a packet is malformed."""
args = (conn.getAddress()[0], conn.getAddress()[1], message)
if packet is None:
......@@ -71,7 +71,7 @@ class EventHandler(object):
except UnexpectedPacketError, e:
self.__unexpectedPacket(conn, packet, *e.args)
except PacketMalformedError, e:
self.__packetMalformed(conn, packet, *e.args)
self._packetMalformed(conn, packet, *e.args)
except BrokenNodeDisallowedError:
answer_packet = protocol.brokenNodeDisallowedError('go away')
conn.answer(answer_packet, packet.getId())
......@@ -140,25 +140,25 @@ class EventHandler(object):
# Packet handlers.
def requestNodeIdentification(self, conn, packet, node_type,
def requestIdentification(self, conn, packet, node_type,
uuid, address, name):
raise UnexpectedPacketError
def acceptNodeIdentification(self, conn, packet, node_type,
def acceptIdentification(self, conn, packet, node_type,
uuid, address, num_partitions, num_replicas, your_uuid):
raise UnexpectedPacketError
def askPrimaryMaster(self, conn, packet):
def askPrimary(self, conn, packet):
raise UnexpectedPacketError
def answerPrimaryMaster(self, conn, packet, primary_uuid,
def answerPrimary(self, conn, packet, primary_uuid,
known_master_list):
raise UnexpectedPacketError
def announcePrimaryMaster(self, conn, packet):
def announcePrimary(self, conn, packet):
raise UnexpectedPacketError
def reelectPrimaryMaster(self, conn, packet):
def reelectPrimary(self, conn, packet):
raise UnexpectedPacketError
def notifyNodeInformation(self, conn, packet, node_list):
......@@ -312,7 +312,8 @@ class EventHandler(object):
def askNodeInformation(self, conn, packet):
raise UnexpectedPacketError
def answerNodeInformation(self, conn, packet, node_list):
def answerNodeInformation(self, conn, packet):
# XXX: Just an acknowledge, to be removed
raise UnexpectedPacketError
def askClusterState(self, conn, packet):
......@@ -368,68 +369,68 @@ class EventHandler(object):
def __initPacketDispatchTable(self):
d = {}
d[PacketTypes.ERROR] = self.error
d[PacketTypes.REQUEST_NODE_IDENTIFICATION] = self.requestNodeIdentification
d[PacketTypes.ACCEPT_NODE_IDENTIFICATION] = self.acceptNodeIdentification
d[PacketTypes.ASK_PRIMARY_MASTER] = self.askPrimaryMaster
d[PacketTypes.ANSWER_PRIMARY_MASTER] = self.answerPrimaryMaster
d[PacketTypes.ANNOUNCE_PRIMARY_MASTER] = self.announcePrimaryMaster
d[PacketTypes.REELECT_PRIMARY_MASTER] = self.reelectPrimaryMaster
d[PacketTypes.NOTIFY_NODE_INFORMATION] = self.notifyNodeInformation
d[PacketTypes.ASK_LAST_IDS] = self.askLastIDs
d[PacketTypes.ANSWER_LAST_IDS] = self.answerLastIDs
d[PacketTypes.ASK_PARTITION_TABLE] = self.askPartitionTable
d[PacketTypes.ANSWER_PARTITION_TABLE] = self.answerPartitionTable
d[PacketTypes.SEND_PARTITION_TABLE] = self.sendPartitionTable
d[PacketTypes.NOTIFY_PARTITION_CHANGES] = self.notifyPartitionChanges
d[PacketTypes.START_OPERATION] = self.startOperation
d[PacketTypes.STOP_OPERATION] = self.stopOperation
d[PacketTypes.ASK_UNFINISHED_TRANSACTIONS] = self.askUnfinishedTransactions
d[PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS] = self.answerUnfinishedTransactions
d[PacketTypes.ASK_OBJECT_PRESENT] = self.askObjectPresent
d[PacketTypes.ANSWER_OBJECT_PRESENT] = self.answerObjectPresent
d[PacketTypes.DELETE_TRANSACTION] = self.deleteTransaction
d[PacketTypes.COMMIT_TRANSACTION] = self.commitTransaction
d[PacketTypes.ASK_BEGIN_TRANSACTION] = self.askBeginTransaction
d[PacketTypes.ANSWER_BEGIN_TRANSACTION] = self.answerBeginTransaction
d[PacketTypes.FINISH_TRANSACTION] = self.finishTransaction
d[PacketTypes.NOTIFY_TRANSACTION_FINISHED] = self.notifyTransactionFinished
d[PacketTypes.LOCK_INFORMATION] = self.lockInformation
d[PacketTypes.NOTIFY_INFORMATION_LOCKED] = self.notifyInformationLocked
d[PacketTypes.INVALIDATE_OBJECTS] = self.invalidateObjects
d[PacketTypes.UNLOCK_INFORMATION] = self.unlockInformation
d[PacketTypes.ASK_NEW_OIDS] = self.askNewOIDs
d[PacketTypes.ANSWER_NEW_OIDS] = self.answerNewOIDs
d[PacketTypes.ASK_STORE_OBJECT] = self.askStoreObject
d[PacketTypes.ANSWER_STORE_OBJECT] = self.answerStoreObject
d[PacketTypes.ABORT_TRANSACTION] = self.abortTransaction
d[PacketTypes.ASK_STORE_TRANSACTION] = self.askStoreTransaction
d[PacketTypes.ANSWER_STORE_TRANSACTION] = self.answerStoreTransaction
d[PacketTypes.ASK_OBJECT] = self.askObject
d[PacketTypes.ANSWER_OBJECT] = self.answerObject
d[PacketTypes.ASK_TIDS] = self.askTIDs
d[PacketTypes.ANSWER_TIDS] = self.answerTIDs
d[PacketTypes.ASK_TRANSACTION_INFORMATION] = self.askTransactionInformation
d[PacketTypes.ANSWER_TRANSACTION_INFORMATION] = self.answerTransactionInformation
d[PacketTypes.ASK_OBJECT_HISTORY] = self.askObjectHistory
d[PacketTypes.ANSWER_OBJECT_HISTORY] = self.answerObjectHistory
d[PacketTypes.ASK_OIDS] = self.askOIDs
d[PacketTypes.ANSWER_OIDS] = self.answerOIDs
d[PacketTypes.ASK_PARTITION_LIST] = self.askPartitionList
d[PacketTypes.ANSWER_PARTITION_LIST] = self.answerPartitionList
d[PacketTypes.ASK_NODE_LIST] = self.askNodeList
d[PacketTypes.ANSWER_NODE_LIST] = self.answerNodeList
d[PacketTypes.SET_NODE_STATE] = self.setNodeState
d[PacketTypes.ANSWER_NODE_STATE] = self.answerNodeState
d[PacketTypes.SET_CLUSTER_STATE] = self.setClusterState
d[PacketTypes.ADD_PENDING_NODES] = self.addPendingNodes
d[PacketTypes.ANSWER_NEW_NODES] = self.answerNewNodes
d[PacketTypes.ASK_NODE_INFORMATION] = self.askNodeInformation
d[PacketTypes.ANSWER_NODE_INFORMATION] = self.answerNodeInformation
d[PacketTypes.ASK_CLUSTER_STATE] = self.askClusterState
d[PacketTypes.ANSWER_CLUSTER_STATE] = self.answerClusterState
d[PacketTypes.NOTIFY_CLUSTER_INFORMATION] = self.notifyClusterInformation
d[PacketTypes.NOTIFY_LAST_OID] = self.notifyLastOID
d[Packets.Error] = self.error
d[Packets.RequestIdentification] = self.requestIdentification
d[Packets.AcceptIdentification] = self.acceptIdentification
d[Packets.AskPrimary] = self.askPrimary
d[Packets.AnswerPrimary] = self.answerPrimary
d[Packets.AnnouncePrimary] = self.announcePrimary
d[Packets.ReelectPrimary] = self.reelectPrimary
d[Packets.NotifyNodeInformation] = self.notifyNodeInformation
d[Packets.AskLastIDs] = self.askLastIDs
d[Packets.AnswerLastIDs] = self.answerLastIDs
d[Packets.AskPartitionTable] = self.askPartitionTable
d[Packets.AnswerPartitionTable] = self.answerPartitionTable
d[Packets.SendPartitionTable] = self.sendPartitionTable
d[Packets.NotifyPartitionChanges] = self.notifyPartitionChanges
d[Packets.StartOperation] = self.startOperation
d[Packets.StopOperation] = self.stopOperation
d[Packets.AskUnfinishedTransactions] = self.askUnfinishedTransactions
d[Packets.AnswerUnfinishedTransactions] = self.answerUnfinishedTransactions
d[Packets.AskObjectPresent] = self.askObjectPresent
d[Packets.AnswerObjectPresent] = self.answerObjectPresent
d[Packets.DeleteTransaction] = self.deleteTransaction
d[Packets.CommitTransaction] = self.commitTransaction
d[Packets.AskBeginTransaction] = self.askBeginTransaction
d[Packets.AnswerBeginTransaction] = self.answerBeginTransaction
d[Packets.FinishTransaction] = self.finishTransaction
d[Packets.NotifyTransactionFinished] = self.notifyTransactionFinished
d[Packets.LockInformation] = self.lockInformation
d[Packets.NotifyInformationLocked] = self.notifyInformationLocked
d[Packets.InvalidateObjects] = self.invalidateObjects
d[Packets.UnlockInformation] = self.unlockInformation
d[Packets.AskNewOIDs] = self.askNewOIDs
d[Packets.AnswerNewOIDs] = self.answerNewOIDs
d[Packets.AskStoreObject] = self.askStoreObject
d[Packets.AnswerStoreObject] = self.answerStoreObject
d[Packets.AbortTransaction] = self.abortTransaction
d[Packets.AskStoreTransaction] = self.askStoreTransaction
d[Packets.AnswerStoreTransaction] = self.answerStoreTransaction
d[Packets.AskObject] = self.askObject
d[Packets.AnswerObject] = self.answerObject
d[Packets.AskTIDs] = self.askTIDs
d[Packets.AnswerTIDs] = self.answerTIDs
d[Packets.AskTransactionInformation] = self.askTransactionInformation
d[Packets.AnswerTransactionInformation] = self.answerTransactionInformation
d[Packets.AskObjectHistory] = self.askObjectHistory
d[Packets.AnswerObjectHistory] = self.answerObjectHistory
d[Packets.AskOIDs] = self.askOIDs
d[Packets.AnswerOIDs] = self.answerOIDs
d[Packets.AskPartitionList] = self.askPartitionList
d[Packets.AnswerPartitionList] = self.answerPartitionList
d[Packets.AskNodeList] = self.askNodeList
d[Packets.AnswerNodeList] = self.answerNodeList
d[Packets.SetNodeState] = self.setNodeState
d[Packets.AnswerNodeState] = self.answerNodeState
d[Packets.SetClusterState] = self.setClusterState
d[Packets.AddPendingNodes] = self.addPendingNodes
d[Packets.AnswerNewNodes] = self.answerNewNodes
d[Packets.AskNodeInformation] = self.askNodeInformation
d[Packets.AnswerNodeInformation] = self.answerNodeInformation
d[Packets.AskClusterState] = self.askClusterState
d[Packets.AnswerClusterState] = self.answerClusterState
d[Packets.NotifyClusterInformation] = self.notifyClusterInformation
d[Packets.NotifyLastOID] = self.notifyLastOID
return d
......
......@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import logging
from neo.protocol import PacketTypes
from neo.protocol import Packets, PacketMalformedError
from neo.util import dump
class PacketLogger(object):
......@@ -28,17 +28,21 @@ class PacketLogger(object):
def log(self, conn, packet, direction):
"""This is a helper method to handle various packet types."""
# default log message
type = packet.getType()
klass = packet.getType()
uuid = dump(conn.getUUID())
ip, port = conn.getAddress()
logging.debug('#0x%08x %-30s %s %s (%s:%d)', packet.getId(),
type, direction, uuid, ip, port)
logger = self.fetch_table.get(type, None)
packet.__class__.__name__, direction, uuid, ip, port)
logger = self.fetch_table.get(klass, None)
if logger is None:
logging.warning('No logger found for packet %s' % type)
logging.warning('No logger found for packet %s' % klass)
return
# enhanced log
try:
args = packet.decode() or ()
except PacketMalformedError:
logging.warning("Can't decode packet for logging")
return
log_message = logger(conn, packet, *args)
if log_message is not None:
logging.debug('#0x%08x %s', packet.getId(), log_message)
......@@ -49,25 +53,26 @@ class PacketLogger(object):
def error(self, conn, packet, code, message):
return "%s (%s)" % (code, message)
def requestNodeIdentification(self, conn, packet, node_type,
def requestIdentification(self, conn, packet, node_type,
uuid, address, name):
logging.debug('Request identification for cluster %s' % (name, ))
pass
def acceptNodeIdentification(self, conn, packet, node_type,
def acceptIdentification(self, conn, packet, node_type,
uuid, address, num_partitions, num_replicas, your_uuid):
pass
def askPrimaryMaster(self, conn, packet):
def askPrimary(self, conn, packet):
pass
def answerPrimaryMaster(self, conn, packet, primary_uuid,
def answerPrimary(self, conn, packet, primary_uuid,
known_master_list):
pass
def announcePrimaryMaster(self, conn, packet):
def announcePrimary(self, conn, packet):
pass
def reelectPrimaryMaster(self, conn, packet):
def reelectPrimary(self, conn, packet):
pass
def notifyNodeInformation(self, conn, packet, node_list):
......@@ -221,7 +226,7 @@ class PacketLogger(object):
def askNodeInformation(self, conn, packet):
pass
def answerNodeInformation(self, conn, packet, node_list):
def answerNodeInformation(self, conn, packet):
pass
def askClusterState(self, conn, packet):
......@@ -243,68 +248,68 @@ class PacketLogger(object):
# Fetch tables initialization
def initFetchTable(self):
d = {}
d[PacketTypes.ERROR] = self.error
d[PacketTypes.REQUEST_NODE_IDENTIFICATION] = self.requestNodeIdentification
d[PacketTypes.ACCEPT_NODE_IDENTIFICATION] = self.acceptNodeIdentification
d[PacketTypes.ASK_PRIMARY_MASTER] = self.askPrimaryMaster
d[PacketTypes.ANSWER_PRIMARY_MASTER] = self.answerPrimaryMaster
d[PacketTypes.ANNOUNCE_PRIMARY_MASTER] = self.announcePrimaryMaster
d[PacketTypes.REELECT_PRIMARY_MASTER] = self.reelectPrimaryMaster
d[PacketTypes.NOTIFY_NODE_INFORMATION] = self.notifyNodeInformation
d[PacketTypes.ASK_LAST_IDS] = self.askLastIDs
d[PacketTypes.ANSWER_LAST_IDS] = self.answerLastIDs
d[PacketTypes.ASK_PARTITION_TABLE] = self.askPartitionTable
d[PacketTypes.ANSWER_PARTITION_TABLE] = self.answerPartitionTable
d[PacketTypes.SEND_PARTITION_TABLE] = self.sendPartitionTable
d[PacketTypes.NOTIFY_PARTITION_CHANGES] = self.notifyPartitionChanges
d[PacketTypes.START_OPERATION] = self.startOperation
d[PacketTypes.STOP_OPERATION] = self.stopOperation
d[PacketTypes.ASK_UNFINISHED_TRANSACTIONS] = self.askUnfinishedTransactions
d[PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS] = self.answerUnfinishedTransactions
d[PacketTypes.ASK_OBJECT_PRESENT] = self.askObjectPresent
d[PacketTypes.ANSWER_OBJECT_PRESENT] = self.answerObjectPresent
d[PacketTypes.DELETE_TRANSACTION] = self.deleteTransaction
d[PacketTypes.COMMIT_TRANSACTION] = self.commitTransaction
d[PacketTypes.ASK_BEGIN_TRANSACTION] = self.askBeginTransaction
d[PacketTypes.ANSWER_BEGIN_TRANSACTION] = self.answerBeginTransaction
d[PacketTypes.FINISH_TRANSACTION] = self.finishTransaction
d[PacketTypes.NOTIFY_TRANSACTION_FINISHED] = self.notifyTransactionFinished
d[PacketTypes.LOCK_INFORMATION] = self.lockInformation
d[PacketTypes.NOTIFY_INFORMATION_LOCKED] = self.notifyInformationLocked
d[PacketTypes.INVALIDATE_OBJECTS] = self.invalidateObjects
d[PacketTypes.UNLOCK_INFORMATION] = self.unlockInformation
d[PacketTypes.ASK_NEW_OIDS] = self.askNewOIDs
d[PacketTypes.ANSWER_NEW_OIDS] = self.answerNewOIDs
d[PacketTypes.ASK_STORE_OBJECT] = self.askStoreObject
d[PacketTypes.ANSWER_STORE_OBJECT] = self.answerStoreObject
d[PacketTypes.ABORT_TRANSACTION] = self.abortTransaction
d[PacketTypes.ASK_STORE_TRANSACTION] = self.askStoreTransaction
d[PacketTypes.ANSWER_STORE_TRANSACTION] = self.answerStoreTransaction
d[PacketTypes.ASK_OBJECT] = self.askObject
d[PacketTypes.ANSWER_OBJECT] = self.answerObject
d[PacketTypes.ASK_TIDS] = self.askTIDs
d[PacketTypes.ANSWER_TIDS] = self.answerTIDs
d[PacketTypes.ASK_TRANSACTION_INFORMATION] = self.askTransactionInformation
d[PacketTypes.ANSWER_TRANSACTION_INFORMATION] = self.answerTransactionInformation
d[PacketTypes.ASK_OBJECT_HISTORY] = self.askObjectHistory
d[PacketTypes.ANSWER_OBJECT_HISTORY] = self.answerObjectHistory
d[PacketTypes.ASK_OIDS] = self.askOIDs
d[PacketTypes.ANSWER_OIDS] = self.answerOIDs
d[PacketTypes.ASK_PARTITION_LIST] = self.askPartitionList
d[PacketTypes.ANSWER_PARTITION_LIST] = self.answerPartitionList
d[PacketTypes.ASK_NODE_LIST] = self.askNodeList
d[PacketTypes.ANSWER_NODE_LIST] = self.answerNodeList
d[PacketTypes.SET_NODE_STATE] = self.setNodeState
d[PacketTypes.ANSWER_NODE_STATE] = self.answerNodeState
d[PacketTypes.SET_CLUSTER_STATE] = self.setClusterState
d[PacketTypes.ADD_PENDING_NODES] = self.addPendingNodes
d[PacketTypes.ANSWER_NEW_NODES] = self.answerNewNodes
d[PacketTypes.ASK_NODE_INFORMATION] = self.askNodeInformation
d[PacketTypes.ANSWER_NODE_INFORMATION] = self.answerNodeInformation
d[PacketTypes.ASK_CLUSTER_STATE] = self.askClusterState
d[PacketTypes.ANSWER_CLUSTER_STATE] = self.answerClusterState
d[PacketTypes.NOTIFY_CLUSTER_INFORMATION] = self.notifyClusterInformation
d[PacketTypes.NOTIFY_LAST_OID] = self.notifyLastOID
d[Packets.Error] = self.error
d[Packets.RequestIdentification] = self.requestIdentification
d[Packets.AcceptIdentification] = self.acceptIdentification
d[Packets.AskPrimary] = self.askPrimary
d[Packets.AnswerPrimary] = self.answerPrimary
d[Packets.AnnouncePrimary] = self.announcePrimary
d[Packets.ReelectPrimary] = self.reelectPrimary
d[Packets.NotifyNodeInformation] = self.notifyNodeInformation
d[Packets.AskLastIDs] = self.askLastIDs
d[Packets.AnswerLastIDs] = self.answerLastIDs
d[Packets.AskPartitionTable] = self.askPartitionTable
d[Packets.AnswerPartitionTable] = self.answerPartitionTable
d[Packets.SendPartitionTable] = self.sendPartitionTable
d[Packets.NotifyPartitionChanges] = self.notifyPartitionChanges
d[Packets.StartOperation] = self.startOperation
d[Packets.StopOperation] = self.stopOperation
d[Packets.AskUnfinishedTransactions] = self.askUnfinishedTransactions
d[Packets.AnswerUnfinishedTransactions] = self.answerUnfinishedTransactions
d[Packets.AskObjectPresent] = self.askObjectPresent
d[Packets.AnswerObjectPresent] = self.answerObjectPresent
d[Packets.DeleteTransaction] = self.deleteTransaction
d[Packets.CommitTransaction] = self.commitTransaction
d[Packets.AskBeginTransaction] = self.askBeginTransaction
d[Packets.AnswerBeginTransaction] = self.answerBeginTransaction
d[Packets.FinishTransaction] = self.finishTransaction
d[Packets.NotifyTransactionFinished] = self.notifyTransactionFinished
d[Packets.LockInformation] = self.lockInformation
d[Packets.NotifyInformationLocked] = self.notifyInformationLocked
d[Packets.InvalidateObjects] = self.invalidateObjects
d[Packets.UnlockInformation] = self.unlockInformation
d[Packets.AskNewOIDs] = self.askNewOIDs
d[Packets.AnswerNewOIDs] = self.answerNewOIDs
d[Packets.AskStoreObject] = self.askStoreObject
d[Packets.AnswerStoreObject] = self.answerStoreObject
d[Packets.AbortTransaction] = self.abortTransaction
d[Packets.AskStoreTransaction] = self.askStoreTransaction
d[Packets.AnswerStoreTransaction] = self.answerStoreTransaction
d[Packets.AskObject] = self.askObject
d[Packets.AnswerObject] = self.answerObject
d[Packets.AskTIDs] = self.askTIDs
d[Packets.AnswerTIDs] = self.answerTIDs
d[Packets.AskTransactionInformation] = self.askTransactionInformation
d[Packets.AnswerTransactionInformation] = self.answerTransactionInformation
d[Packets.AskObjectHistory] = self.askObjectHistory
d[Packets.AnswerObjectHistory] = self.answerObjectHistory
d[Packets.AskOIDs] = self.askOIDs
d[Packets.AnswerOIDs] = self.answerOIDs
d[Packets.AskPartitionList] = self.askPartitionList
d[Packets.AnswerPartitionList] = self.answerPartitionList
d[Packets.AskNodeList] = self.askNodeList
d[Packets.AnswerNodeList] = self.answerNodeList
d[Packets.SetNodeState] = self.setNodeState
d[Packets.AnswerNodeState] = self.answerNodeState
d[Packets.SetClusterState] = self.setClusterState
d[Packets.AddPendingNodes] = self.addPendingNodes
d[Packets.AnswerNewNodes] = self.answerNewNodes
d[Packets.AskNodeInformation] = self.askNodeInformation
d[Packets.AnswerNodeInformation] = self.answerNodeInformation
d[Packets.AskClusterState] = self.askClusterState
d[Packets.AnswerClusterState] = self.answerClusterState
d[Packets.NotifyClusterInformation] = self.notifyClusterInformation
d[Packets.NotifyLastOID] = self.notifyLastOID
return d
......
......@@ -21,7 +21,8 @@ from time import time
from struct import pack, unpack
from neo import protocol
from neo.protocol import UUID_NAMESPACES, ClusterStates, NodeStates, NodeTypes
from neo.protocol import UUID_NAMESPACES
from neo.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.node import NodeManager
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection
......@@ -201,7 +202,7 @@ class Application(object):
self.primary = True
logging.debug('I am the primary, so sending an announcement')
for conn in em.getClientList():
conn.notify(protocol.announcePrimaryMaster())
conn.notify(Packets.AnnouncePrimary())
conn.abort()
t = time()
while em.getClientList():
......@@ -241,7 +242,7 @@ class Application(object):
# Ask all connected nodes to reelect a single primary master.
for conn in em.getClientList():
conn.notify(protocol.reelectPrimaryMaster())
conn.notify(Packets.ReelectPrimary())
conn.abort()
# Wait until the connections are closed.
......@@ -282,12 +283,12 @@ class Application(object):
n = self.nm.getByUUID(c.getUUID())
if n.isMaster() or n.isStorage() or n.isAdmin():
node_list = [(node_type, address, uuid, state)]
c.notify(protocol.notifyNodeInformation(node_list))
c.notify(Packets.NotifyNodeInformation(node_list))
elif node.isMaster() or node.isStorage():
for c in self.em.getConnectionList():
if c.getUUID() is not None:
node_list = [(node_type, address, uuid, state)]
c.notify(protocol.notifyNodeInformation(node_list))
c.notify(Packets.NotifyNodeInformation(node_list))
elif not node.isAdmin():
raise RuntimeError('unknown node type')
......@@ -307,7 +308,7 @@ class Application(object):
while size:
amt = min(10000, size)
cell_list = cell_list[start:start+amt]
p = protocol.notifyPartitionChanges(ptid, cell_list)
p = Packets.NotifyPartitionChanges(ptid, cell_list)
c.notify(p)
size -= amt
start += amt
......@@ -325,10 +326,10 @@ class Application(object):
row_list.append((offset, self.pt.getRow(offset)))
# Split the packet if too huge.
if len(row_list) == 1000:
conn.notify(protocol.sendPartitionTable( self.pt.getID(), row_list))
conn.notify(Packets.SendPartitionTable( self.pt.getID(), row_list))
del row_list[:]
if row_list:
conn.notify(protocol.sendPartitionTable(self.pt.getID(), row_list))
conn.notify(Packets.SendPartitionTable(self.pt.getID(), row_list))
def sendNodesInformations(self, conn):
""" Send informations on all nodes through the given connection """
......@@ -338,14 +339,14 @@ class Application(object):
node_list.append(n.asTuple())
# Split the packet if too huge.
if len(node_list) == 10000:
conn.notify(protocol.notifyNodeInformation(node_list))
conn.notify(Packets.NotifyNodeInformation(node_list))
del node_list[:]
if node_list:
conn.notify(protocol.notifyNodeInformation(node_list))
conn.notify(Packets.NotifyNodeInformation(node_list))
def broadcastLastOID(self, oid):
logging.debug('Broadcast last OID to storages : %s' % dump(oid))
packet = protocol.notifyLastOID(oid)
packet = Packets.NotifyLastOID(oid)
for conn in self.em.getConnectionList():
node = self.nm.getByUUID(conn.getUUID())
if node is not None and node.isStorage():
......@@ -424,7 +425,7 @@ class Application(object):
uuid = conn.getUUID()
if uuid in transaction_uuid_list:
self.asking_uuid_dict[uuid] = False
conn.ask(protocol.askTransactionInformation(tid))
conn.ask(Packets.AskTransactionInformation(tid))
if len(self.asking_uuid_dict) == 0:
raise VerificationFailure
......@@ -454,7 +455,7 @@ class Application(object):
uuid = conn.getUUID()
if uuid in object_uuid_list:
self.asking_uuid_dict[uuid] = False
conn.ask(protocol.askObjectPresent(oid, tid))
conn.ask(Packets.AskObjectPresent(oid, tid))
while 1:
em.poll(1)
......@@ -506,7 +507,7 @@ class Application(object):
node = nm.getByUUID(uuid)
if node.isStorage():
self.asking_uuid_dict[uuid] = False
conn.ask(protocol.askUnfinishedTransactions())
conn.ask(Packets.AskUnfinishedTransactions())
while 1:
em.poll(1)
......@@ -527,12 +528,12 @@ class Application(object):
if uuid is not None:
node = nm.getByUUID(uuid)
if node.isStorage():
conn.notify(protocol.deleteTransaction(tid))
conn.notify(Packets.DeleteTransaction(tid))
else:
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in uuid_set:
conn.ask(protocol.commitTransaction(tid))
conn.ask(Packets.CommitTransaction(tid))
# If possible, send the packets now.
em.poll(0)
......@@ -572,7 +573,7 @@ class Application(object):
for conn in em.getConnectionList():
node = nm.getByUUID(conn.getUUID())
if node is not None and (node.isStorage() or node.isClient()):
conn.notify(protocol.stopOperation())
conn.notify(Packets.StopOperation())
if node.isClient():
conn.abort()
......@@ -625,7 +626,7 @@ class Application(object):
# apply the new handler to the primary connection
client_list = self.em.getClientList()
assert len(client_list) == 1
client_list[0].setHandler(secondary.PrimaryMasterHandler(self))
client_list[0].setHandler(secondary.PrimaryHandler(self))
# and another for the future incoming connections
handler = identification.IdentificationHandler(self)
......@@ -653,7 +654,7 @@ class Application(object):
RuntimeError('Unexpected node type')
# change handlers
notification_packet = protocol.notifyClusterInformation(state)
notification_packet = Packets.NotifyClusterInformation(state)
for conn in em.getConnectionList():
node = nm.getByUUID(conn.getUUID())
if conn.isListening() or node is None:
......@@ -725,7 +726,7 @@ class Application(object):
if node.isClient():
node_list = [(node.getType(), node.getAddress(),
node.getUUID(), NodeStates.DOWN)]
c.notify(protocol.notifyNodeInformation(node_list))
c.notify(Packets.NotifyNodeInformation(node_list))
# then ask storages and master nodes to shutdown
logging.info("asking all remaining nodes to shutdown")
for c in self.em.getConnectionList():
......@@ -733,7 +734,7 @@ class Application(object):
if node.isStorage() or node.isMaster():
node_list = [(node.getType(), node.getAddress(),
node.getUUID(), NodeStates.DOWN)]
c.notify(protocol.notifyNodeInformation(node_list))
c.notify(Packets.NotifyNodeInformation(node_list))
# then shutdown
sys.exit("Cluster has been asked to shut down")
......
......@@ -17,9 +17,8 @@
from neo import logging
from neo import protocol
from neo.handler import EventHandler
from neo.protocol import NodeTypes, NodeStates
from neo.protocol import NodeTypes, NodeStates, Packets
class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
......@@ -27,9 +26,9 @@ class MasterHandler(EventHandler):
def protocolError(self, conn, packet, message):
logging.error('Protocol error %s %s' % (message, conn.getAddress()))
def askPrimaryMaster(self, conn, packet):
def askPrimary(self, conn, packet):
if conn.getConnector() is None:
# Connection can be closed by peer after he sent AskPrimaryMaster
# Connection can be closed by peer after he sent AskPrimary
# if he finds the primary master before we answer him.
# The connection gets closed before this message gets processed
# because this message might have been queued, but connection
......@@ -48,7 +47,7 @@ class MasterHandler(EventHandler):
if n.isBroken():
continue
known_master_list.append((n.getAddress(), n.getUUID(), ))
conn.answer(protocol.answerPrimaryMaster(
conn.answer(Packets.AnswerPrimary(
primary_uuid,
known_master_list),
packet.getId(),
......@@ -57,17 +56,17 @@ class MasterHandler(EventHandler):
def askClusterState(self, conn, packet):
assert conn.getUUID() is not None
state = self.app.getClusterState()
conn.answer(protocol.answerClusterState(state), packet.getId())
conn.answer(Packets.AnswerClusterState(state), packet.getId())
def askNodeInformation(self, conn, packet):
self.app.sendNodesInformations(conn)
conn.answer(protocol.answerNodeInformation([]), packet.getId())
conn.answer(Packets.AnswerNodeInformation(), packet.getId())
def askPartitionTable(self, conn, packet, offset_list):
assert len(offset_list) == 0
app = self.app
app.sendPartitionTable(conn)
conn.answer(protocol.answerPartitionTable(app.pt.getID(), []),
conn.answer(Packets.AnswerPartitionTable(app.pt.getID(), []),
packet.getId())
......
......@@ -19,7 +19,7 @@ from neo import logging
from neo import protocol
from neo.master.handlers import MasterHandler
from neo.protocol import ClusterStates, NodeStates
from neo.protocol import ClusterStates, NodeStates, Packets
from neo.util import dump
class AdministrationHandler(MasterHandler):
......@@ -29,10 +29,10 @@ class AdministrationHandler(MasterHandler):
node = self.app.nm.getByUUID(conn.getUUID())
self.app.nm.remove(node)
def askPrimaryMaster(self, conn, packet):
def askPrimary(self, conn, packet):
app = self.app
# I'm the primary
conn.answer(protocol.answerPrimaryMaster(app.uuid, []), packet.getId())
conn.answer(Packets.AnswerPrimary(app.uuid, []), packet.getId())
def setClusterState(self, conn, packet, state):
self.app.changeClusterState(state)
......@@ -123,8 +123,8 @@ class AdministrationHandler(MasterHandler):
# start nodes
for s_conn in em.getConnectionList():
if s_conn.getUUID() in uuid_set:
s_conn.notify(protocol.notifyLastOID(app.loid))
s_conn.notify(protocol.startOperation())
s_conn.notify(Packets.NotifyLastOID(app.loid))
s_conn.notify(Packets.StartOperation())
# broadcast the new partition table
app.broadcastPartitionChanges(app.pt.setNextID(), cell_list)
p = protocol.noError('node added')
......
......@@ -18,9 +18,8 @@
from neo import logging
from neo import protocol
from neo.protocol import NodeStates
from neo.protocol import NodeStates, Packets, UnexpectedPacketError
from neo.master.handlers import BaseServiceHandler
from neo.protocol import UnexpectedPacketError
from neo.util import dump, getNextTID
class FinishingTransaction(object):
......@@ -90,11 +89,11 @@ class ClientServiceHandler(BaseServiceHandler):
tid = getNextTID(app.ltid)
app.ltid = tid
app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.answer(protocol.answerBeginTransaction(tid), packet.getId())
conn.answer(Packets.AnswerBeginTransaction(tid), packet.getId())
def askNewOIDs(self, conn, packet, num_oids):
oid_list = self.app.getNewOIDList(num_oids)
conn.answer(protocol.answerNewOIDs(oid_list), packet.getId())
conn.answer(Packets.AnswerNewOIDs(oid_list), packet.getId())
def finishTransaction(self, conn, packet, oid_list, tid):
app = self.app
......@@ -121,7 +120,7 @@ class ClientServiceHandler(BaseServiceHandler):
used_uuid_set = set()
for c in app.em.getConnectionList():
if c.getUUID() in uuid_set:
c.ask(protocol.lockInformation(tid), timeout=60)
c.ask(Packets.LockInformation(tid), timeout=60)
used_uuid_set.add(c.getUUID())
try:
......
......@@ -18,7 +18,7 @@
from neo import logging
from neo import protocol
from neo.protocol import NodeTypes, NodeStates
from neo.protocol import NodeTypes, NodeStates, Packets
from neo.master.handlers import MasterHandler
from neo.exception import ElectionFailure
......@@ -78,7 +78,7 @@ class ClientElectionHandler(ElectionHandler):
MasterHandler.connectionStarted(self, conn)
def connectionCompleted(self, conn):
conn.ask(protocol.askPrimaryMaster())
conn.ask(Packets.AskPrimary())
MasterHandler.connectionCompleted(self, conn)
def connectionClosed(self, conn):
......@@ -110,7 +110,7 @@ class ClientElectionHandler(ElectionHandler):
app.negotiating_master_node_set.discard(addr)
MasterHandler.peerBroken(self, conn)
def acceptNodeIdentification(self, conn, packet, node_type,
def acceptIdentification(self, conn, packet, node_type,
uuid, address, num_partitions,
num_replicas, your_uuid):
app = self.app
......@@ -146,10 +146,10 @@ class ClientElectionHandler(ElectionHandler):
app.negotiating_master_node_set.discard(conn.getAddress())
def answerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
def answerPrimary(self, conn, packet, primary_uuid, known_master_list):
if conn.getConnector() is None:
# Connection can be closed by peer after he sent
# AnswerPrimaryMaster if he finds the primary master before we
# AnswerPrimary if he finds the primary master before we
# give him our UUID.
# The connection gets closed before this message gets processed
# because this message might have been queued, but connection
......@@ -198,7 +198,7 @@ class ClientElectionHandler(ElectionHandler):
[primary_server])
# Request a node idenfitication.
conn.ask(protocol.requestNodeIdentification(
conn.ask(Packets.RequestIdentification(
NodeTypes.MASTER,
app.uuid,
app.server,
......@@ -208,7 +208,7 @@ class ClientElectionHandler(ElectionHandler):
class ServerElectionHandler(ElectionHandler):
def reelectPrimaryMaster(self, conn, packet):
def reelectPrimary(self, conn, packet):
raise ElectionFailure, 'reelection requested'
def peerBroken(self, conn):
......@@ -219,11 +219,11 @@ class ServerElectionHandler(ElectionHandler):
node.setBroken()
MasterHandler.peerBroken(self, conn)
def requestNodeIdentification(self, conn, packet, node_type,
def requestIdentification(self, conn, packet, node_type,
uuid, address, name):
if conn.getConnector() is None:
# Connection can be closed by peer after he sent
# RequestNodeIdentification if he finds the primary master before
# RequestIdentification if he finds the primary master before
# we answer him.
# The connection gets closed before this message gets processed
# because this message might have been queued, but connection
......@@ -250,7 +250,7 @@ class ServerElectionHandler(ElectionHandler):
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(
p = Packets.AcceptIdentification(
NodeTypes.MASTER,
app.uuid,
app.server,
......@@ -260,7 +260,7 @@ class ServerElectionHandler(ElectionHandler):
)
conn.answer(p, packet.getId())
def announcePrimaryMaster(self, conn, packet):
def announcePrimary(self, conn, packet):
uuid = conn.getUUID()
if uuid is None:
raise protocol.UnexpectedPacketError
......
......@@ -18,7 +18,7 @@
from neo import logging
from neo import protocol
from neo.protocol import NodeTypes
from neo.protocol import NodeTypes, Packets
from neo.master.handlers import MasterHandler
class IdentificationHandler(MasterHandler):
......@@ -27,7 +27,7 @@ class IdentificationHandler(MasterHandler):
def nodeLost(self, conn, node):
logging.warning('lost a node in IdentificationHandler : %s' % node)
def requestNodeIdentification(self, conn, packet, node_type,
def requestIdentification(self, conn, packet, node_type,
uuid, address, name):
self.checkClusterName(name)
......@@ -76,7 +76,7 @@ class IdentificationHandler(MasterHandler):
# answer
args = (NodeTypes.MASTER, app.uuid, app.server,
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
conn.answer(protocol.acceptNodeIdentification(*args), packet.getId())
conn.answer(Packets.AcceptIdentification(*args), packet.getId())
# trigger the event
handler.connectionCompleted(conn)
app.broadcastNodeInformation(node)
......
......@@ -17,7 +17,7 @@
from neo import logging
from neo import protocol
from neo.protocol import Packets
from neo.master.handlers import MasterHandler
from neo.util import dump
......@@ -26,7 +26,7 @@ class RecoveryHandler(MasterHandler):
def connectionCompleted(self, conn):
# ask the last IDs to perform the recovery
conn.ask(protocol.askLastIDs())
conn.ask(Packets.AskLastIDs())
def answerLastIDs(self, conn, packet, loid, ltid, lptid):
app = self.app
......@@ -39,7 +39,7 @@ class RecoveryHandler(MasterHandler):
# something newer
app.target_uuid = conn.getUUID()
app.pt.setID(lptid)
conn.ask(protocol.askPartitionTable([]))
conn.ask(Packets.AskPartitionTable([]))
def answerPartitionTable(self, conn, packet, ptid, row_list):
uuid = conn.getUUID()
......
......@@ -34,17 +34,17 @@ class SecondaryMasterHandler(MasterHandler):
def connectionCompleted(self, conn):
pass
def announcePrimaryMaster(self, conn, packet):
def announcePrimary(self, conn, packet):
raise ElectionFailure, 'another primary arises'
def reelectPrimaryMaster(self, conn, packet):
def reelectPrimary(self, conn, packet):
raise ElectionFailure, 'reelection requested'
def notifyNodeInformation(self, conn, packet, node_list):
logging.error('/!\ NotifyNodeInformation packet from secondary master')
class PrimaryMasterHandler(MasterHandler):
class PrimaryHandler(MasterHandler):
""" Handler used by secondaries to handle primary master"""
def packetReceived(self, conn, packet):
......@@ -58,10 +58,10 @@ class PrimaryMasterHandler(MasterHandler):
self.app.primary_master_node.setDown()
raise PrimaryFailure, 'primary master is dead'
def announcePrimaryMaster(self, conn, packet):
def announcePrimary(self, conn, packet):
raise protocol.UnexpectedPacketError
def reelectPrimaryMaster(self, conn, packet):
def reelectPrimary(self, conn, packet):
raise ElectionFailure, 'reelection requested'
def notifyNodeInformation(self, conn, packet, node_list):
......@@ -86,7 +86,7 @@ class PrimaryMasterHandler(MasterHandler):
if n.getUUID() is None:
n.setUUID(uuid)
def acceptNodeIdentification(self, conn, packet, node_type,
def acceptIdentification(self, conn, packet, node_type,
uuid, address, num_partitions,
num_replicas, your_uuid):
app = self.app
......@@ -101,7 +101,7 @@ class PrimaryMasterHandler(MasterHandler):
conn.setUUID(uuid)
node.setUUID(uuid)
def answerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
def answerPrimary(self, conn, packet, primary_uuid, known_master_list):
pass
def notifyClusterInformation(self, conn, packet, state):
......
......@@ -22,13 +22,13 @@ from neo.master.handlers import BaseServiceHandler
class ShutdownHandler(BaseServiceHandler):
"""This class deals with events for a shutting down phase."""
def requestNodeIdentification(self, conn, packet, node_type,
def requestIdentification(self, conn, packet, node_type,
uuid, address, name):
logging.error('reject any new connection')
raise protocol.ProtocolError('cluster is shutting down')
def askPrimaryMaster(self, conn, packet):
def askPrimary(self, conn, packet):
logging.error('reject any new demand for primary master')
raise protocol.ProtocolError('cluster is shutting down')
......
......@@ -19,7 +19,7 @@ from neo import logging
from neo import protocol
from neo.protocol import UnexpectedPacketError, ProtocolError
from neo.protocol import CellStates, ErrorCodes
from neo.protocol import CellStates, ErrorCodes, Packets
from neo.master.handlers import BaseServiceHandler
from neo.exception import OperationFailure
from neo.util import dump
......@@ -31,8 +31,8 @@ class StorageServiceHandler(BaseServiceHandler):
def connectionCompleted(self, conn):
node = self.app.nm.getByUUID(conn.getUUID())
if node.isRunning():
conn.notify(protocol.notifyLastOID(self.app.loid))
conn.notify(protocol.startOperation())
conn.notify(Packets.NotifyLastOID(self.app.loid))
conn.notify(Packets.StartOperation())
def nodeLost(self, conn, node):
logging.info('storage node lost')
......@@ -44,11 +44,11 @@ class StorageServiceHandler(BaseServiceHandler):
def askLastIDs(self, conn, packet):
app = self.app
conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.pt.getID()), packet.getId())
conn.answer(Packets.AnswerLastIDs(app.loid, app.ltid, app.pt.getID()), packet.getId())
def askUnfinishedTransactions(self, conn, packet):
app = self.app
p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys())
p = Packets.AnswerUnfinishedTransactions(app.finishing_transaction_dict.keys())
conn.answer(p, packet.getId())
def notifyInformationLocked(self, conn, packet, tid):
......@@ -75,14 +75,14 @@ class StorageServiceHandler(BaseServiceHandler):
node = app.nm.getByUUID(uuid)
if node.isClient():
if c is t.getConnection():
p = protocol.notifyTransactionFinished(tid)
p = Packets.NotifyTransactionFinished(tid)
c.answer(p, t.getMessageId())
else:
p = protocol.invalidateObjects(t.getOIDList(), tid)
p = Packets.InvalidateObjects(t.getOIDList(), tid)
c.notify(p)
elif node.isStorage():
if uuid in t.getUUIDSet():
p = protocol.unlockInformation(tid)
p = Packets.UnlockInformation(tid)
c.notify(p)
del app.finishing_transaction_dict[tid]
except KeyError:
......
......@@ -24,7 +24,7 @@ action_dict = {
'pt': 'getPartitionRowList',
'node': 'getNodeList',
'cluster': 'getClusterState',
'primary': 'getPrimaryMaster',
'primary': 'getPrimary',
},
'set': {
'node': 'setNodeState',
......@@ -179,11 +179,11 @@ class TerminalNeoCTL(object):
assert len(params) == 1
self.neoctl.dropNode(self.asNode(params[0]))
def getPrimaryMaster(self, params):
def getPrimary(self, params):
"""
Get primary master node.
"""
return self.formatUUID(self.neoctl.getPrimaryMaster())
return self.formatUUID(self.neoctl.getPrimary())
class Application(object):
"""The storage node application."""
......
......@@ -64,4 +64,4 @@ class CommandEventHandler(EventHandler):
answerNodeState = __answer
answerClusterState = __answer
answerNewNodes = __answer
answerPrimaryMaster = __answer
answerPrimary = __answer
......@@ -20,7 +20,7 @@ from neo.connection import ClientConnection
from neo.event import EventManager
from neo.neoctl.handler import CommandEventHandler
from neo import protocol
from neo.protocol import ClusterStates, NodeStates, ErrorCodes, PacketTypes
from neo.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
class NotReadyException(Exception):
pass
......@@ -59,7 +59,7 @@ class NeoCTL(object):
if not self.connected:
raise NotReadyException, 'Connection closed'
response = response_queue.pop()
if response[0] == PacketTypes.ERROR and \
if response[0] == Packets.Error and \
response[1] == ErrorCodes.NOT_READY:
raise NotReadyException(response[2])
return response
......@@ -68,18 +68,18 @@ class NeoCTL(object):
"""
Put all given storage nodes in "running" state.
"""
packet = protocol.addPendingNodes(uuid_list)
packet = Packets.AddPendingNodes(uuid_list)
response = self.__ask(packet)
assert response[0] == PacketTypes.ERROR
assert response[0] == Packets.Error
assert response[1] == ErrorCodes.NO_ERROR
def setClusterState(self, state):
"""
Set cluster state.
"""
packet = protocol.setClusterState(state)
packet = Packets.SetClusterState(state)
response = self.__ask(packet)
assert response[0] == PacketTypes.ERROR
assert response[0] == Packets.Error
assert response[1] == ErrorCodes.NO_ERROR
def setNodeState(self, node, state, update_partition_table=False):
......@@ -90,27 +90,27 @@ class NeoCTL(object):
update_partition_table = 1
else:
update_partition_table = 0
packet = protocol.setNodeState(node, state, update_partition_table)
packet = Packets.SetNodeState(node, state, update_partition_table)
response = self.__ask(packet)
assert response[0] == PacketTypes.ERROR
assert response[0] == Packets.Error
assert response[1] == ErrorCodes.NO_ERROR
def getClusterState(self):
"""
Get cluster state.
"""
packet = protocol.askClusterState()
packet = Packets.AskClusterState()
response = self.__ask(packet)
assert response[0] == PacketTypes.ANSWER_CLUSTER_STATE
assert response[0] == Packets.AnswerClusterState
return response[1]
def getNodeList(self, node_type=None):
"""
Get a list of nodes, filtering with given type.
"""
packet = protocol.askNodeList(node_type)
packet = Packets.AskNodeList(node_type)
response = self.__ask(packet)
assert response[0] == PacketTypes.ANSWER_NODE_LIST
assert response[0] == Packets.AnswerNodeList
return response[1]
def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
......@@ -118,9 +118,9 @@ class NeoCTL(object):
Get a list of partition rows, bounded by min & max and involving
given node.
"""
packet = protocol.askPartitionList(min_offset, max_offset, node)
packet = Packets.AskPartitionList(min_offset, max_offset, node)
response = self.__ask(packet)
assert response[0] == PacketTypes.ANSWER_PARTITION_LIST
assert response[0] == Packets.AnswerPartitionList
return (response[1], response[2])
def startCluster(self):
......@@ -135,12 +135,12 @@ class NeoCTL(object):
"""
self.setNodeState(node, NodeStates.DOWN, update_partition_table=1)
def getPrimaryMaster(self):
def getPrimary(self):
"""
Return the primary master UUID.
"""
packet = protocol.askPrimaryMaster()
packet = Packets.AskPrimary()
response = self.__ask(packet)
assert response[0] == PacketTypes.ANSWER_PRIMARY_MASTER
assert response[0] == Packets.AnswerPrimary
return response[1]
......@@ -362,6 +362,6 @@ class NodeManager(object):
address = node.getAddress() or ''
if address:
address = '%s:%d' % address
logging.debug(' * %32s | %17s | %22s | %s' % (
logging.debug(' * %32s | %8s | %22s | %s' % (
uuid, node.getType(), address, node.getState()))
......@@ -19,8 +19,7 @@ from neo import logging
import sys
from collections import deque
from neo import protocol
from neo.protocol import NodeTypes, CellStates
from neo.protocol import NodeTypes, CellStates, Packets
from neo.node import NodeManager
from neo.event import EventManager
from neo.storage.mysqldb import MySQLDatabaseManager
......@@ -159,7 +158,7 @@ class Application(object):
# until the user explicitly requests a shutdown.
while 1:
# look for the primary master
self.connectToPrimaryMaster()
self.connectToPrimary()
self.operational = False
try:
while 1:
......@@ -180,7 +179,7 @@ class Application(object):
except PrimaryFailure, msg:
logging.error('primary master is down : %s' % msg)
def connectToPrimaryMaster(self):
def connectToPrimary(self):
"""Find a primary master node, and connect to it.
If a primary master node is not elected or ready, repeat
......@@ -242,8 +241,8 @@ class Application(object):
self.has_node_information = False
self.has_partition_table = False
self.pt.clear()
self.master_conn.ask(protocol.askNodeInformation())
self.master_conn.ask(protocol.askPartitionTable(()))
self.master_conn.ask(Packets.AskNodeInformation())
self.master_conn.ask(Packets.AskPartitionTable(()))
while not self.has_node_information or not self.has_partition_table:
self.em.poll(1)
self.ready = True
......
......@@ -21,7 +21,7 @@ from neo.handler import EventHandler
from neo import protocol
from neo.util import dump
from neo.exception import PrimaryFailure, OperationFailure
from neo.protocol import NodeStates
from neo.protocol import NodeStates, Packets
class BaseStorageHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
......@@ -33,7 +33,7 @@ class BaseMasterHandler(BaseStorageHandler):
def connectionLost(self, conn, new_state):
raise PrimaryFailure('connection lost')
def reelectPrimaryMaster(self, conn, packet):
def reelectPrimary(self, conn, packet):
raise PrimaryFailure('re-election occurs')
def notifyClusterInformation(self, conn, packet, state):
......@@ -85,7 +85,7 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
tid_list = app.dm.getTIDList(first, last - first,
app.pt.getPartitions(), partition_list)
conn.answer(protocol.answerTIDs(tid_list), packet.getId())
conn.answer(Packets.AnswerTIDs(tid_list), packet.getId())
def askObjectHistory(self, conn, packet, oid, first, last):
if first >= last:
......@@ -95,7 +95,7 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
history_list = app.dm.getObjectHistory(oid, first, last - first)
if history_list is None:
history_list = []
p = protocol.answerObjectHistory(oid, history_list)
p = Packets.AnswerObjectHistory(oid, history_list)
conn.answer(p, packet.getId())
def askTransactionInformation(self, conn, packet, tid):
......@@ -104,7 +104,7 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
if t is None:
p = protocol.tidNotFound('%s does not exist' % dump(tid))
else:
p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], t[0])
conn.answer(p, packet.getId())
def askObject(self, conn, packet, oid, serial, tid):
......@@ -119,7 +119,7 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
serial, next_serial, compression, checksum, data = o
logging.debug('oid = %s, serial = %s, next_serial = %s',
dump(oid), dump(serial), dump(next_serial))
p = protocol.answerObject(oid, serial, next_serial,
p = Packets.AnswerObject(oid, serial, next_serial,
compression, checksum, data)
else:
logging.debug('oid = %s not found', dump(oid))
......
......@@ -18,6 +18,7 @@
from neo import logging
from neo import protocol
from neo.protocol import Packets
from neo.storage.handlers import BaseClientAndStorageOperationHandler
from neo.util import dump
......@@ -112,7 +113,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
if t.isLastOIDChanged():
self.app.dm.setLastOID(self.app.loid)
t.addTransaction(oid_list, user, desc, ext)
conn.answer(protocol.answerStoreTransaction(tid), packet.getId())
conn.answer(Packets.AnswerStoreTransaction(tid), packet.getId())
def askStoreObject(self, conn, packet, oid, serial,
compression, checksum, data, tid):
......@@ -130,7 +131,7 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
# If a newer transaction already locks this object,
# do not try to resolve a conflict, so return immediately.
logging.info('unresolvable conflict in %s', dump(oid))
p = protocol.answerStoreObject(1, oid, locking_tid)
p = Packets.AnswerStoreObject(1, oid, locking_tid)
conn.answer(p, packet.getId())
return
......@@ -140,13 +141,13 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
last_serial = history_list[0][0]
if last_serial != serial:
logging.info('resolvable conflict in %s', dump(oid))
p = protocol.answerStoreObject(1, oid, last_serial)
p = Packets.AnswerStoreObject(1, oid, last_serial)
conn.answer(p, packet.getId())
return
# Now store the object.
t = app.transaction_dict.setdefault(tid, TransactionInformation(uuid))
t.addObject(oid, compression, checksum, data)
p = protocol.answerStoreObject(0, oid, serial)
p = Packets.AnswerStoreObject(0, oid, serial)
conn.answer(p, packet.getId())
app.store_lock_dict[oid] = tid
......
......@@ -42,15 +42,15 @@ class HiddenHandler(BaseMasterHandler):
erase_db = state == NodeStates.DOWN
self.app.shutdown(erase=erase_db)
def requestNodeIdentification(self, conn, packet, node_type,
def requestIdentification(self, conn, packet, node_type,
uuid, address, name):
pass
def acceptNodeIdentification(self, conn, packet, node_type,
def acceptIdentification(self, conn, packet, node_type,
uuid, address, num_partitions, num_replicas, your_uuid):
pass
def answerPrimaryMaster(self, conn, packet, primary_uuid,
def answerPrimary(self, conn, packet, primary_uuid,
known_master_list):
pass
......
......@@ -18,7 +18,7 @@
from neo import logging
from neo.storage.handlers import BaseStorageHandler
from neo.protocol import NodeTypes
from neo.protocol import NodeTypes, Packets
from neo import protocol
from neo.util import dump
......@@ -28,7 +28,7 @@ class IdentificationHandler(BaseStorageHandler):
def connectionLost(self, conn, new_state):
logging.warning('A connection was lost during identification')
def requestNodeIdentification(self, conn, packet, node_type,
def requestIdentification(self, conn, packet, node_type,
uuid, address, name):
self.checkClusterName(name)
# reject any incoming connections if not ready
......@@ -61,6 +61,6 @@ class IdentificationHandler(BaseStorageHandler):
args = (NodeTypes.STORAGE, app.uuid, app.server,
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
# accept the identification and trigger an event
conn.answer(protocol.acceptNodeIdentification(*args), packet.getId())
conn.answer(Packets.AcceptIdentification(*args), packet.getId())
handler.connectionCompleted(conn)
......@@ -22,8 +22,7 @@ from neo import protocol
class InitializationHandler(BaseMasterHandler):
def answerNodeInformation(self, conn, packet, node_list):
assert not node_list
def answerNodeInformation(self, conn, packet):
self.app.has_node_information = True
def notifyNodeInformation(self, conn, packet, node_list):
......
......@@ -18,7 +18,7 @@
from neo import logging
from neo import protocol
from neo.protocol import CellStates
from neo.protocol import CellStates, Packets
from neo.storage.handlers import BaseMasterHandler
from neo.exception import OperationFailure
......@@ -68,7 +68,7 @@ class MasterOperationHandler(BaseMasterHandler):
app.dm.storeTransaction(tid, object_list, t.getTransaction())
except KeyError:
pass
conn.answer(protocol.notifyInformationLocked(tid), packet.getId())
conn.answer(Packets.NotifyInformationLocked(tid), packet.getId())
def unlockInformation(self, conn, packet, tid):
app = self.app
......
......@@ -19,7 +19,7 @@
from neo import logging
from neo.storage.handlers import BaseStorageHandler
from neo import protocol
from neo.protocol import Packets
class ReplicationHandler(BaseStorageHandler):
"""This class handles events for replications."""
......@@ -36,7 +36,7 @@ class ReplicationHandler(BaseStorageHandler):
logging.error('replication is stopped due to connection failure')
self.app.replicator.reset()
def acceptNodeIdentification(self, conn, packet, node_type,
def acceptIdentification(self, conn, packet, node_type,
uuid, address, num_partitions, num_replicas, your_uuid):
# set the UUID on the connection
conn.setUUID(uuid)
......@@ -52,18 +52,18 @@ class ReplicationHandler(BaseStorageHandler):
present_tid_list = app.dm.getTIDListPresent(tid_list)
tid_set = set(tid_list) - set(present_tid_list)
for tid in tid_set:
conn.ask(protocol.askTransactionInformation(tid), timeout=300)
conn.ask(Packets.AskTransactionInformation(tid), timeout=300)
# And, ask more TIDs.
app.replicator.tid_offset += 1000
offset = app.replicator.tid_offset
p = protocol.askTIDs(offset, offset + 1000,
p = Packets.AskTIDs(offset, offset + 1000,
app.replicator.current_partition.getRID())
conn.ask(p, timeout=300)
else:
# If no more TID, a replication of transactions is finished.
# So start to replicate objects now.
p = protocol.askOIDs(0, 1000,
p = Packets.AskOIDs(0, 1000,
app.replicator.current_partition.getRID())
conn.ask(p, timeout=300)
app.replicator.oid_offset = 0
......@@ -85,7 +85,7 @@ class ReplicationHandler(BaseStorageHandler):
if oid_list:
# Pick one up, and ask the history.
oid = oid_list.pop()
conn.ask(protocol.askObjectHistory(oid, 0, 1000), timeout=300)
conn.ask(Packets.AskObjectHistory(oid, 0, 1000), timeout=300)
app.replicator.serial_offset = 0
app.replicator.oid_list = oid_list
else:
......@@ -104,12 +104,12 @@ class ReplicationHandler(BaseStorageHandler):
present_serial_list = app.dm.getSerialListPresent(oid, serial_list)
serial_set = set(serial_list) - set(present_serial_list)
for serial in serial_set:
conn.ask(protocol.askObject(oid, serial, None), timeout=300)
conn.ask(Packets.AskObject(oid, serial, None), timeout=300)
# And, ask more serials.
app.replicator.serial_offset += 1000
offset = app.replicator.serial_offset
p = protocol.askObjectHistory(oid, offset, offset + 1000)
p = Packets.AskObjectHistory(oid, offset, offset + 1000)
conn.ask(p, timeout=300)
else:
# This OID is finished. So advance to next.
......@@ -117,13 +117,13 @@ class ReplicationHandler(BaseStorageHandler):
if oid_list:
# If I have more pending OIDs, pick one up.
oid = oid_list.pop()
conn.ask(protocol.askObjectHistory(oid, 0, 1000), timeout=300)
conn.ask(Packets.AskObjectHistory(oid, 0, 1000), timeout=300)
app.replicator.serial_offset = 0
else:
# Otherwise, acquire more OIDs.
app.replicator.oid_offset += 1000
offset = app.replicator.oid_offset
p = protocol.askOIDs(offset, offset + 1000,
p = Packets.AskOIDs(offset, offset + 1000,
app.replicator.current_partition.getRID())
conn.ask(p, timeout=300)
......
......@@ -17,6 +17,7 @@
from neo import protocol
from neo.storage.handlers import BaseClientAndStorageOperationHandler
from neo.protocol import Packets
class StorageOperationHandler(BaseClientAndStorageOperationHandler):
......@@ -24,7 +25,7 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler):
app = self.app
oid = app.dm.getLastOID()
tid = app.dm.getLastTID()
p = protocol.answerLastIDs(oid, tid, app.pt.getID())
p = Packets.AnswerLastIDs(oid, tid, app.pt.getID())
conn.answer(p, packet.getId())
def askOIDs(self, conn, packet, first, last, partition):
......@@ -48,5 +49,5 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler):
partition_list = [partition]
oid_list = app.dm.getOIDList(first, last - first,
app.pt.getPartitions(), partition_list)
conn.answer(protocol.answerOIDs(oid_list), packet.getId())
conn.answer(Packets.AnswerOIDs(oid_list), packet.getId())
......@@ -19,6 +19,7 @@ from neo import logging
from neo.storage.handlers import BaseMasterHandler
from neo import protocol
from neo.protocol import Packets
from neo.util import dump
from neo.exception import OperationFailure
......@@ -29,7 +30,7 @@ class VerificationHandler(BaseMasterHandler):
app = self.app
oid = app.dm.getLastOID()
tid = app.dm.getLastTID()
p = protocol.answerLastIDs(oid, tid, app.pt.getID())
p = Packets.AnswerLastIDs(oid, tid, app.pt.getID())
conn.answer(p, packet.getId())
def askPartitionTable(self, conn, packet, offset_list):
......@@ -50,7 +51,7 @@ class VerificationHandler(BaseMasterHandler):
except IndexError:
raise protocol.ProtocolError('invalid partition table offset')
p = protocol.answerPartitionTable(app.pt.getID(), row_list)
p = Packets.AnswerPartitionTable(app.pt.getID(), row_list)
conn.answer(p, packet.getId())
def notifyPartitionChanges(self, conn, packet, ptid, cell_list):
......@@ -73,7 +74,7 @@ class VerificationHandler(BaseMasterHandler):
def askUnfinishedTransactions(self, conn, packet):
tid_list = self.app.dm.getUnfinishedTIDList()
p = protocol.answerUnfinishedTransactions(tid_list)
p = Packets.AnswerUnfinishedTransactions(tid_list)
conn.answer(p, packet.getId())
def askTransactionInformation(self, conn, packet, tid):
......@@ -82,12 +83,12 @@ class VerificationHandler(BaseMasterHandler):
if t is None:
p = protocol.tidNotFound('%s does not exist' % dump(tid))
else:
p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
p = Packets.AnswerTransactionInformation(tid, t[1], t[2], t[3], t[0])
conn.answer(p, packet.getId())
def askObjectPresent(self, conn, packet, oid, tid):
if self.app.dm.objectPresent(oid, tid):
p = protocol.answerObjectPresent(oid, tid)
p = Packets.AnswerObjectPresent(oid, tid)
else:
p = protocol.oidNotFound(
'%s:%s do not exist' % (dump(oid), dump(tid)))
......
......@@ -20,7 +20,7 @@ from random import choice
from neo.storage.handlers import replication
from neo import protocol
from neo.protocol import NodeTypes, NodeStates, CellStates
from neo.protocol import NodeTypes, NodeStates, CellStates, Packets
from neo.connection import ClientConnection
from neo.util import dump
......@@ -130,7 +130,7 @@ class Replicator(object):
def _askCriticalTID(self):
conn = self.primary_master_connection
msg_id = conn.ask(protocol.askLastIDs())
msg_id = conn.ask(Packets.AskLastIDs())
self.critical_tid_dict[msg_id] = self.new_partition_dict.values()
self.partition_dict.update(self.new_partition_dict)
self.new_partition_dict = {}
......@@ -144,7 +144,7 @@ class Replicator(object):
def _askUnfinishedTIDs(self):
conn = self.primary_master_connection
conn.ask(protocol.askUnfinishedTransactions())
conn.ask(Packets.AskUnfinishedTransactions())
self.waiting_for_unfinished_tids = True
def _startReplication(self):
......@@ -179,12 +179,12 @@ class Replicator(object):
self.current_connection = ClientConnection(app.em, handler,
addr = addr,
connector_handler = app.connector_handler)
p = protocol.requestNodeIdentification(NodeTypes.STORAGE,
p = Packets.RequestIdentification(NodeTypes.STORAGE,
app.uuid, app.server, app.name)
self.current_connection.ask(p)
self.tid_offset = 0
p = protocol.askTIDs(0, 1000, self.current_partition.getRID())
p = Packets.AskTIDs(0, 1000, self.current_partition.getRID())
self.current_connection.ask(p, timeout=300)
self.replication_done = False
......@@ -195,7 +195,7 @@ class Replicator(object):
self.partition_dict.pop(self.current_partition.getRID())
# Notify to a primary master node that my cell is now up-to-date.
conn = self.primary_master_connection
p = protocol.notifyPartitionChanges(app.pt.getID(),
p = Packets.NotifyPartitionChanges(app.pt.getID(),
[(self.current_partition.getRID(), app.uuid, CellStates.UP_TO_DATE)])
conn.notify(p)
except KeyError:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment