Commit 8bf149d0 authored by Julien Muchembled's avatar Julien Muchembled

More generic handling of closed MTConnection with pending requests

This optimizes the normal case, and handlers can now take specific action
when requests are cancelled because a connection is closed.
parent 3bd82ef1
...@@ -172,9 +172,6 @@ class Application(ThreadedApplication): ...@@ -172,9 +172,6 @@ class Application(ThreadedApplication):
conn, packet, kw = get(block) conn, packet, kw = get(block)
except Empty: except Empty:
break break
if packet is None:
# connection was closed
continue
block = False block = False
try: try:
_handlePacket(conn, packet, kw) _handlePacket(conn, packet, kw)
......
...@@ -19,6 +19,16 @@ from .locking import Lock, Empty ...@@ -19,6 +19,16 @@ from .locking import Lock, Empty
EMPTY = {} EMPTY = {}
NOBODY = [] NOBODY = []
@apply
class _ConnectionClosed(object):
handler_method_name = 'connectionClosed'
decode = tuple
class getId(object):
def __eq__(self, other):
return True
def giant_lock(func): def giant_lock(func):
def wrapped(self, *args, **kw): def wrapped(self, *args, **kw):
self.lock_acquire() self.lock_acquire()
...@@ -89,7 +99,7 @@ class Dispatcher: ...@@ -89,7 +99,7 @@ class Dispatcher:
continue continue
queue_id = id(queue) queue_id = id(queue)
if queue_id not in notified_set: if queue_id not in notified_set:
queue.put((conn, None, None)) queue.put((conn, _ConnectionClosed, EMPTY))
notified_set.add(queue_id) notified_set.add(queue_id)
_decrefQueue(queue) _decrefQueue(queue)
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
import sys import sys
from . import logging from . import logging
from .connection import ConnectionClosed
from .protocol import ( from .protocol import (
NodeStates, Packets, Errors, BackendNotImplemented, NodeStates, Packets, Errors, BackendNotImplemented,
BrokenNodeDisallowedError, NotReadyError, PacketMalformedError, BrokenNodeDisallowedError, NotReadyError, PacketMalformedError,
...@@ -261,3 +262,6 @@ class AnswerBaseHandler(EventHandler): ...@@ -261,3 +262,6 @@ class AnswerBaseHandler(EventHandler):
def acceptIdentification(*args): def acceptIdentification(*args):
pass pass
def connectionClosed(self, conn):
raise ConnectionClosed
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
import thread, threading, weakref import thread, threading, weakref
from . import logging from . import logging
from .app import BaseApplication from .app import BaseApplication
from .connection import ConnectionClosed
from .debug import register as registerLiveDebugger from .debug import register as registerLiveDebugger
from .dispatcher import Dispatcher from .dispatcher import Dispatcher
from .locking import SimpleQueue from .locking import SimpleQueue
...@@ -141,14 +140,8 @@ class ThreadedApplication(BaseApplication): ...@@ -141,14 +140,8 @@ class ThreadedApplication(BaseApplication):
_handlePacket = self._handlePacket _handlePacket = self._handlePacket
while True: while True:
qconn, qpacket, kw = get(True) qconn, qpacket, kw = get(True)
if conn is qconn: if conn is qconn and msg_id == qpacket.getId():
# check fake packet
if qpacket is None:
raise ConnectionClosed
if msg_id == qpacket.getId():
_handlePacket(qconn, qpacket, kw, handler) _handlePacket(qconn, qpacket, kw, handler)
break break
elif qpacket is None:
continue
_handlePacket(qconn, qpacket, kw) _handlePacket(qconn, qpacket, kw)
return self.getHandlerData() return self.getHandlerData()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment