Commit d51920c0 authored by Julien Muchembled's avatar Julien Muchembled

Remove unused 'on_timeout' feature on connections

parent 735fb9d1
No related merge requests found
...@@ -32,7 +32,6 @@ class HandlerSwitcher(object): ...@@ -32,7 +32,6 @@ class HandlerSwitcher(object):
_is_handling = False _is_handling = False
_next_timeout = None _next_timeout = None
_next_timeout_msg_id = None _next_timeout_msg_id = None
_next_on_timeout = None
_pending = ({}, None), _pending = ({}, None),
def __init__(self, handler): def __init__(self, handler):
...@@ -54,7 +53,7 @@ class HandlerSwitcher(object): ...@@ -54,7 +53,7 @@ class HandlerSwitcher(object):
while request_dict: while request_dict:
msg_id, request = request_dict.popitem() msg_id, request = request_dict.popitem()
p.setId(msg_id) p.setId(msg_id)
handler.packetReceived(conn, p, request[3]) handler.packetReceived(conn, p, request[2])
if len(self._pending) == 1: if len(self._pending) == 1:
break break
del self._pending[0] del self._pending[0]
...@@ -66,7 +65,7 @@ class HandlerSwitcher(object): ...@@ -66,7 +65,7 @@ class HandlerSwitcher(object):
""" Return the last (may be unapplied) handler registered """ """ Return the last (may be unapplied) handler registered """
return self._pending[-1][1] return self._pending[-1][1]
def emit(self, request, timeout, on_timeout, kw={}): def emit(self, request, timeout, kw={}):
# register the request in the current handler # register the request in the current handler
_pending = self._pending _pending = self._pending
if self._is_handling: if self._is_handling:
...@@ -85,22 +84,14 @@ class HandlerSwitcher(object): ...@@ -85,22 +84,14 @@ class HandlerSwitcher(object):
if next_timeout is None or timeout < next_timeout: if next_timeout is None or timeout < next_timeout:
self._next_timeout = timeout self._next_timeout = timeout
self._next_timeout_msg_id = msg_id self._next_timeout_msg_id = msg_id
self._next_on_timeout = on_timeout request_dict[msg_id] = answer_class, timeout, kw
request_dict[msg_id] = answer_class, timeout, on_timeout, kw
def getNextTimeout(self): def getNextTimeout(self):
return self._next_timeout return self._next_timeout
def timeout(self, connection): def timeout(self, connection):
msg_id = self._next_timeout_msg_id logging.info('timeout for #0x%08x with %r',
if self._next_on_timeout is not None: self._next_timeout_msg_id, connection)
self._next_on_timeout(connection, msg_id)
if self._next_timeout_msg_id != msg_id:
# on_timeout sent a packet with a smaller timeout
# so keep the connection open
return
# Notify that a timeout occurred
return msg_id
def handle(self, connection, packet): def handle(self, connection, packet):
assert not self._is_handling assert not self._is_handling
...@@ -127,7 +118,7 @@ class HandlerSwitcher(object): ...@@ -127,7 +118,7 @@ class HandlerSwitcher(object):
request_dict, handler = pending[0] request_dict, handler = pending[0]
# checkout the expected answer class # checkout the expected answer class
try: try:
klass, _, _, kw = request_dict.pop(msg_id) klass, _, kw = request_dict.pop(msg_id)
except KeyError: except KeyError:
klass = None klass = None
kw = {} kw = {}
...@@ -153,11 +144,11 @@ class HandlerSwitcher(object): ...@@ -153,11 +144,11 @@ class HandlerSwitcher(object):
# Find next timeout and its msg_id # Find next timeout and its msg_id
next_timeout = None next_timeout = None
for pending in self._pending: for pending in self._pending:
for msg_id, (_, timeout, on_timeout, _) in pending[0].iteritems(): for msg_id, (_, timeout, _) in pending[0].iteritems():
if not next_timeout or timeout < next_timeout[0]: if not next_timeout or timeout < next_timeout[0]:
next_timeout = timeout, msg_id, on_timeout next_timeout = timeout, msg_id
self._next_timeout, self._next_timeout_msg_id, self._next_on_timeout = \ self._next_timeout, self._next_timeout_msg_id = \
next_timeout or (None, None, None) next_timeout or (None, None)
def setHandler(self, handler): def setHandler(self, handler):
can_apply = len(self._pending) == 1 and not self._pending[0][0] can_apply = len(self._pending) == 1 and not self._pending[0][0]
...@@ -434,12 +425,8 @@ class Connection(BaseConnection): ...@@ -434,12 +425,8 @@ class Connection(BaseConnection):
# Although this test is only useful for MTClientConnection, # Although this test is only useful for MTClientConnection,
# it's not worth complicating the code more. # it's not worth complicating the code more.
if self._next_timeout <= time(): if self._next_timeout <= time():
msg_id = handlers.timeout(self) handlers.timeout(self)
if msg_id is None: self.close()
self._next_timeout = time() + self._timeout
else:
logging.info('timeout for #0x%08x with %r', msg_id, self)
self.close()
else: else:
self.idle() self.idle()
...@@ -601,7 +588,7 @@ class Connection(BaseConnection): ...@@ -601,7 +588,7 @@ class Connection(BaseConnection):
packet.setId(self._getNextId() if msg_id is None else msg_id) packet.setId(self._getNextId() if msg_id is None else msg_id)
self._addPacket(packet) self._addPacket(packet)
def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None, **kw): def ask(self, packet, timeout=CRITICAL_TIMEOUT, **kw):
""" """
Send a packet with a new ID and register the expectation of an answer Send a packet with a new ID and register the expectation of an answer
""" """
...@@ -612,7 +599,7 @@ class Connection(BaseConnection): ...@@ -612,7 +599,7 @@ class Connection(BaseConnection):
self._addPacket(packet) self._addPacket(packet)
handlers = self._handlers handlers = self._handlers
t = None if handlers.isPending() else time() t = None if handlers.isPending() else time()
handlers.emit(packet, timeout, on_timeout, kw) handlers.emit(packet, timeout, kw)
if not self._queue: if not self._queue:
next_timeout = self._next_timeout next_timeout = self._next_timeout
self.updateTimeout(t) self.updateTimeout(t)
...@@ -764,14 +751,13 @@ class MTClientConnection(ClientConnection): ...@@ -764,14 +751,13 @@ class MTClientConnection(ClientConnection):
# Alias without lock (cheaper than super()) # Alias without lock (cheaper than super())
_ask = ClientConnection.ask.__func__ _ask = ClientConnection.ask.__func__
def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None, def ask(self, packet, timeout=CRITICAL_TIMEOUT, queue=None, **kw):
queue=None, **kw):
with self.lock: with self.lock:
if queue is None: if queue is None:
if type(packet) is Packets.Ping: if type(packet) is Packets.Ping:
return self._ask(packet, timeout, on_timeout, **kw) return self._ask(packet, timeout, **kw)
raise TypeError('Only Ping packet can be asked' raise TypeError('Only Ping packet can be asked'
' without a queue, got a %r.' % packet) ' without a queue, got a %r.' % packet)
msg_id = self._ask(packet, timeout, on_timeout, **kw) msg_id = self._ask(packet, timeout, **kw)
self.dispatcher.register(self, msg_id, queue) self.dispatcher.register(self, msg_id, queue)
return msg_id return msg_id
...@@ -2060,7 +2060,7 @@ class Test(NEOThreadedTest): ...@@ -2060,7 +2060,7 @@ class Test(NEOThreadedTest):
if (isinstance(packet, Packets.AnswerStoreObject) if (isinstance(packet, Packets.AnswerStoreObject)
and packet.decode()[0]): and packet.decode()[0]):
conn, = cluster.client.getConnectionList(app) conn, = cluster.client.getConnectionList(app)
kw = conn._handlers._pending[0][0][packet._id][3] kw = conn._handlers._pending[0][0][packet._id][2]
return 1 == u64(kw['oid']) and delay_conflict[app.uuid].pop() return 1 == u64(kw['oid']) and delay_conflict[app.uuid].pop()
def writeA(orig, txn_context, oid, serial, data): def writeA(orig, txn_context, oid, serial, data):
if u64(oid) == 1: if u64(oid) == 1:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment