Commit d898a83d authored by Julien Muchembled's avatar Julien Muchembled

Do not reconnect too quickly to a node after an error

For example, a backup storage node that was rejected because the upstream
cluster was not ready could reconnect in loop without delay, using 100% CPU
and flooding logs.

A new 'setReconnectionNoDelay' method on Connection can be used for cases where
it's legitimate to quickly reconnect.

With this new delayed reconnection, it's possible to remove the remaining
time.sleep().
parent 71e30fb9
......@@ -19,11 +19,6 @@
could become UP_TO_DATE with appropriate backup_tid, so that the cluster
stays operational. (FEATURE)
- Finish renaming UUID into NID everywhere (CODE)
- Remove sleeps (LATENCY, CPU WASTE)
Code still contains many delays (explicit sleeps or polling timeouts).
They must be removed to be either infinite (sleep until some condition
becomes true, without waking up needlessly in the meantime) or null
(don't wait at all).
- Implements delayed connection acceptation.
Currently, any node that connects too early to another that is busy for
some reasons is immediately rejected with the 'not ready' error code. This
......@@ -34,10 +29,6 @@
- Empty storages rejected during recovery process
Masters implies in the election process should still reject any connection
as the primary master is still unknown.
- Wait before reconnecting to a node after an error. For example, a backup
storage node that is rejected because the upstream cluster is not ready
currently reconnects in loop without delay, using 100% CPU and
flooding logs.
- Implement transaction garbage collection API (FEATURE)
NEO packing implementation does not update transaction metadata when
deleting object revisions. This inconsistency must be made possible to
......
......@@ -263,7 +263,7 @@ class Application(object):
Lookup for the current primary master node
"""
logging.debug('connecting to primary master...')
index = 0
index = -1
ask = self._ask
handler = self.primary_bootstrap_handler
while 1:
......@@ -276,13 +276,8 @@ class Application(object):
else:
# Otherwise, check one by one.
master_list = self.nm.getMasterList()
try:
index = (index + 1) % len(master_list)
self.trying_master_node = master_list[index]
except IndexError:
time.sleep(1)
index = 0
self.trying_master_node = master_list[0]
index += 1
# Connect to master
conn = MTClientConnection(self.em,
self.notifications_handler,
......
......@@ -76,6 +76,7 @@ class ConnectionPool(object):
if not conn.pending() and \
not self.app.dispatcher.registered(conn):
del self.connection_dict[conn.getUUID()]
conn.setReconnectionNoDelay()
conn.close()
logging.debug('_dropConnections: connection to '
'storage node %s:%d closed', *conn.getAddress())
......@@ -125,8 +126,6 @@ class ConnectionPool(object):
if not new_cell_list or self.app.master_conn is None:
break
cell_list = new_cell_list
# wait a bit to avoid a busy loop
time.sleep(1)
def getConnForNode(self, node):
"""Return a locked connection object to a given node
......
......@@ -14,8 +14,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from time import sleep
from . import logging
from .handler import EventHandler
from .protocol import uuid_str, Packets
......@@ -135,9 +133,6 @@ class BootstrapManager(EventHandler):
master_list = nm.getMasterList()
index = (index + 1) % len(master_list)
self.current = master_list[index]
if index == 0:
# tried all known masters, sleep a bit
sleep(1)
if conn is None:
# open the connection
conn = ClientConnection(em, self, self.current)
......
......@@ -20,7 +20,7 @@ from time import time
from . import attributeTracker, logging
from .connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException, \
ConnectorConnectionClosedException
ConnectorConnectionClosedException, ConnectorDelayedConnection
from .locking import RLock
from .protocol import uuid_str, Errors, \
PacketMalformedError, Packets, ParserState
......@@ -28,8 +28,6 @@ from .util import ReadBuffer
CRITICAL_TIMEOUT = 30
connect_limit = 0
class ConnectionClosed(Exception):
pass
......@@ -215,8 +213,6 @@ class BaseConnection(object):
self.connector = connector
self.addr = addr
self._handlers = HandlerSwitcher(handler)
event_manager.register(self)
event_manager.addReader(self)
# XXX: do not use getHandler
getHandler = property(lambda self: self._handlers.getHandler)
......@@ -324,6 +320,7 @@ class ListeningConnection(BaseConnection):
connector = self.ConnectorClass(addr)
BaseConnection.__init__(self, event_manager, handler, connector, addr)
connector.makeListeningConnection()
event_manager.register(self)
def readable(self):
try:
......@@ -505,6 +502,10 @@ class Connection(BaseConnection):
def pending(self):
return self.connector is not None and self.write_buf
@property
def setReconnectionNoDelay(self):
return self.connector.setReconnectionNoDelay
def close(self):
if self.connector is None:
assert self._on_close is None
......@@ -537,22 +538,6 @@ class Connection(BaseConnection):
self._handlers.handle(self, self._queue.pop(0))
self.close()
def _delayed_closure(self):
# Wait at least 1 second between connection failures.
global connect_limit
t = time()
if t < connect_limit:
# Fake _addPacket so that if does not
# try to reenable polling for writing.
self.write_buf[:] = '',
self.em.unregister(self, check_timeout=True)
self.getTimeout = lambda: connect_limit
self.onTimeout = self.lockWrapper(self._delayed_closure)
self.readable = self.writable = lambda: None
else:
connect_limit = t + 1
self._closure()
def _recv(self):
"""Receive data from a connector."""
try:
......@@ -561,7 +546,7 @@ class Connection(BaseConnection):
pass
except ConnectorConnectionRefusedException:
assert self.connecting
self._delayed_closure()
self._closure()
except ConnectorConnectionClosedException:
# connection resetted by peer, according to the man, this error
# should not occurs but it seems it's false
......@@ -674,24 +659,42 @@ class ClientConnection(Connection):
Connection.__init__(self, event_manager, handler, connector, addr)
node.setConnection(self)
handler.connectionStarted(self)
self._connect()
def _connect(self):
try:
try:
connector.makeClientConnection()
self.connector.makeClientConnection()
except ConnectorInProgressException:
event_manager.addWriter(self)
else:
self._connectionCompleted()
self.em.register(self)
self.em.addWriter(self)
except ConnectorDelayedConnection, c:
connect_limit, = c.args
self.getTimeout = lambda: connect_limit
self.onTimeout = self._delayedConnect
self.em.register(self, timeout_only=True)
# Fake _addPacket so that if does not
# try to reenable polling for writing.
self.write_buf.insert(0, '')
except ConnectorConnectionRefusedException:
self._delayed_closure()
self._closure()
except ConnectorException:
# unhandled connector exception
self._closure()
raise
else:
self.em.register(self)
if self.write_buf:
self.em.addWriter(self)
self._connectionCompleted()
def _delayedConnect(self):
del self.getTimeout, self.onTimeout, self.write_buf[0]
self._connect()
def writable(self):
"""Called when self is writable."""
if self.connector.getError():
self._delayed_closure()
self._closure()
else:
self._connectionCompleted()
self.writable()
......@@ -702,6 +705,7 @@ class ClientConnection(Connection):
self.updateTimeout(time())
self.getHandler().connectionCompleted(self)
class ServerConnection(Connection):
"""A connection from a remote node to this node."""
......@@ -716,6 +720,7 @@ class ServerConnection(Connection):
def __init__(self, *args, **kw):
Connection.__init__(self, *args, **kw)
self.em.register(self)
self.updateTimeout(time())
......@@ -725,7 +730,7 @@ class MTConnectionType(type):
if __debug__:
for name in 'analyse', 'answer':
setattr(cls, name, cls.lockCheckWrapper(name))
for name in ('close', 'notify', 'onTimeout',
for name in ('_delayedConnect', 'close', 'notify', 'onTimeout',
'process', 'readable', 'writable'):
setattr(cls, name, cls.__class__.lockWrapper(cls, name))
......
......@@ -16,6 +16,7 @@
import socket
import errno
from time import time
# Global connector registry.
# Fill by calling registerConnectorHandler.
......@@ -28,6 +29,8 @@ class SocketConnector(object):
""" This class is a wrapper for a socket """
is_closed = is_server = None
connect_limit = {}
CONNECT_LIMIT = 1
def __new__(cls, addr, s=None):
if s is None:
......@@ -61,9 +64,17 @@ class SocketConnector(object):
def makeClientConnection(self):
assert self.is_closed is None
addr = self.addr
try:
connect_limit = self.connect_limit[addr]
if time() < connect_limit:
raise ConnectorDelayedConnection(connect_limit)
except KeyError:
pass
self.connect_limit[addr] = time() + self.CONNECT_LIMIT
self.is_server = self.is_closed = False
try:
self._connect(self.addr)
self._connect(addr)
except socket.error, (err, errmsg):
if err == errno.EINPROGRESS:
raise ConnectorInProgressException
......@@ -135,8 +146,17 @@ class SocketConnector(object):
def close(self):
self.is_closed = True
try:
if self.connect_limit[self.addr] < time():
del self.connect_limit[self.addr]
except KeyError:
pass
return self.socket.close()
def setReconnectionNoDelay(self):
"""Mark as successful so that we can reconnect without delay"""
self.connect_limit.pop(self.addr, None)
def __repr__(self):
if self.is_closed is None:
state = 'never opened'
......@@ -187,3 +207,5 @@ class ConnectorConnectionClosedException(ConnectorException):
class ConnectorConnectionRefusedException(ConnectorException):
pass
class ConnectorDelayedConnection(ConnectorException):
pass
......@@ -76,12 +76,16 @@ class EpollEventManager(object):
# epoll_wait always waits for EPOLLERR & EPOLLHUP so we're forced
# to unregister when we want to ignore all events for a connection.
def register(self, conn):
def register(self, conn, timeout_only=False):
fd = conn.getConnector().getDescriptor()
self.connection_dict[fd] = conn
if timeout_only:
self.wakeup()
else:
self.epoll.register(fd)
self.addReader(conn)
def unregister(self, conn, check_timeout=False):
def unregister(self, conn):
new_pending_processing = [x for x in self._pending_processing
if x is not conn]
# Check that we removed at most one entry from
......@@ -90,16 +94,17 @@ class EpollEventManager(object):
self._pending_processing = new_pending_processing
fd = conn.getConnector().getDescriptor()
try:
del self.connection_dict[fd]
self.unregistered.append(fd)
self.epoll.unregister(fd)
except KeyError:
pass
except IOError, e:
if e.errno != ENOENT:
raise
else:
self.reader_set.discard(fd)
self.writer_set.discard(fd)
if not check_timeout:
del self.connection_dict[fd]
self.unregistered.append(fd)
def isIdle(self):
return not (self._pending_processing or self.writer_set)
......
......@@ -44,6 +44,9 @@ class NeoCTL(object):
if not self.connected:
self.connection = ClientConnection(self.em, self.handler,
self.server)
# Never delay reconnection to master. This speeds up unit tests
# and it should not change anything for normal use.
self.connection.setReconnectionNoDelay()
while not self.connected:
self.em.poll(1)
if self.connection is None:
......
......@@ -319,6 +319,7 @@ class Replicator(object):
self.app.master_conn.notify(p)
logging.debug("partition %u replicated up to %s from %r",
offset, dump(tid), self.current_node)
self.getCurrentConnection().setReconnectionNoDelay()
self._nextPartition()
def abort(self, message=''):
......
......@@ -38,7 +38,7 @@ class MasterTests(NEOFunctionalTest):
def testStoppingSecondaryMaster(self):
# Wait for masters to stabilize
self.neo.expectAllMasters(MASTER_NODE_COUNT)
self.neo.expectAllMasters(MASTER_NODE_COUNT, NodeStates.RUNNING)
# Kill
primary_uuid = self.neoctl.getPrimary()
......@@ -61,6 +61,8 @@ class MasterTests(NEOFunctionalTest):
uuid = killed_uuid_list[0]
# Check the state of the primary we just killed
self.neo.expectMasterState(uuid, (None, NodeStates.UNKNOWN))
# BUG: The following check expects neoctl to reconnect before
# the election finishes.
self.assertEqual(self.neo.getPrimary(), None)
# Check that a primary master arised.
self.neo.expectPrimary(timeout=10)
......
......@@ -72,7 +72,7 @@ class ConnectionTests(NeoUnitTestBase):
self.connector = conn.connector
return conn
def _makeConnection(self):
def _makeServerConnection(self):
addr = self.address
self.connector = DummyConnector(addr)
return Connection(self.em, self.handler, self.connector, addr)
......@@ -83,7 +83,7 @@ class ConnectionTests(NeoUnitTestBase):
self.connector = conn.connector
return conn
_makeServerConnection = _makeConnection
_makeConnection = _makeClientConnection
def _checkRegistered(self, n=1):
self.assertEqual(len(self.em.mockGetNamedCalls("register")), n)
......@@ -91,9 +91,6 @@ class ConnectionTests(NeoUnitTestBase):
def _checkUnregistered(self, n=1):
self.assertEqual(len(self.em.mockGetNamedCalls("unregister")), n)
def _checkReaderAdded(self, n=1):
self.assertEqual(len(self.em.mockGetNamedCalls("addReader")), n)
def _checkReaderRemoved(self, n=1):
self.assertEqual(len(self.em.mockGetNamedCalls("removeReader")), n)
......@@ -177,7 +174,6 @@ class ConnectionTests(NeoUnitTestBase):
bc = self._makeListeningConnection(addr=addr)
self.assertEqual(bc.getAddress(), addr)
self._checkRegistered()
self._checkReaderAdded()
self._checkMakeListeningConnection()
# test readable
bc.readable()
......@@ -193,7 +189,6 @@ class ConnectionTests(NeoUnitTestBase):
bc = self._makeListeningConnection(addr=addr)
self.assertEqual(bc.getAddress(), addr)
self._checkRegistered()
self._checkReaderAdded()
self._checkMakeListeningConnection()
# test readable
bc.readable()
......@@ -203,7 +198,6 @@ class ConnectionTests(NeoUnitTestBase):
def test_03_Connection(self):
bc = self._makeConnection()
self.assertEqual(bc.getAddress(), self.address)
self._checkReaderAdded(1)
self._checkReadBuf(bc, '')
self._checkWriteBuf(bc, '')
self.assertEqual(bc.cur_id, 0)
......@@ -568,7 +562,6 @@ class ConnectionTests(NeoUnitTestBase):
self._checkConnectionFailed(0)
# check call to event manager
self.assertIsNot(bc.em, None)
self._checkReaderAdded(1)
self._checkWriterAdded(0)
def test_ClientConnection_init2(self):
......@@ -588,7 +581,6 @@ class ConnectionTests(NeoUnitTestBase):
self._checkConnectionFailed(0)
# check call to event manager
self.assertIsNot(bc.em, None)
self._checkReaderAdded(1)
self._checkWriterAdded(1)
def test_ClientConnection_init3(self):
......@@ -603,7 +595,6 @@ class ConnectionTests(NeoUnitTestBase):
self._checkConnectionCompleted(0)
self._checkConnectionFailed(1)
# check call to event manager
self._checkReaderAdded(1)
self._checkWriterAdded(0)
def test_ClientConnection_writable1(self):
......@@ -621,7 +612,6 @@ class ConnectionTests(NeoUnitTestBase):
self.assertFalse(bc.aborted)
# call
self._checkConnectionCompleted(1)
self._checkReaderAdded(1)
bc.writable()
self.assertFalse(bc.pending())
self.assertFalse(bc.aborted)
......@@ -631,7 +621,6 @@ class ConnectionTests(NeoUnitTestBase):
self._checkConnectionCompleted(1)
self._checkConnectionFailed(0)
self._checkUnregistered(0)
self._checkReaderAdded(1)
self._checkWriterRemoved(1)
self._checkReaderRemoved(0)
self._checkClose(0)
......@@ -648,7 +637,6 @@ class ConnectionTests(NeoUnitTestBase):
self.assertFalse(bc.aborted)
# call
self._checkConnectionCompleted(1)
self._checkReaderAdded(1)
bc.writable()
self.assertFalse(bc.connecting)
self.assertFalse(bc.pending())
......@@ -658,12 +646,10 @@ class ConnectionTests(NeoUnitTestBase):
self._checkConnectionCompleted(1)
self._checkConnectionFailed(0)
self._checkUnregistered(1)
self._checkReaderAdded(1)
def test_14_ServerConnection(self):
bc = self._makeServerConnection()
self.assertEqual(bc.getAddress(), ("127.0.0.7", 93413))
self._checkReaderAdded(1)
self._checkReadBuf(bc, '')
self._checkWriteBuf(bc, '')
self.assertEqual(bc.cur_id, 0)
......
......@@ -29,7 +29,7 @@ import transaction, ZODB
import neo.admin.app, neo.master.app, neo.storage.app
import neo.client.app, neo.neoctl.app
from neo.client import Storage
from neo.lib import bootstrap, logging
from neo.lib import logging
from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, \
ConnectorConnectionRefusedException, ConnectorTryAgainException
......@@ -467,6 +467,7 @@ class ConnectionFilter(object):
class NEOCluster(object):
BaseConnection_getTimeout = staticmethod(BaseConnection.getTimeout)
CONNECT_LIMIT = SocketConnector.CONNECT_LIMIT
SocketConnector_bind = staticmethod(SocketConnector._bind)
SocketConnector_connect = staticmethod(SocketConnector._connect)
SocketConnector_receive = staticmethod(SocketConnector.receive)
......@@ -506,10 +507,8 @@ class NEOCluster(object):
if data:
return data
raise
# TODO: 'sleep' should 'tic' in a smart way, so that storages can be
# safely started even if the cluster isn't.
bootstrap.sleep = lambda seconds: None
BaseConnection.getTimeout = lambda self: None
SocketConnector.CONNECT_LIMIT = 0
SocketConnector._bind = lambda self, addr: \
cls.SocketConnector_bind(self, BIND)
SocketConnector._connect = lambda self, addr: \
......@@ -525,8 +524,8 @@ class NEOCluster(object):
cls._patch_count -= 1
if cls._patch_count:
return
bootstrap.sleep = time.sleep
BaseConnection.getTimeout = cls.BaseConnection_getTimeout
SocketConnector.CONNECT_LIMIT = cls.CONNECT_LIMIT
SocketConnector._bind = cls.SocketConnector_bind
SocketConnector._connect = cls.SocketConnector_connect
SocketConnector.receive = cls.SocketConnector_receive
......
......@@ -21,6 +21,7 @@ from collections import defaultdict
from functools import wraps
from neo.lib import logging
from neo.storage.checker import CHECK_COUNT
from neo.lib.connector import SocketConnector
from neo.lib.connection import ClientConnection
from neo.lib.event import EventManager
from neo.lib.protocol import CellStates, ClusterStates, Packets, \
......@@ -276,12 +277,13 @@ class ReplicationTests(NEOThreadedTest):
upstream.importZODB()(1)
upstream.client.setPoll(0)
count = [0]
def __init__(orig, *args, **kw):
def _connect(orig, conn):
count[0] += 1
orig(*args, **kw)
with Patch(ClientConnection, __init__=__init__):
orig(conn)
with Patch(ClientConnection, _connect=_connect):
upstream.storage.listening_conn.close()
Serialized.tic(); self.assertEqual(count[0], 0)
SocketConnector.CONNECT_LIMIT = 1
Serialized.tic(); count[0] or Serialized.tic()
t = time.time()
# XXX: review API for checking timeouts
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment