Commit 77132157 authored by Julien Muchembled's avatar Julien Muchembled

client: do not limit the number of open connections to storage nodes

There was a bug that connections were not maintained during a TPC,
which caused transactions to be aborted when the limit was reached.

Given that oids are spreaded evenly over all partitions, and that clients always
write to all cells of each involved partitions, clients would spend their time
reconnecting to storage nodes as soon as the limit is reached. So such feature
really looks counter-productive.
parent cfe1b5ca
...@@ -39,9 +39,8 @@ CELL_FAILED = 1 ...@@ -39,9 +39,8 @@ CELL_FAILED = 1
class ConnectionPool(object): class ConnectionPool(object):
"""This class manages a pool of connections to storage nodes.""" """This class manages a pool of connections to storage nodes."""
def __init__(self, app, max_pool_size = 25): def __init__(self, app):
self.app = app self.app = app
self.max_pool_size = max_pool_size
self.connection_dict = {} self.connection_dict = {}
# Define a lock in order to create one connection to # Define a lock in order to create one connection to
# a storage node at a time to avoid multiple connections # a storage node at a time to avoid multiple connections
...@@ -70,21 +69,6 @@ class ConnectionPool(object): ...@@ -70,21 +69,6 @@ class ConnectionPool(object):
return conn return conn
self.notifyFailure(node) self.notifyFailure(node)
def _dropConnections(self):
"""Drop connections."""
for conn in self.connection_dict.values():
# Drop first connection which looks not used
with conn.lock:
if not conn.pending() and \
not self.app.dispatcher.registered(conn):
del self.connection_dict[conn.getUUID()]
conn.setReconnectionNoDelay()
conn.close()
logging.debug('_dropConnections: connection to '
'storage node %s:%d closed', *conn.getAddress())
if len(self.connection_dict) <= self.max_pool_size:
break
def notifyFailure(self, node): def notifyFailure(self, node):
self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE self.node_failure_dict[node.getUUID()] = time.time() + MAX_FAILURE_AGE
...@@ -148,9 +132,6 @@ class ConnectionPool(object): ...@@ -148,9 +132,6 @@ class ConnectionPool(object):
try: try:
return self.connection_dict[uuid] return self.connection_dict[uuid]
except KeyError: except KeyError:
if len(self.connection_dict) > self.max_pool_size:
# must drop some unused connections
self._dropConnections()
# Create new connection to node # Create new connection to node
conn = self._initNodeConnection(node) conn = self._initNodeConnection(node)
if conn is not None: if conn is not None:
...@@ -161,7 +142,12 @@ class ConnectionPool(object): ...@@ -161,7 +142,12 @@ class ConnectionPool(object):
"""Explicitly remove connection when a node is broken.""" """Explicitly remove connection when a node is broken."""
self.connection_dict.pop(node.getUUID(), None) self.connection_dict.pop(node.getUUID(), None)
def flush(self): def closeAll(self):
"""Remove all connections""" with self._lock:
self.connection_dict.clear() while 1:
try:
conn = self.connection_dict.popitem()[1]
except KeyError:
break
conn.setReconnectionNoDelay()
conn.close()
...@@ -688,8 +688,7 @@ class Test(NEOThreadedTest): ...@@ -688,8 +688,7 @@ class Test(NEOThreadedTest):
cluster.start() cluster.start()
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
c.root()[0] = 'ok' c.root()[0] = 'ok'
while cluster.client.cp.connection_dict: cluster.client.cp.closeAll()
cluster.client.cp._dropConnections()
t.commit() # store request t.commit() # store request
finally: finally:
cluster.stop() cluster.stop()
...@@ -699,8 +698,7 @@ class Test(NEOThreadedTest): ...@@ -699,8 +698,7 @@ class Test(NEOThreadedTest):
try: try:
cluster.start() cluster.start()
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
while cluster.client.cp.connection_dict: cluster.client.cp.closeAll()
cluster.client.cp._dropConnections()
tid, (t1,) = cluster.client.transactionLog( tid, (t1,) = cluster.client.transactionLog(
ZERO_TID, c.db().lastTransaction(), 10) ZERO_TID, c.db().lastTransaction(), 10)
finally: finally:
...@@ -711,8 +709,7 @@ class Test(NEOThreadedTest): ...@@ -711,8 +709,7 @@ class Test(NEOThreadedTest):
try: try:
cluster.start() cluster.start()
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
while cluster.client.cp.connection_dict: cluster.client.cp.closeAll()
cluster.client.cp._dropConnections()
t1, = cluster.client.undoLog(0, 10) t1, = cluster.client.undoLog(0, 10)
finally: finally:
cluster.stop() cluster.stop()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment