Commit 61529ea6 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Client node use update() from node manager.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@941 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent fcd9fabc
...@@ -228,70 +228,28 @@ class PrimaryNotificationsHandler(BaseHandler): ...@@ -228,70 +228,28 @@ class PrimaryNotificationsHandler(BaseHandler):
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app app = self.app
nm = app.nm nm = app.nm
self.app.nm.update(node_list)
for node_type, addr, uuid, state in node_list: for node_type, addr, uuid, state in node_list:
logging.debug("notified of %s %s %s %s" %(node_type, addr, dump(uuid), state)) if node_type != STORAGE_NODE_TYPE or state != RUNNING_STATE:
# Register new nodes.
# Try to retrieve it from nm
n = None
if uuid is not None:
n = nm.getNodeByUUID(uuid)
if n is None:
n = nm.getNodeByServer(addr)
if n is not None and uuid is not None:
# node only exists by address, remove it
nm.remove(n)
n = None
elif n.getServer() != addr:
# same uuid but different address, remove it
nm.remove(n)
n = None
if node_type == MASTER_NODE_TYPE:
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid is not None:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid is None:
# No interest.
continue
if n is None:
n = StorageNode(server = addr, uuid = uuid)
nm.add(n)
elif node_type == CLIENT_NODE_TYPE:
continue continue
# close connection to this storage if no longer running
n.setState(state) closed = False
# close connection to this node if no longer running conn = self.app.em.getConnectionByUUID(uuid)
if node_type in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE) and \ if conn is not None:
state != RUNNING_STATE:
can_close = False
for conn in self.app.em.getConnectionList():
if conn.getUUID() == n.getUUID():
conn.lock() conn.lock()
try: try:
conn.close() conn.close()
finally: finally:
conn.release() conn.release()
can_close = True closed = True
break if closed and node_type == STORAGE_NODE_TYPE:
if can_close and node_type == STORAGE_NODE_TYPE:
# Remove from pool connection # Remove from pool connection
app.cp.removeConnection(n) app.cp.removeConnection(n)
# Put fake packets to task queues. # Put fake packets to task queues.
queue_set = set() # XXX: this should be done in MTClientConnection
for key in self.dispatcher.message_table.keys(): for key in self.dispatcher.message_table.keys():
if id(conn) == key[0]: if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key) queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
# Storage failure is notified to the primary master when the fake
# packet if popped by a non-polling thread.
for queue in queue_set:
queue.put((conn, None)) queue.put((conn, None))
class PrimaryAnswersHandler(AnswerBaseHandler): class PrimaryAnswersHandler(AnswerBaseHandler):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment