Commit eeaa93d9 authored by Vincent Pelletier's avatar Vincent Pelletier

Make MTClientConnection.ask register message to dispatcher. This makes sure...

Make MTClientConnection.ask register message to dispatcher. This makes sure registration happens before the message has a chance of being sent.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@807 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 81f4281e
...@@ -74,7 +74,8 @@ class ConnectionPool(object): ...@@ -74,7 +74,8 @@ class ConnectionPool(object):
logging.info('trying to connect to %s - %s', node, node.getState()) logging.info('trying to connect to %s - %s', node, node.getState())
app.setNodeReady() app.setNodeReady()
conn = MTClientConnection(app.em, app.storage_event_handler, addr, conn = MTClientConnection(app.em, app.storage_event_handler, addr,
connector_handler=app.connector_handler) connector_handler=app.connector_handler,
dispatcher=app.dispatcher)
conn.lock() conn.lock()
try: try:
if conn.getConnector() is None: if conn.getConnector() is None:
...@@ -84,8 +85,7 @@ class ConnectionPool(object): ...@@ -84,8 +85,7 @@ class ConnectionPool(object):
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE, p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
app.uuid, addr[0], addr[1], app.name) app.uuid, addr[0], addr[1], app.name)
msg_id = conn.ask(p) msg_id = conn.ask(app.local_var.queue, p)
app.dispatcher.register(conn, msg_id, app.local_var.queue)
finally: finally:
conn.unlock() conn.unlock()
...@@ -341,8 +341,8 @@ class Application(object): ...@@ -341,8 +341,8 @@ class Application(object):
def _askStorage(self, conn, packet, timeout=5, additional_timeout=30): def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
""" Send a request to a storage node and process it's answer """ """ Send a request to a storage node and process it's answer """
try: try:
msg_id = conn.ask(packet, timeout, additional_timeout) msg_id = conn.ask(self.local_var.queue, packet, timeout,
self.dispatcher.register(conn, msg_id, self.local_var.queue) additional_timeout)
finally: finally:
# assume that the connection was already locked # assume that the connection was already locked
conn.unlock() conn.unlock()
...@@ -353,8 +353,8 @@ class Application(object): ...@@ -353,8 +353,8 @@ class Application(object):
conn = self._getMasterConnection() conn = self._getMasterConnection()
conn.lock() conn.lock()
try: try:
msg_id = conn.ask(packet, timeout, additional_timeout) msg_id = conn.ask(self.local_var.queue, packet, timeout,
self.dispatcher.register(conn, msg_id, self.local_var.queue) additional_timeout)
finally: finally:
conn.unlock() conn.unlock()
self._waitMessage(conn, msg_id, self.primary_handler) self._waitMessage(conn, msg_id, self.primary_handler)
...@@ -407,7 +407,8 @@ class Application(object): ...@@ -407,7 +407,8 @@ class Application(object):
# Connect to master # Connect to master
conn = MTClientConnection(self.em, self.notifications_handler, conn = MTClientConnection(self.em, self.notifications_handler,
addr=self.trying_master_node.getServer(), addr=self.trying_master_node.getServer(),
connector_handler=self.connector_handler) connector_handler=self.connector_handler,
dispatcher=self.dispatcher)
# Query for primary master node # Query for primary master node
conn.lock() conn.lock()
try: try:
...@@ -416,8 +417,7 @@ class Application(object): ...@@ -416,8 +417,7 @@ class Application(object):
logging.error('Connection to master node %s failed', logging.error('Connection to master node %s failed',
self.trying_master_node) self.trying_master_node)
continue continue
msg_id = conn.ask(protocol.askPrimaryMaster()) msg_id = conn.ask(self.local_var.queue, protocol.askPrimaryMaster())
self.dispatcher.register(conn, msg_id, self.local_var.queue)
finally: finally:
conn.unlock() conn.unlock()
try: try:
...@@ -440,8 +440,7 @@ class Application(object): ...@@ -440,8 +440,7 @@ class Application(object):
break break
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE, p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
self.uuid, '0.0.0.0', 0, self.name) self.uuid, '0.0.0.0', 0, self.name)
msg_id = conn.ask(p) msg_id = conn.ask(self.local_var.queue, p)
self.dispatcher.register(conn, msg_id, self.local_var.queue)
finally: finally:
conn.unlock() conn.unlock()
try: try:
...@@ -467,15 +466,15 @@ class Application(object): ...@@ -467,15 +466,15 @@ class Application(object):
# wait on one message at a time # wait on one message at a time
conn.lock() conn.lock()
try: try:
msg_id = conn.ask(protocol.askPartitionTable([])) msg_id = conn.ask(self.local_var.queue,
self.dispatcher.register(conn, msg_id, self.local_var.queue) protocol.askPartitionTable([]))
finally: finally:
conn.unlock() conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
conn.lock() conn.lock()
try: try:
msg_id = conn.ask(protocol.askNodeInformation()) msg_id = conn.ask(self.local_var.queue,
self.dispatcher.register(conn, msg_id, self.local_var.queue) protocol.askNodeInformation())
finally: finally:
conn.unlock() conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
...@@ -882,9 +881,8 @@ class Application(object): ...@@ -882,9 +881,8 @@ class Application(object):
continue continue
try: try:
p = protocol.askTIDs(first, last, INVALID_PARTITION) conn.ask(self.local_var.queue,
msg_id = conn.ask(p) protocol.askTIDs(first, last, INVALID_PARTITION))
self.dispatcher.register(conn, msg_id, self.local_var.queue)
finally: finally:
conn.unlock() conn.unlock()
......
...@@ -468,6 +468,7 @@ class MTClientConnection(ClientConnection): ...@@ -468,6 +468,7 @@ class MTClientConnection(ClientConnection):
self._lock = lock = RLock() self._lock = lock = RLock()
self.acquire = lock.acquire self.acquire = lock.acquire
self.release = lock.release self.release = lock.release
self.dispatcher = kwargs.pop('dispatcher')
self.lock() self.lock()
try: try:
super(MTClientConnection, self).__init__(*args, **kwargs) super(MTClientConnection, self).__init__(*args, **kwargs)
...@@ -501,8 +502,13 @@ class MTClientConnection(ClientConnection): ...@@ -501,8 +502,13 @@ class MTClientConnection(ClientConnection):
return super(MTClientConnection, self).notify(*args, **kw) return super(MTClientConnection, self).notify(*args, **kw)
@lockCheckWrapper @lockCheckWrapper
def ask(self, *args, **kw): def ask(self, queue, packet, timeout=5, additional_timeout=30):
return super(MTClientConnection, self).ask(*args, **kw) msg_id = self._getNextId()
packet.setId(msg_id)
self.dispatcher.register(self, msg_id, queue)
self.expectMessage(msg_id)
self._addPacket(packet)
return msg_id
@lockCheckWrapper @lockCheckWrapper
def answer(self, *args, **kw): def answer(self, *args, **kw):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment