Commit 669eb3ac authored by Grégory Wisniewski's avatar Grégory Wisniewski

Remove the 'queue' parameter in MTClientConnection.ask by giving the thread safe

data manager at MTClientConnection's constructor. This allows keep the same
method signature for ask() and to not crash when the client try to send a ping.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@864 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 21c09ad8
...@@ -73,9 +73,8 @@ class ConnectionPool(object): ...@@ -73,9 +73,8 @@ class ConnectionPool(object):
while True: while True:
logging.info('trying to connect to %s - %s', node, node.getState()) logging.info('trying to connect to %s - %s', node, node.getState())
app.setNodeReady() app.setNodeReady()
conn = MTClientConnection(app.em, app.storage_event_handler, addr, conn = MTClientConnection(self.app.local_var, app.em, app.storage_event_handler,
connector_handler=app.connector_handler, addr, connector_handler=app.connector_handler, dispatcher=app.dispatcher)
dispatcher=app.dispatcher)
conn.lock() conn.lock()
try: try:
if conn.getConnector() is None: if conn.getConnector() is None:
...@@ -85,7 +84,7 @@ class ConnectionPool(object): ...@@ -85,7 +84,7 @@ class ConnectionPool(object):
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE, p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
app.uuid, '0.0.0.0', 0, app.name) app.uuid, '0.0.0.0', 0, app.name)
msg_id = conn.ask(app.local_var.queue, p) msg_id = conn.ask(p)
finally: finally:
conn.unlock() conn.unlock()
...@@ -341,8 +340,7 @@ class Application(object): ...@@ -341,8 +340,7 @@ class Application(object):
def _askStorage(self, conn, packet, timeout=5, additional_timeout=30): def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
""" Send a request to a storage node and process it's answer """ """ Send a request to a storage node and process it's answer """
try: try:
msg_id = conn.ask(self.local_var.queue, packet, timeout, msg_id = conn.ask(packet, timeout, additional_timeout)
additional_timeout)
finally: finally:
# assume that the connection was already locked # assume that the connection was already locked
conn.unlock() conn.unlock()
...@@ -353,8 +351,7 @@ class Application(object): ...@@ -353,8 +351,7 @@ class Application(object):
conn = self._getMasterConnection() conn = self._getMasterConnection()
conn.lock() conn.lock()
try: try:
msg_id = conn.ask(self.local_var.queue, packet, timeout, msg_id = conn.ask(packet, timeout, additional_timeout)
additional_timeout)
finally: finally:
conn.unlock() conn.unlock()
self._waitMessage(conn, msg_id, self.primary_handler) self._waitMessage(conn, msg_id, self.primary_handler)
...@@ -405,7 +402,7 @@ class Application(object): ...@@ -405,7 +402,7 @@ class Application(object):
self.trying_master_node = master_list[0] self.trying_master_node = master_list[0]
index += 1 index += 1
# Connect to master # Connect to master
conn = MTClientConnection(self.em, self.notifications_handler, conn = MTClientConnection(self.local_var, self.em, self.notifications_handler,
addr=self.trying_master_node.getServer(), addr=self.trying_master_node.getServer(),
connector_handler=self.connector_handler, connector_handler=self.connector_handler,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
...@@ -417,7 +414,7 @@ class Application(object): ...@@ -417,7 +414,7 @@ class Application(object):
logging.error('Connection to master node %s failed', logging.error('Connection to master node %s failed',
self.trying_master_node) self.trying_master_node)
continue continue
msg_id = conn.ask(self.local_var.queue, protocol.askPrimaryMaster()) msg_id = conn.ask(protocol.askPrimaryMaster())
finally: finally:
conn.unlock() conn.unlock()
try: try:
...@@ -440,7 +437,7 @@ class Application(object): ...@@ -440,7 +437,7 @@ class Application(object):
break break
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE, p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
self.uuid, '0.0.0.0', 0, self.name) self.uuid, '0.0.0.0', 0, self.name)
msg_id = conn.ask(self.local_var.queue, p) msg_id = conn.ask(p)
finally: finally:
conn.unlock() conn.unlock()
try: try:
...@@ -466,15 +463,13 @@ class Application(object): ...@@ -466,15 +463,13 @@ class Application(object):
# wait on one message at a time # wait on one message at a time
conn.lock() conn.lock()
try: try:
msg_id = conn.ask(self.local_var.queue, msg_id = conn.ask(protocol.askPartitionTable([]))
protocol.askPartitionTable([]))
finally: finally:
conn.unlock() conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
conn.lock() conn.lock()
try: try:
msg_id = conn.ask(self.local_var.queue, msg_id = conn.ask(protocol.askNodeInformation())
protocol.askNodeInformation())
finally: finally:
conn.unlock() conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
...@@ -881,8 +876,7 @@ class Application(object): ...@@ -881,8 +876,7 @@ class Application(object):
continue continue
try: try:
conn.ask(self.local_var.queue, conn.ask(protocol.askTIDs(first, last, INVALID_PARTITION))
protocol.askTIDs(first, last, INVALID_PARTITION))
finally: finally:
conn.unlock() conn.unlock()
......
...@@ -462,12 +462,14 @@ class ServerConnection(Connection): ...@@ -462,12 +462,14 @@ class ServerConnection(Connection):
class MTClientConnection(ClientConnection): class MTClientConnection(ClientConnection):
"""A Multithread-safe version of ClientConnection.""" """A Multithread-safe version of ClientConnection."""
def __init__(self, *args, **kwargs):
def __init__(self, local_var, *args, **kwargs):
# _lock is only here for lock debugging purposes. Do not use. # _lock is only here for lock debugging purposes. Do not use.
self._lock = lock = RLock() self._lock = lock = RLock()
self.acquire = lock.acquire self.acquire = lock.acquire
self.release = lock.release self.release = lock.release
self.dispatcher = kwargs.pop('dispatcher') self.dispatcher = kwargs.pop('dispatcher')
self.local_var = local_var
self.lock() self.lock()
try: try:
super(MTClientConnection, self).__init__(*args, **kwargs) super(MTClientConnection, self).__init__(*args, **kwargs)
...@@ -501,10 +503,10 @@ class MTClientConnection(ClientConnection): ...@@ -501,10 +503,10 @@ class MTClientConnection(ClientConnection):
return super(MTClientConnection, self).notify(*args, **kw) return super(MTClientConnection, self).notify(*args, **kw)
@lockCheckWrapper @lockCheckWrapper
def ask(self, queue, packet, timeout=5, additional_timeout=30): def ask(self, packet, timeout=5, additional_timeout=30):
msg_id = self._getNextId() msg_id = self._getNextId()
packet.setId(msg_id) packet.setId(msg_id)
self.dispatcher.register(self, msg_id, queue) self.dispatcher.register(self, msg_id, self.local_var.queue)
self.expectMessage(msg_id) self.expectMessage(msg_id)
self._addPacket(packet) self._addPacket(packet)
return msg_id return msg_id
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment