Commit 6ba61bca authored by Grégory Wisniewski's avatar Grégory Wisniewski

Use a dedicated handler for a secondary master peer. Be carefull because old

'SecondaryEventHandler', used in the secondary role, is now renamed as
PrimaryMasterEventHandler since a secondary is only connected to the primary. On
the other side, the primary apply the SecondaryMasterEventHandler on it's master
connections.
Factorize code in connection(Closed|Timeout)/peerBroken and avoid some checks of 
conditions that can be assume in specialized handlers.
Clean imports and remove some instance checks.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@691 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 2a9f2764
...@@ -38,7 +38,7 @@ from neo.master.election import ElectionEventHandler ...@@ -38,7 +38,7 @@ from neo.master.election import ElectionEventHandler
from neo.master.recovery import RecoveryEventHandler from neo.master.recovery import RecoveryEventHandler
from neo.master.verification import VerificationEventHandler from neo.master.verification import VerificationEventHandler
from neo.master.service import ClientServiceEventHandler, StorageServiceEventHandler from neo.master.service import ClientServiceEventHandler, StorageServiceEventHandler
from neo.master.secondary import SecondaryEventHandler from neo.master.secondary import PrimaryMasterEventHandler, SecondaryMasterEventHandler
from neo.master.pt import PartitionTable from neo.master.pt import PartitionTable
from neo.util import dump from neo.util import dump
from neo.connector import getConnectorHandler from neo.connector import getConnectorHandler
...@@ -704,7 +704,7 @@ class Application(object): ...@@ -704,7 +704,7 @@ class Application(object):
logging.info('play the secondary role with %s (%s:%d)', logging.info('play the secondary role with %s (%s:%d)',
dump(self.uuid), *(self.server)) dump(self.uuid), *(self.server))
handler = SecondaryEventHandler(self) handler = PrimaryMasterEventHandler(self)
em = self.em em = self.em
nm = self.nm nm = self.nm
...@@ -892,7 +892,7 @@ class Application(object): ...@@ -892,7 +892,7 @@ class Application(object):
elif node_type == protocol.MASTER_NODE_TYPE: elif node_type == protocol.MASTER_NODE_TYPE:
# always put other master in waiting state # always put other master in waiting state
klass = MasterNode klass = MasterNode
# FIXME: Apply a dedicated handler handler = SecondaryMasterEventHandler
logging.info('Accept a master %s' % dump(uuid)) logging.info('Accept a master %s' % dump(uuid))
elif node_type == protocol.CLIENT_NODE_TYPE: elif node_type == protocol.CLIENT_NODE_TYPE:
# refuse any client before running # refuse any client before running
......
...@@ -18,9 +18,7 @@ ...@@ -18,9 +18,7 @@
import logging import logging
from neo import protocol from neo import protocol
from neo.node import AdminNode, MasterNode, ClientNode, StorageNode
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo import decorators
class IdentificationEventHandler(MasterEventHandler): class IdentificationEventHandler(MasterEventHandler):
"""This class deals with messages from the admin node only""" """This class deals with messages from the admin node only"""
...@@ -34,20 +32,6 @@ class IdentificationEventHandler(MasterEventHandler): ...@@ -34,20 +32,6 @@ class IdentificationEventHandler(MasterEventHandler):
def peerBroken(self, conn): def peerBroken(self, conn):
logging.warning('lost a node in IdentificationEventHandler') logging.warning('lost a node in IdentificationEventHandler')
# TODO: move this into a new handler
@decorators.identification_required
def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
def handleNotifyNodeInformation(self, conn, packet, node_list):
# XXX: Secondary master can send this packet
logging.error('ignoring NotifyNodeInformation packet')
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
...@@ -66,29 +50,30 @@ class IdentificationEventHandler(MasterEventHandler): ...@@ -66,29 +50,30 @@ class IdentificationEventHandler(MasterEventHandler):
if cell_list: if cell_list:
ptid = app.pt.setNextID() ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list) app.broadcastPartitionChanges(ptid, cell_list)
# TODO: check this, avoid copy()
# set it to down state # set it to down state
node.setState(protocol.DOWN_STATE) node.setState(protocol.DOWN_STATE)
app.broadcastNodeInformation(node) app.broadcastNodeInformation(node)
nm.remove(node)
# then update the address and set it to running state # then update the address and set it to running state
node = copy(node)
node.setServer(server) node.setServer(server)
node.setState(protocol.RUNNING_STATE) node.setState(protocol.RUNNING_STATE)
nm.add(node)
return node return node
# handle conflicts and broken nodes # handle conflicts and broken nodes
node = node_by_uuid or node_by_addr node = node_by_uuid or node_by_addr
if node_by_uuid is not None: if node_by_uuid is not None:
if node.getServer() == server and node.getState() == protocol.BROKEN_STATE: if node.getServer() == server:
if node.getState() == protocol.BROKEN_STATE:
raise protocol.BrokenNodeDisallowedError raise protocol.BrokenNodeDisallowedError
# the node is still alive
node.setState(protocol.RUNNING_STATE)
if node.getServer() != server: if node.getServer() != server:
if node.getState() == protocol.RUNNING_STATE: if node.getState() == protocol.RUNNING_STATE:
# still running, reject this new node
raise protocol.ProtocolError('invalid server address') raise protocol.ProtocolError('invalid server address')
node = changeNodeAddress(node, server) node = changeNodeAddress(node, server)
if node_by_uuid is None and node_by_addr is not None: if node_by_uuid is None and node_by_addr is not None:
if node.getState() == protocol.RUNNING_STATE: if node.getState() == protocol.RUNNING_STATE:
# still running, reject this new node
raise protocol.ProtocolError('invalid server address') raise protocol.ProtocolError('invalid server address')
node = changeNodeAddress(node, server) node = changeNodeAddress(node, server)
......
...@@ -22,35 +22,49 @@ from neo.protocol import MASTER_NODE_TYPE, \ ...@@ -22,35 +22,49 @@ from neo.protocol import MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
DOWN_STATE, ADMIN_NODE_TYPE DOWN_STATE, ADMIN_NODE_TYPE
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.connection import ClientConnection
from neo.exception import ElectionFailure, PrimaryFailure from neo.exception import ElectionFailure, PrimaryFailure
from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.node import MasterNode from neo.node import MasterNode
from neo import decorators
class SecondaryEventHandler(MasterEventHandler): class SecondaryMasterEventHandler(MasterEventHandler):
"""This class deals with events for a secondary master.""" """ Handler used by primary to handle secondary masters"""
def connectionCompleted(self, conn):
pass
def handleAnnouncePrimaryMaster(self, conn, packet):
raise ElectionFailure, 'another primary arises'
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
def handleNotifyNodeInformation(self, conn, packet, node_list):
logging.error('/!\ NotifyNodeInformation packet from secondary master')
class PrimaryMasterEventHandler(MasterEventHandler):
""" Handler used by secondaries to handle primary master"""
def connectionClosed(self, conn): def connectionClosed(self, conn):
if isinstance(conn, ClientConnection): if not conn.isServerConnection():
self.app.primary_master_node.setState(DOWN_STATE) self.app.primary_master_node.setState(DOWN_STATE)
raise PrimaryFailure, 'primary master is dead' raise PrimaryFailure, 'primary master is dead'
MasterEventHandler.connectionClosed(self, conn) MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
if isinstance(conn, ClientConnection): if not conn.isServerConnection():
self.app.primary_master_node.setState(DOWN_STATE) self.app.primary_master_node.setState(DOWN_STATE)
raise PrimaryFailure, 'primary master is down' raise PrimaryFailure, 'primary master is down'
MasterEventHandler.timeoutExpired(self, conn) MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn): def peerBroken(self, conn):
if isinstance(conn, ClientConnection): if not conn.isServerConnection():
self.app.primary_master_node.setState(DOWN_STATE) self.app.primary_master_node.setState(DOWN_STATE)
raise PrimaryFailure, 'primary master is crazy' raise PrimaryFailure, 'primary master is crazy'
MasterEventHandler.peerBroken(self, conn) MasterEventHandler.peerBroken(self, conn)
def packetReceived(self, conn, packet): def packetReceived(self, conn, packet):
if isinstance(conn, ClientConnection): if not conn.isServerConnection():
node = self.app.nm.getNodeByServer(conn.getAddress()) node = self.app.nm.getNodeByServer(conn.getAddress())
if node.getState() != BROKEN_STATE: if node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE) node.setState(RUNNING_STATE)
......
...@@ -16,73 +16,40 @@ ...@@ -16,73 +16,40 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging import logging
from copy import copy
from neo import protocol from neo import protocol
from neo.protocol import CLIENT_NODE_TYPE, \ from neo.protocol import CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \ UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \
STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \ STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \
HIDDEN_STATE, PENDING_STATE HIDDEN_STATE, PENDING_STATE, INVALID_UUID
from neo.master.handler import MasterEventHandler from neo.master.handler import MasterEventHandler
from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID from neo.protocol import UnexpectedPacketError
from neo.exception import OperationFailure, ElectionFailure from neo.exception import OperationFailure
from neo.node import ClientNode, StorageNode, MasterNode, AdminNode
from neo.util import dump from neo.util import dump
from neo.master import ENABLE_PENDING_NODES
class ServiceEventHandler(MasterEventHandler): class ServiceEventHandler(MasterEventHandler):
"""This class deals with events for a service phase.""" """This class deals with events for a service phase."""
def _dealWithNodeFailure(self, conn, new_state): def _dropIt(self, conn, new_state):
uuid = conn.getUUID() raise RuntimeError('rhis method must be overriden')
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node is not None and node.getState() == RUNNING_STATE:
node.setState(new_state)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
if node.getNodeType() == CLIENT_NODE_TYPE:
# If this node is a client, just forget it.
app.nm.remove(node)
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
elif node.getNodeType() == STORAGE_NODE_TYPE:
if not app.pt.operational():
# Catastrophic.
raise OperationFailure, 'cannot continue operation'
def connectionClosed(self, conn): def connectionClosed(self, conn):
self._dealWithNodeFailure(conn, TEMPORARILY_DOWN_STATE) node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() == RUNNING_STATE:
self._dropIt(conn, node, TEMPORARILY_DOWN_STATE)
MasterEventHandler.connectionClosed(self, conn) MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
self._dealWithNodeFailure(conn, TEMPORARILY_DOWN_STATE) node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() == RUNNING_STATE:
self._dropIt(conn, node, TEMPORARILY_DOWN_STATE)
MasterEventHandler.timeoutExpired(self, conn) MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn): def peerBroken(self, conn):
uuid = conn.getUUID() node = self.app.nm.getNodeByUUID(conn.getUUID())
app = self.app if node.getState() != BROKEN_STATE:
node = app.nm.getNodeByUUID(uuid) self._dropIt(conn, node, BROKEN_STATE)
if node is not None and node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
if node.getNodeType() == CLIENT_NODE_TYPE:
# If this node is a client, just forget it.
app.nm.remove(node)
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
elif node.getNodeType() == STORAGE_NODE_TYPE:
cell_list = app.pt.dropNode(node)
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
if not app.pt.operational():
# Catastrophic.
raise OperationFailure, 'cannot continue operation'
MasterEventHandler.peerBroken(self, conn) MasterEventHandler.peerBroken(self, conn)
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
...@@ -201,10 +168,20 @@ class FinishingTransaction(object): ...@@ -201,10 +168,20 @@ class FinishingTransaction(object):
class ClientServiceEventHandler(ServiceEventHandler): class ClientServiceEventHandler(ServiceEventHandler):
""" Handler dedicated to client during service state """
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
pass pass
def _dropIt(self, node, new_state):
app = self.app
node.setState(new_state)
app.broadcastNodeInformation(node)
app.nm.remove(node)
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
def handleAbortTransaction(self, conn, packet, tid): def handleAbortTransaction(self, conn, packet, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
node = self.app.nm.getNodeByUUID(uuid) node = self.app.nm.getNodeByUUID(uuid)
...@@ -270,12 +247,38 @@ class ClientServiceEventHandler(ServiceEventHandler): ...@@ -270,12 +247,38 @@ class ClientServiceEventHandler(ServiceEventHandler):
class StorageServiceEventHandler(ServiceEventHandler): class StorageServiceEventHandler(ServiceEventHandler):
""" Handler dedicated to storages during service state """
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID()) node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
conn.notify(protocol.startOperation()) conn.notify(protocol.startOperation())
def _dropIt(self, conn, node, new_state):
app = self.app
node.setState(new_state)
app.broadcastNodeInformation(node)
cell_list = app.pt.dropNode(node)
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
if not app.pt.operational():
raise OperationFailure, 'cannot continue operation'
def connectionClosed(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() == RUNNING_STATE:
self._dropIt(conn, node, TEMPORARILY_DOWN_STATE)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() == RUNNING_STATE:
self._dropIt(conn, node, TEMPORARILY_DOWN_STATE)
def peerBroken(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() != BROKEN_STATE:
self._dropIt(conn, node, BROKEN_STATE)
def handleNotifyInformationLocked(self, conn, packet, tid): def handleNotifyInformationLocked(self, conn, packet, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
...@@ -369,3 +372,4 @@ class StorageServiceEventHandler(ServiceEventHandler): ...@@ -369,3 +372,4 @@ class StorageServiceEventHandler(ServiceEventHandler):
ptid = app.pt.setNextID() ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, new_cell_list) app.broadcastPartitionChanges(ptid, new_cell_list)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment