Commit 393e59e0 authored by Vincent Pelletier

Change (again) the way timeouts are handled.

There are 2 distinct kinds of timeout events:
- an unresponsive node
  This is a connection-level timeout.
  This is handled by the Timeout class, triggering pings and monitoring
  incoming data to decide when remote node is considered dead.
- a "too long" processing from an otherwise responsive node
  This is a per-request timeout.
  This is handled by the HandlerSwitcher class, triggering only
  disconnections when an answer takes too long to arrive (historical
  behaviour, not so useful when exchanging with a single-threaded peer).

The previous implementation mixed both, and had shortcomings (a ping would
time out almost immediately, and it was not possible to tell which message
caused a timeout).

Update tests.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2009 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f65762d8
......@@ -31,7 +31,7 @@ from neo import attributeTracker
from neo.util import ReadBuffer
from neo.profiling import profiler_decorator
PING_DELAY = 5
PING_DELAY = 6
PING_TIMEOUT = 5
INCOMING_TIMEOUT = 10
CRITICAL_TIMEOUT = 30
......@@ -70,6 +70,8 @@ def lockCheckWrapper(func):
class HandlerSwitcher(object):
_next_timeout = None
_next_timeout_msg_id = None
def __init__(self, connection, handler):
self._connection = connection
......@@ -87,7 +89,7 @@ class HandlerSwitcher(object):
return self._pending[0][1]
@profiler_decorator
def emit(self, request):
def emit(self, request, timeout):
# register the request in the current handler
_pending = self._pending
assert len(_pending) == 1 or _pending[0][0]
......@@ -96,7 +98,19 @@ class HandlerSwitcher(object):
answer_class = request.getAnswerClass()
assert answer_class is not None, "Not a request"
assert msg_id not in request_dict, "Packet id already expected"
request_dict[msg_id] = answer_class
next_timeout = self._next_timeout
if next_timeout is None or timeout < next_timeout:
self._next_timeout = timeout
self._next_timeout_msg_id = msg_id
request_dict[msg_id] = (answer_class, timeout)
def checkTimeout(self, t):
next_timeout = self._next_timeout
if next_timeout is not None and next_timeout < t:
result = self._next_timeout_msg_id
else:
result = None
return result
@profiler_decorator
def handle(self, packet):
......@@ -109,7 +123,7 @@ class HandlerSwitcher(object):
handler.packetReceived(self._connection, packet)
return
# checkout the expected answer class
klass = request_dict.pop(msg_id, None)
(klass, timeout) = request_dict.pop(msg_id, (None, None))
if klass and isinstance(packet, klass) or packet.isError():
handler.packetReceived(self._connection, packet)
else:
......@@ -122,6 +136,18 @@ class HandlerSwitcher(object):
while len(self._pending) > 1 and not self._pending[0][0]:
del self._pending[0]
logging.debug('Apply handler %r', self._pending[0][1])
if timeout == self._next_timeout:
# Find next timeout and its msg_id
timeout_list = []
extend = timeout_list.extend
for (request_dict, handler) in self._pending:
extend(((timeout, msg_id) \
for msg_id, (_, timeout) in request_dict.iteritems()))
if timeout_list:
timeout_list.sort(key=lambda x: x[0])
self._next_timeout, self._next_timeout_msg_id = timeout_list[0]
else:
self._next_timeout, self._next_timeout_msg_id = None, None
@profiler_decorator
def setHandler(self, handler):
......@@ -136,31 +162,48 @@ class HandlerSwitcher(object):
class Timeout(object):
""" Keep track of current timeouts """
""" Keep track of connection-level timeouts """
def __init__(self):
self._ping_time = None
self._critical_time = None
def update(self, t, timeout=CRITICAL_TIMEOUT):
""" Update the new critical time """
self._ping_time = t + PING_TIMEOUT
critical_time = self._ping_time + timeout
self._critical_time = max(critical_time, self._critical_time)
def update(self, t):
"""
Send occurred:
- set ping time if earlier than existing one
"""
ping_time = self._ping_time
t += PING_DELAY
if ping_time is None or t < ping_time:
self._ping_time = t
def refresh(self, t):
""" Refresh timeout after something received """
"""
Recv occurred:
- reschedule next ping time
- as this is an evidence that node is alive, remove pong expectation
"""
self._ping_time = t + PING_DELAY
self._critical_time = None
def ping(self, t):
"""
Ping send occurred:
- reschedule next ping time
- set pong expectation
"""
self._ping_time = t + PING_DELAY
self._critical_time = t + PING_TIMEOUT
def softExpired(self, t):
""" Indicate if the soft timeout (ping delay) is reached """
# hard timeout takes precedences
return self._ping_time < t < self._critical_time
""" Do we need to ping ? """
return self._ping_time < t
def hardExpired(self, t):
""" Indicate if hard (or pong) timeout is reached """
# should be called if softExpired if False
return self._critical_time < t or self._ping_time < t
""" Have we reached pong latest arrival time, if set ? """
critical_time = self._critical_time
return critical_time is not None and critical_time < t
class BaseConnection(object):
......@@ -176,16 +219,23 @@ class BaseConnection(object):
event_manager.register(self)
def checkTimeout(self, t):
if self._handlers.isPending():
if self._timeout.softExpired(t):
self._timeout.refresh(t)
self.ping()
handlers = self._handlers
if handlers.isPending():
msg_id = handlers.checkTimeout(t)
if msg_id is not None:
logging.info('timeout for %r with %s:%d', msg_id,
*self.getAddress())
self.close()
self.getHandler().timeoutExpired(self)
elif self._timeout.hardExpired(t):
# critical time reach or pong not received, abort
logging.info('timeout with %s:%d', *(self.getAddress()))
self.notify(Packets.Notify('Timeout'))
self.abort()
self.getHandler().timeoutExpired(self)
elif self._timeout.softExpired(t):
self._timeout.ping(t)
self.ping()
def lock(self):
return 1
......@@ -272,7 +322,7 @@ class ListeningConnection(BaseConnection):
new_conn = ServerConnection(self.getEventManager(), handler,
connector=new_s, addr=addr)
# A request for a node identification should arrive.
self._timeout.update(time(), timeout=INCOMING_TIMEOUT)
self._timeout.update(time())
handler.connectionAccepted(new_conn)
except ConnectorTryAgainException:
pass
......@@ -493,10 +543,11 @@ class Connection(BaseConnection):
msg_id = self._getNextId()
packet.setId(msg_id)
self._addPacket(packet)
t = time()
# If there is no pending request, initialise timeout values.
if not self._handlers.isPending():
self._timeout.update(time(), timeout=timeout)
self._handlers.emit(packet)
self._timeout.update(t)
self._handlers.emit(packet, t + timeout)
return msg_id
@not_closed
......@@ -615,10 +666,11 @@ class MTClientConnection(ClientConnection):
packet.setId(msg_id)
self.dispatcher.register(self, msg_id, self._local_var.queue)
self._addPacket(packet)
t = time()
# If there is no pending request, initialise timeout values.
if not self._handlers.isPending():
self._timeout.update(time(), timeout=timeout)
self._handlers.emit(packet)
self._timeout.update(t)
self._handlers.emit(packet, t + timeout)
return msg_id
finally:
self.unlock()
......
......@@ -19,7 +19,7 @@ from time import time
from mock import Mock
from neo.connection import ListeningConnection, Connection, \
ClientConnection, ServerConnection, MTClientConnection, \
HandlerSwitcher, Timeout
HandlerSwitcher, Timeout, PING_DELAY, PING_TIMEOUT
from neo.connector import getConnectorHandler, registerConnectorHandler
from neo.tests import DoNothingConnector
from neo.connector import ConnectorException, ConnectorTryAgainException, \
......@@ -808,7 +808,7 @@ class HandlerSwitcherTests(NeoTestBase):
def testEmit(self):
self.assertFalse(self._handlers.isPending())
request = self._makeRequest(1)
self._handlers.emit(request)
self._handlers.emit(request, 0)
self.assertTrue(self._handlers.isPending())
def testHandleNotification(self):
......@@ -818,7 +818,7 @@ class HandlerSwitcherTests(NeoTestBase):
self._checkPacketReceived(self._handler, notif1)
# emit a request and delay an handler
request = self._makeRequest(2)
self._handlers.emit(request)
self._handlers.emit(request, 0)
handler = self._makeHandler()
self._handlers.setHandler(handler)
# next notification fall into the current handler
......@@ -835,7 +835,7 @@ class HandlerSwitcherTests(NeoTestBase):
def testHandleAnswer1(self):
# handle with current handler
request = self._makeRequest(1)
self._handlers.emit(request)
self._handlers.emit(request, 0)
answer = self._makeAnswer(1)
self._handlers.handle(answer)
self._checkPacketReceived(self._handler, answer)
......@@ -843,7 +843,7 @@ class HandlerSwitcherTests(NeoTestBase):
def testHandleAnswer2(self):
# handle with blocking handler
request = self._makeRequest(1)
self._handlers.emit(request)
self._handlers.emit(request, 0)
handler = self._makeHandler()
self._handlers.setHandler(handler)
answer = self._makeAnswer(1)
......@@ -863,11 +863,11 @@ class HandlerSwitcherTests(NeoTestBase):
h2 = self._makeHandler()
h3 = self._makeHandler()
# emit all requests and setHandlers
self._handlers.emit(r1)
self._handlers.emit(r1, 0)
self._handlers.setHandler(h1)
self._handlers.emit(r2)
self._handlers.emit(r2, 0)
self._handlers.setHandler(h2)
self._handlers.emit(r3)
self._handlers.emit(r3, 0)
self._handlers.setHandler(h3)
self._checkCurrentHandler(self._handler)
self.assertTrue(self._handlers.isPending())
......@@ -889,9 +889,9 @@ class HandlerSwitcherTests(NeoTestBase):
a3 = self._makeAnswer(3)
h = self._makeHandler()
# emit all requests
self._handlers.emit(r1)
self._handlers.emit(r2)
self._handlers.emit(r3)
self._handlers.emit(r1, 0)
self._handlers.emit(r2, 0)
self._handlers.emit(r3, 0)
self._handlers.setHandler(h)
# process answers
self._handlers.handle(a1)
......@@ -908,59 +908,98 @@ class HandlerSwitcherTests(NeoTestBase):
a2 = self._makeAnswer(2)
h = self._makeHandler()
# emit requests around state setHandler
self._handlers.emit(r1)
self._handlers.emit(r1, 0)
self._handlers.setHandler(h)
self._handlers.emit(r2)
self._handlers.emit(r2, 0)
# process answer for next state
self._handlers.handle(a2)
self.checkAborted(self._connection)
def testTimeout(self):
"""
This timeout happens when a request has not been answered for longer
than a duration defined at emit() time.
"""
now = time()
# No timeout when no pending request
self.assertEqual(self._handlers.checkTimeout(now), None)
# Prepare some requests
msg_id_1 = 1
msg_id_2 = 2
msg_id_3 = 3
r1 = self._makeRequest(msg_id_1)
a1 = self._makeAnswer(msg_id_1)
r2 = self._makeRequest(msg_id_2)
r3 = self._makeRequest(msg_id_3)
msg_1_time = now + 5
msg_2_time = msg_1_time + 5
msg_3_time = msg_2_time + 5
# Emit r3 before all others, to test that it is the time parameter value
# which is used, not the registration order.
self._handlers.emit(r3, msg_3_time)
self._handlers.emit(r1, msg_1_time)
self._handlers.emit(r2, msg_2_time)
# No timeout before msg_1_time
self.assertEqual(self._handlers.checkTimeout(now), None)
# Timeout for msg_1 after msg_1_time
self.assertEqual(self._handlers.checkTimeout(msg_1_time + 0.5),
msg_id_1)
# If msg_1 met its answer, no timeout after msg_1_time
self._handlers.handle(a1)
self.assertEqual(self._handlers.checkTimeout(msg_1_time + 0.5), None)
# Next timeout is after msg_2_time
self.assertEqual(self._handlers.checkTimeout(msg_2_time + 0.5), msg_id_2)
class TestTimeout(NeoTestBase):
""" assume PING_DELAY=5 """
def setUp(self):
self.initial = time()
self.current = self.initial
self.current = time()
self.timeout = Timeout()
self.timeout.update(self.current)
def checkAfter(self, n, soft, hard):
def _checkAt(self, n, soft, hard):
at = self.current + n
self.assertEqual(soft, self.timeout.softExpired(at))
self.assertEqual(hard, self.timeout.hardExpired(at))
def refreshAfter(self, n):
self.current += n
self.timeout.refresh(self.current)
def _refreshAt(self, n):
self.timeout.refresh(self.current + n)
def testNoTimeout(self):
self.timeout.update(self.initial, 5)
self.checkAfter(1, False, False)
self.checkAfter(4, False, False)
self.refreshAfter(4) # answer received
self.checkAfter(1, False, False)
def _pingAt(self, n):
self.timeout.ping(self.current + n)
def testSoftTimeout(self):
self.timeout.update(self.initial, 5)
self.checkAfter(1, False, False)
self.checkAfter(4, False, False)
self.checkAfter(6, True, True) # ping
self.refreshAfter(8) # pong
self.checkAfter(1, False, False)
self.checkAfter(4, False, True)
"""
Soft timeout is when a ping should be sent to the peer to see if it's
still responsive, after seeing no sign of life for PING_DELAY.
"""
# Before PING_DELAY, no timeout.
self._checkAt(PING_DELAY - 0.5, False, False)
# If nothing came to refresh the timeout, soft timeout will be asserted
# after PING_DELAY.
self._checkAt(PING_DELAY + 0.5, True, False)
# If something refreshes the timeout, soft timeout will not be asserted
# after PING_DELAY.
answer_time = PING_DELAY - 0.5
self._refreshAt(answer_time)
self._checkAt(PING_DELAY + 0.5, False, False)
# ...but it will happen again after PING_DELAY after that answer
self._checkAt(answer_time + PING_DELAY + 0.5, True, False)
def testHardTimeout(self):
self.timeout.update(self.initial, 5)
self.checkAfter(1, False, False)
self.checkAfter(4, False, False)
self.checkAfter(6, True, True) # ping
self.refreshAfter(6) # pong
self.checkAfter(1, False, False)
self.checkAfter(4, False, False)
self.checkAfter(6, False, True) # ping
self.refreshAfter(6) # pong
self.checkAfter(1, False, True) # too late
self.checkAfter(5, False, True)
"""
Hard timeout is when a ping was sent, and any life sign must come
back to us before PING_TIMEOUT.
"""
# A timeout triggered at PING_DELAY, so a ping was sent.
self._pingAt(PING_DELAY)
# Before PING_DELAY + PING_TIMEOUT, no timeout occurs.
self._checkAt(PING_DELAY + PING_TIMEOUT - 0.5, False, False)
# After PING_DELAY + PING_TIMEOUT, hard timeout occurs.
self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, True)
# If anything happened on the connection, there is no hard timeout
# anymore after PING_DELAY + PING_TIMEOUT.
self._refreshAt(PING_DELAY + PING_TIMEOUT - 0.5)
self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, False)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment