Commit 9b70f88f authored by Julien Muchembled

Use TCP keepalives instead of applicative pings

parent d51920c0
...@@ -183,7 +183,6 @@ class BaseConnection(object): ...@@ -183,7 +183,6 @@ class BaseConnection(object):
""" """
from .connector import SocketConnector as ConnectorClass from .connector import SocketConnector as ConnectorClass
KEEP_ALIVE = 60
def __init__(self, event_manager, handler, connector, addr=None): def __init__(self, event_manager, handler, connector, addr=None):
assert connector is not None, "Need a low-level connector" assert connector is not None, "Need a low-level connector"
...@@ -284,9 +283,6 @@ class BaseConnection(object): ...@@ -284,9 +283,6 @@ class BaseConnection(object):
""" """
return attributeTracker.whoSet(self, 'connector') return attributeTracker.whoSet(self, 'connector')
def idle(self):
pass
attributeTracker.track(BaseConnection) attributeTracker.track(BaseConnection)
...@@ -332,6 +328,7 @@ class Connection(BaseConnection): ...@@ -332,6 +328,7 @@ class Connection(BaseConnection):
peer_id = None peer_id = None
_next_timeout = None _next_timeout = None
_parser_state = None _parser_state = None
_idle_timeout = 0
_timeout = 0 _timeout = 0
def __init__(self, event_manager, *args, **kw): def __init__(self, event_manager, *args, **kw):
...@@ -364,17 +361,20 @@ class Connection(BaseConnection): ...@@ -364,17 +361,20 @@ class Connection(BaseConnection):
def asClient(self): def asClient(self):
try: try:
del self.idle del self._idle_timeout
assert self.client
except AttributeError: except AttributeError:
self.client = True self.client = True
else:
assert self.client
self.updateTimeout()
def asServer(self): def asServer(self):
self.server = True self.server = True
def _closeClient(self): def _closeClient(self):
if self.server: if self.server:
del self.idle del self._idle_timeout
self.updateTimeout()
self.client = False self.client = False
self.send(Packets.CloseClient()) self.send(Packets.CloseClient())
else: else:
...@@ -382,7 +382,8 @@ class Connection(BaseConnection): ...@@ -382,7 +382,8 @@ class Connection(BaseConnection):
def closeClient(self): def closeClient(self):
if self.connector is not None and self.client: if self.connector is not None and self.client:
self.idle = self._closeClient self._idle_timeout = 60
self._checkSmallerTimeout()
def isAborted(self): def isAborted(self):
return self.aborted return self.aborted
...@@ -409,11 +410,14 @@ class Connection(BaseConnection): ...@@ -409,11 +410,14 @@ class Connection(BaseConnection):
if not self._queue: if not self._queue:
if not t: if not t:
t = self._next_timeout - self._timeout t = self._next_timeout - self._timeout
self._timeout = self._handlers.getNextTimeout() or self.KEEP_ALIVE self._timeout = self._handlers.getNextTimeout() or \
self._idle_timeout
self._next_timeout = t + self._timeout self._next_timeout = t + self._timeout
_checkSmallerTimeout = updateTimeout
def getTimeout(self): def getTimeout(self):
if not self._queue: if not self._queue and self._timeout:
return self._next_timeout return self._next_timeout
def onTimeout(self): def onTimeout(self):
...@@ -427,8 +431,8 @@ class Connection(BaseConnection): ...@@ -427,8 +431,8 @@ class Connection(BaseConnection):
if self._next_timeout <= time(): if self._next_timeout <= time():
handlers.timeout(self) handlers.timeout(self)
self.close() self.close()
else: elif self._idle_timeout:
self.idle() self._closeClient()
def abort(self): def abort(self):
"""Abort dealing with this connection.""" """Abort dealing with this connection."""
...@@ -600,11 +604,7 @@ class Connection(BaseConnection): ...@@ -600,11 +604,7 @@ class Connection(BaseConnection):
handlers = self._handlers handlers = self._handlers
t = None if handlers.isPending() else time() t = None if handlers.isPending() else time()
handlers.emit(packet, timeout, kw) handlers.emit(packet, timeout, kw)
if not self._queue: self._checkSmallerTimeout(t)
next_timeout = self._next_timeout
self.updateTimeout(t)
if self._next_timeout < next_timeout:
self.em.wakeup()
return msg_id return msg_id
def answer(self, packet): def answer(self, packet):
...@@ -617,9 +617,6 @@ class Connection(BaseConnection): ...@@ -617,9 +617,6 @@ class Connection(BaseConnection):
packet.setId(self.peer_id) packet.setId(self.peer_id)
self._addPacket(packet) self._addPacket(packet)
def idle(self):
self.ask(Packets.Ping())
def _connected(self): def _connected(self):
self.connecting = False self.connecting = False
self.getHandler().connectionCompleted(self) self.getHandler().connectionCompleted(self)
...@@ -680,13 +677,6 @@ class ClientConnection(Connection): ...@@ -680,13 +677,6 @@ class ClientConnection(Connection):
class ServerConnection(Connection): class ServerConnection(Connection):
"""A connection from a remote node to this node.""" """A connection from a remote node to this node."""
# Both server and client must check the connection, in case:
# - the remote crashed brutally (i.e. without closing TCP connections)
# - or packets sent by the remote are dropped (network failure)
# Use different timeout so that in normal condition, server never has to
# ping the client. Otherwise, it would do it about half of the time.
KEEP_ALIVE = Connection.KEEP_ALIVE + 5
server = True server = True
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
...@@ -761,3 +751,10 @@ class MTClientConnection(ClientConnection): ...@@ -761,3 +751,10 @@ class MTClientConnection(ClientConnection):
msg_id = self._ask(packet, timeout, **kw) msg_id = self._ask(packet, timeout, **kw)
self.dispatcher.register(self, msg_id, queue) self.dispatcher.register(self, msg_id, queue)
return msg_id return msg_id
def _checkSmallerTimeout(self, t=None):
if not self._queue:
next_timeout = self._timeout and self._next_timeout
self.updateTimeout(t)
if not next_timeout or self._next_timeout < next_timeout:
self.em.wakeup()
...@@ -57,6 +57,18 @@ class SocketConnector(object): ...@@ -57,6 +57,18 @@ class SocketConnector(object):
self.socket_fd = s.fileno() self.socket_fd = s.fileno()
# always use non-blocking sockets # always use non-blocking sockets
s.setblocking(0) s.setblocking(0)
# TCP keepalive, enabled on both sides to detect:
# - remote host crash
# - network failure
# They're more efficient than applicative pings and we don't want
# to consider the connection dead if the remote node is busy.
# The following 3 lines are specific to Linux. It seems that OSX
# has similar options (TCP_KEEPALIVE/TCP_KEEPINTVL/TCP_KEEPCNT),
# and Windows has SIO_KEEPALIVE_VALS (fixed count of 10).
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# disable Nagle algorithm to reduce latency # disable Nagle algorithm to reduce latency
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.queued = [ENCODED_VERSION] self.queued = [ENCODED_VERSION]
......
...@@ -194,8 +194,6 @@ class EventHandler(object): ...@@ -194,8 +194,6 @@ class EventHandler(object):
conn.answer(Packets.Pong()) conn.answer(Packets.Pong())
def pong(self, conn): def pong(self, conn):
# Ignore PONG packets. The only purpose of ping/pong packets is
# to test/maintain underlying connection.
pass pass
def closeClient(self, conn): def closeClient(self, conn):
......
...@@ -1349,19 +1349,6 @@ class Test(NEOThreadedTest): ...@@ -1349,19 +1349,6 @@ class Test(NEOThreadedTest):
poll(0) poll(0)
self.assertIs(client.connector, None) self.assertIs(client.connector, None)
def testConnectionTimeout(self):
with self.getLoopbackConnection() as conn:
conn.KEEP_ALIVE
def onTimeout(orig):
conn.idle()
orig()
with Patch(conn, KEEP_ALIVE=0):
while conn.connecting:
conn.em.poll(1)
with Patch(conn, onTimeout=onTimeout):
conn.em.poll(1)
self.assertFalse(conn.isClosed())
@with_cluster() @with_cluster()
def testClientDisconnectedFromMaster(self, cluster): def testClientDisconnectedFromMaster(self, cluster):
def disconnect(conn, packet): def disconnect(conn, packet):
......
...@@ -235,9 +235,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -235,9 +235,7 @@ class ReplicationTests(NEOThreadedTest):
assertion failure. assertion failure.
""" """
conn, = backup.master.getConnectionList(backup.upstream.master) conn, = backup.master.getConnectionList(backup.upstream.master)
# trigger ping conn.ask(Packets.Ping())
self.assertFalse(conn.isPending())
conn.onTimeout()
self.assertTrue(conn.isPending()) self.assertTrue(conn.isPending())
# force ping to have expired # force ping to have expired
# connection will be closed before upstream master has time # connection will be closed before upstream master has time
...@@ -276,7 +274,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -276,7 +274,7 @@ class ReplicationTests(NEOThreadedTest):
self.tic(check_timeout=(backup.storage,)) self.tic(check_timeout=(backup.storage,))
# 2nd failed, 3rd deferred # 2nd failed, 3rd deferred
self.assertEqual(count[0], 4) self.assertEqual(count[0], 4)
self.assertTrue(t <= time.time()) self.assertLessEqual(t, time.time())
@backup_test() @backup_test()
def testBackupDelayedUnlockTransaction(self, backup): def testBackupDelayedUnlockTransaction(self, backup):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment