diff --git a/neo/admin/app.py b/neo/admin/app.py index d6e347a3f3631e8541b1936f6536d8c0adf81502..257383668c6735da2faa5e82a1946aa100aeed2d 100644 --- a/neo/admin/app.py +++ b/neo/admin/app.py @@ -22,8 +22,8 @@ from neo.node import NodeManager, MasterNode from neo.event import EventManager from neo.connection import ListeningConnection from neo.exception import PrimaryFailure -from neo.admin.handler import MasterMonitoringEventHandler, AdminEventHandler, \ - MasterEventHandler, MasterRequestEventHandler +from neo.admin.handler import AdminEventHandler, MasterEventHandler, \ + MasterRequestEventHandler from neo.connector import getConnectorHandler from neo.bootstrap import BootstrapManager from neo.pt import PartitionTable @@ -72,7 +72,6 @@ class Application(object): self.uuid = uuid self.primary_master_node = None self.ptid = None - self.monitoring_handler = MasterMonitoringEventHandler(self) self.request_handler = MasterRequestEventHandler(self) self.master_event_handler = MasterEventHandler(self) self.dispatcher = Dispatcher() diff --git a/neo/admin/handler.py b/neo/admin/handler.py index 1d9dcdb3720367f1e139791e54ba22c12bf7ec50..9edd9e804d7fed2468e087905b1255b3b2a5e883 100644 --- a/neo/admin/handler.py +++ b/neo/admin/handler.py @@ -152,31 +152,50 @@ class MasterEventHandler(EventHandler): self._connectionLost(conn) def dispatch(self, conn, packet): - if not packet.isResponse(): - # not an answer - self.app.monitoring_handler.dispatch(conn, packet) - elif self.app.dispatcher.registered(packet.getId()): + if packet.isResponse() and \ + self.app.dispatcher.registered(packet.getId()): # expected answer self.app.request_handler.dispatch(conn, packet) else: - # unexpectexd answer, this should be answerNodeInformation or - # answerPartitionTable from the master node during initialization. - # This will no more exists when the initialization module will be - # implemented for factorize code (as done for bootstrap) - EventHandler.dispatch(self, conn, packet) + # unexpectexd answers and notifications + super(MasterEventHandler, self).dispatch(conn, packet) def handleAnswerNodeInformation(self, conn, packet, node_list): + # XXX: This will no more exists when the initialization module will be + # implemented for factorize code (as done for bootstrap) logging.debug("handleAnswerNodeInformation") def handleAnswerPartitionTable(self, conn, packet, ptid, row_list): + # XXX: This will no more exists when the initialization module will be + # implemented for factorize code (as done for bootstrap) logging.debug("handleAnswerPartitionTable") - def handleNotifyClusterInformation(self, conn, packet, cluster_state): - logging.debug("handleNotifyClusterInformation") - + def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): + app = self.app + if ptid < app.ptid: + # Ignore this packet. + return + app.ptid = ptid + app.pt.update(ptid, cell_list, app.nm) -class MasterBaseEventHandler(EventHandler): - """ This is the base class for connection to primary master node""" + def handleSendPartitionTable(self, conn, packet, ptid, row_list): + uuid = conn.getUUID() + app = self.app + nm = app.nm + pt = app.pt + node = app.nm.getNodeByUUID(uuid) + if app.ptid != ptid: + app.ptid = ptid + pt.clear() + for offset, row in row_list: + for uuid, state in row: + node = nm.getNodeByUUID(uuid) + if node is None: + node = StorageNode(uuid = uuid) + node.setState(protocol.TEMPORARILY_DOWN_STATE) + nm.add(node) + pt.setCell(offset, node, state) + pt.log() def handleNotifyClusterInformation(self, conn, packet, cluster_state): self.app.cluster_state = cluster_state @@ -190,8 +209,7 @@ class MasterBaseEventHandler(EventHandler): # possible (ignore TEMPORARILY_DOWN for example) conn.ask(protocol.askPartitionTable([])) - -class MasterRequestEventHandler(MasterBaseEventHandler): +class MasterRequestEventHandler(EventHandler): """ This class handle all answer from primary master node""" def __answerNeoCTL(self, msg_id, packet): @@ -225,34 +243,3 @@ class MasterRequestEventHandler(MasterBaseEventHandler): def handleProtocolError(self, conn, packet, msg): self.__answerNeoCTL(packet.getId(), protocol.protocolError(msg)) - -class MasterMonitoringEventHandler(MasterBaseEventHandler): - """This class deals with events for monitoring cluster.""" - - def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): - app = self.app - if ptid < app.ptid: - # Ignore this packet. - return - app.ptid = ptid - app.pt.update(ptid, cell_list, app.nm) - - def handleSendPartitionTable(self, conn, packet, ptid, row_list): - uuid = conn.getUUID() - app = self.app - nm = app.nm - pt = app.pt - node = app.nm.getNodeByUUID(uuid) - if app.ptid != ptid: - app.ptid = ptid - pt.clear() - for offset, row in row_list: - for uuid, state in row: - node = nm.getNodeByUUID(uuid) - if node is None: - node = StorageNode(uuid = uuid) - node.setState(protocol.TEMPORARILY_DOWN_STATE) - nm.add(node) - pt.setCell(offset, node, state) - pt.log() -