Commit 9c3b756d authored by Vincent Pelletier's avatar Vincent Pelletier

When asking dispatcher to forget a packet, wake its queue

This fixes cases where a thread is expecting some answer from queue, and
a packet gets forgotten, as it would block until next expected packet
arrives (if there is no next packet, it will wait forever).

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2142 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent db0bd2fc
...@@ -41,7 +41,7 @@ from neo.client.exception import NEOStorageError ...@@ -41,7 +41,7 @@ from neo.client.exception import NEOStorageError
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
from neo.exception import NeoException from neo.exception import NeoException
from neo.client.handlers import storage, master from neo.client.handlers import storage, master
from neo.dispatcher import Dispatcher from neo.dispatcher import Dispatcher, ForgottenPacket
from neo.client.poll import ThreadedPoll from neo.client.poll import ThreadedPoll
from neo.client.iterator import Iterator from neo.client.iterator import Iterator
from neo.client.mq import MQ from neo.client.mq import MQ
...@@ -227,8 +227,8 @@ class Application(object): ...@@ -227,8 +227,8 @@ class Application(object):
conn, packet = get(block) conn, packet = get(block)
except Empty: except Empty:
break break
if packet is None: if packet is None or isinstance(packet, ForgottenPacket):
# connection was closed # connection was closed or some packet was forgotten
continue continue
block = False block = False
try: try:
...@@ -243,14 +243,18 @@ class Application(object): ...@@ -243,14 +243,18 @@ class Application(object):
_handlePacket = self._handlePacket _handlePacket = self._handlePacket
while True: while True:
conn, packet = get(True) conn, packet = get(True)
is_forgotten = isinstance(packet, ForgottenPacket)
if target_conn is conn: if target_conn is conn:
# check fake packet # check fake packet
if packet is None: if packet is None:
raise ConnectionClosed raise ConnectionClosed
if msg_id == packet.getId(): if msg_id == packet.getId():
if is_forgotten:
raise ValueError, 'ForgottenPacket for an ' \
'explicitely expected packet.'
self._handlePacket(conn, packet, handler=handler) self._handlePacket(conn, packet, handler=handler)
break break
elif packet is not None: elif not is_forgotten and packet is not None:
self._handlePacket(conn, packet) self._handlePacket(conn, packet)
@profiler_decorator @profiler_decorator
......
...@@ -20,6 +20,18 @@ from neo.profiling import profiler_decorator ...@@ -20,6 +20,18 @@ from neo.profiling import profiler_decorator
EMPTY = {} EMPTY = {}
NOBODY = [] NOBODY = []
class ForgottenPacket(object):
"""
Instances of this class will be pushed to queue when an expected answer
is being forgotten. Its purpose is similar to pushing "None" when
connection is closed, but the meaning is different.
"""
def __init__(self, msg_id):
self.msg_id = msg_id
def getId(self):
return self.msg_id
def giant_lock(func): def giant_lock(func):
def wrapped(self, *args, **kw): def wrapped(self, *args, **kw):
self.lock_acquire() self.lock_acquire()
...@@ -95,6 +107,7 @@ class Dispatcher: ...@@ -95,6 +107,7 @@ class Dispatcher:
if queue is NOBODY: if queue is NOBODY:
raise KeyError, 'Already expected by NOBODY: %r, %r' % ( raise KeyError, 'Already expected by NOBODY: %r, %r' % (
conn, msg_id) conn, msg_id)
queue.put((conn, ForgottenPacket(msg_id)))
self.queue_dict[id(queue)] -= 1 self.queue_dict[id(queue)] -= 1
message_table[msg_id] = NOBODY message_table[msg_id] = NOBODY
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
import unittest import unittest
from mock import Mock from mock import Mock
from neo.dispatcher import Dispatcher from neo.dispatcher import Dispatcher, ForgottenPacket
from Queue import Queue from Queue import Queue
class DispatcherTests(unittest.TestCase): class DispatcherTests(unittest.TestCase):
...@@ -116,6 +116,13 @@ class DispatcherTests(unittest.TestCase): ...@@ -116,6 +116,13 @@ class DispatcherTests(unittest.TestCase):
self.dispatcher.register(conn, 1, queue) self.dispatcher.register(conn, 1, queue)
# ...and forget about it # ...and forget about it
self.dispatcher.forget(conn, 1) self.dispatcher.forget(conn, 1)
# A ForgottenPacket must have been put in the queue
queue_conn, packet = queue.get(block=False)
self.assertTrue(isinstance(packet, ForgottenPacket), packet)
# ...with appropriate packet id
self.assertEqual(packet.getId(), 1)
# ...and appropriate connection
self.assertTrue(conn is queue_conn, (conn, queue_conn))
# If forgotten twice, it must raise a KeyError # If forgotten twice, it must raise a KeyError
self.assertRaises(KeyError, self.dispatcher.forget, conn, 1) self.assertRaises(KeyError, self.dispatcher.forget, conn, 1)
# Event arrives, return value must be True (it was expected) # Event arrives, return value must be True (it was expected)
...@@ -127,6 +134,7 @@ class DispatcherTests(unittest.TestCase): ...@@ -127,6 +134,7 @@ class DispatcherTests(unittest.TestCase):
self.dispatcher.register(conn, 1, queue) self.dispatcher.register(conn, 1, queue)
# ...and forget about it # ...and forget about it
self.dispatcher.forget(conn, 1) self.dispatcher.forget(conn, 1)
queue.get(block=False)
# No exception must happen if connection is lost. # No exception must happen if connection is lost.
self.dispatcher.unregister(conn) self.dispatcher.unregister(conn)
# Forgotten message's queue must not have received a "None" # Forgotten message's queue must not have received a "None"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment