From 4f2ef12a49826ac9e4b803d03fb636cfe4f76c2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Calonne?= <aurel@nexedi.com> Date: Fri, 26 Jun 2009 12:32:03 +0000 Subject: [PATCH] remove all polling from handler pass msg_id to register so that we can answer to neoctl request with right id git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@692 71dcc9de-d417-0410-9af5-da40c76e7ee4 --- neo/admin/app.py | 8 ++++---- neo/admin/handler.py | 46 +++++++++++++++++++------------------------- 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/neo/admin/app.py b/neo/admin/app.py index bbf4b282..38bcb669 100644 --- a/neo/admin/app.py +++ b/neo/admin/app.py @@ -41,8 +41,8 @@ class Dispatcher: # message to connection self.message_table = {} - def register(self, msg_id, conn): - self.message_table[msg_id] = conn + def register(self, msg_id, conn, kw=None): + self.message_table[msg_id] = conn, kw def retrieve(self, msg_id): return self.message_table.pop(msg_id, None) @@ -172,7 +172,7 @@ class Application(object): connector_handler = self.connector_handler) t = time() - def sendPartitionTable(self, conn, min_offset, max_offset, uuid): + def sendPartitionTable(self, conn, min_offset, max_offset, uuid, msg_id): # we have a pt self.pt.log() row_list = [] @@ -196,4 +196,4 @@ class Application(object): return print "sending packet", len(row_list) p = protocol.answerPartitionList(self.ptid, row_list) - conn.notify(p) + conn.notify(p, msg_id) diff --git a/neo/admin/handler.py b/neo/admin/handler.py index 3c525bbf..614f8760 100644 --- a/neo/admin/handler.py +++ b/neo/admin/handler.py @@ -52,13 +52,14 @@ class AdminEventHandler(BaseEventHandler): # check we have one pt otherwise ask it to PMN if len(app.pt.getNodeList()) == 0: master_conn = self.app.master_conn - p = protocol.askPartitionTable([x for x in xrange(app.num_partitions)]) + p = protocol.askPartitionTable([]) msg_id = master_conn.ask(p) app.dispatcher.register(msg_id, conn, {'min_offset' : min_offset, 'max_offset' : max_offset, - 'uuid' : uuid}) + 'uuid' : uuid, + 'msg_id' : packet.getId()}) else: - app.sendPartitionTable(conn, min_offset, max_offset, uuid) + app.sendPartitionTable(conn, min_offset, max_offset, uuid, packet.getId()) def handleAskNodeList(self, conn, packet, node_type): @@ -92,13 +93,8 @@ class AdminEventHandler(BaseEventHandler): # forward to primary master node master_conn = self.app.master_conn p = protocol.setNodeState(uuid, state, modify_partition_table) - master_conn.ask(p) - self.app.notified = False - while not self.app.notified: - self.app.em.poll(1) - node = self.app.nm.getNodeByUUID(uuid) - p = protocol.answerNodeState(node.getUUID(), node.getState()) - conn.answer(p, packet) + msg_id = master_conn.ask(p) + self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()}) def handleSetClusterState(self, conn, packet, name, state): self.checkClusterName(name) @@ -106,7 +102,7 @@ class AdminEventHandler(BaseEventHandler): master_conn = self.app.master_conn p = protocol.setClusterState(name, state) msg_id = master_conn.ask(p) - self.app.dispatcher.register(msg_id, conn) + self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()}) def handleAddPendingNodes(self, conn, packet, uuid_list): uuids = ', '.join([dump(uuid) for uuid in uuid_list]) @@ -115,14 +111,8 @@ class AdminEventHandler(BaseEventHandler): node = self.app.nm.getNodeByUUID(uuid) # forward the request to primary master_conn = self.app.master_conn - master_conn.ask(protocol.addPendingNodes(uuid_list)) - self.app.nn_notified = False - while not self.app.nn_notified: - self.app.em.poll(1) - # forward the answer to neoctl - uuid_list = self.app.uuid_list - - + msg_id = master_conn.ask(protocol.addPendingNodes(uuid_list)) + self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()}) class MasterEventHandler(BaseEventHandler): """ This class is just used to dispacth message to right handler""" @@ -283,27 +273,31 @@ class MasterBaseEventHandler(BaseEventHandler): self.app.notified = True - class MasterRequestEventHandler(MasterBaseEventHandler): """ This class handle all answer from primary master node""" def handleAnswerClusterState(self, conn, packet, state): logging.info("handleAnswerClusterState for a conn") - client_conn, kw = self.app.retrieve(packet.getId()) - conn.answer(protocol.answerClusterState(state), packet) + client_conn, kw = self.app.dispatcher.retrieve(packet.getId()) + client_conn.notify(protocol.answerClusterState(state), kw['msg_id']) def handleAnswerNewNodes(self, conn, packet, uuid_list): logging.info("handleAnswerNewNodes for a conn") - client_conn, kw = self.app.retrieve(packet.getId()) - conn.answer(protocol.answerNewNodes(uuid_list), packet) + client_conn, kw = self.app.dispatcher.retrieve(packet.getId()) + client_conn.notify(protocol.answerNewNodes(uuid_list), kw['msg_id']) @decorators.identification_required def handleAnswerPartitionTable(self, conn, packet, ptid, row_list): logging.info("handleAnswerPartitionTable for a conn") - client_conn, kw = self.app.retrieve(packet.getId()) + client_conn, kw = self.app.dispatcher.retrieve(packet.getId()) # sent client the partition table self.app.sendPartitionTable(client_conn, **kw) - + + def handleAnswerNodeState(self, conn, packet, uuid, state): + client_conn, kw = self.app.dispatcher.retrieve(packet.getId()) + p = protocol.answerNodeState(uuid, state) + client_conn.notify(p, kw['msg_id']) + class MasterBootstrapEventHandler(MasterBaseEventHandler): """This class manage the bootstrap part to the primary master node""" -- 2.30.9