Commit 0e516a29 authored by Vincent Pelletier's avatar Vincent Pelletier

Split _waitMessage into simpler methods.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1773 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 7b03536b
...@@ -155,41 +155,65 @@ class Application(object): ...@@ -155,41 +155,65 @@ class Application(object):
self._nm_acquire = lock.acquire self._nm_acquire = lock.acquire
self._nm_release = lock.release self._nm_release = lock.release
def _waitMessage(self, target_conn = None, msg_id = None, handler=None): def _handlePacket(self, conn, packet, handler=None):
"""Wait for a message returned by the dispatcher in queues.""" """
local_queue = self.local_var.queue conn
while True: The connection which received the packet (forwarded to handler).
if msg_id is None: packet
try: The packet to handle.
conn, packet = local_queue.get_nowait() handler
except Empty: The handler to use to handle packet.
break If not given, it will be guessed from connection's not type.
else: """
conn, packet = local_queue.get() if handler is None:
# check fake packet
if packet is None:
if conn.getUUID() == target_conn.getUUID():
raise ConnectionClosed
else:
continue
# Guess the handler to use based on the type of node on the # Guess the handler to use based on the type of node on the
# connection # connection
if handler is None:
node = self.nm.getByAddress(conn.getAddress()) node = self.nm.getByAddress(conn.getAddress())
if node is None: if node is None:
raise ValueError, 'Expecting an answer from a node ' \ raise ValueError, 'Expecting an answer from a node ' \
'which type is not known... Is this right ?' 'which type is not known... Is this right ?'
else:
if node.isStorage(): if node.isStorage():
handler = self.storage_handler handler = self.storage_handler
elif node.isMaster(): elif node.isMaster():
handler = self.primary_handler handler = self.primary_handler
else: else:
raise ValueError, 'Unknown node type: %r' % ( raise ValueError, 'Unknown node type: %r' % (node.__class__, )
node.__class__, )
handler.dispatch(conn, packet) handler.dispatch(conn, packet)
if target_conn is conn and msg_id == packet.getId():
def _waitAnyMessage(self, block=True):
"""
Handle all pending packets.
block
If True (default), will block until at least one packet was
received.
"""
get = self.local_var.queue.get
_handlePacket = self._handlePacket
while True:
try:
conn, packet = get(block)
except Empty:
break break
block = False
try:
_handlePacket(conn, packet)
except ConnectionClosed:
pass
def _waitMessage(self, target_conn, msg_id, handler=None):
"""Wait for a message returned by the dispatcher in queues."""
get = self.local_var.queue.get
_handlePacket = self._handlePacket
while True:
conn, packet = get(True)
if target_conn is conn:
# check fake packet
if packet is None:
raise ConnectionClosed
if msg_id == packet.getId():
self._handlePacket(conn, packet, handler=handler)
break
self._handlePacket(conn, packet)
def _askStorage(self, conn, packet, timeout=5, additional_timeout=30): def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
""" Send a request to a storage node and process it's answer """ """ Send a request to a storage node and process it's answer """
...@@ -756,10 +780,7 @@ class Application(object): ...@@ -756,10 +780,7 @@ class Application(object):
# Wait for answers from all storages. # Wait for answers from all storages.
while len(self.local_var.node_tids) != len(storage_node_list): while len(self.local_var.node_tids) != len(storage_node_list):
try: self._waitAnyMessage()
self._waitMessage(handler=self.storage_handler)
except ConnectionClosed:
continue
# Reorder tids # Reorder tids
ordered_tids = set() ordered_tids = set()
...@@ -893,7 +914,7 @@ class Application(object): ...@@ -893,7 +914,7 @@ class Application(object):
close = __del__ close = __del__
def sync(self): def sync(self):
self._waitMessage() self._waitAnyMessage(False)
def setNodeReady(self): def setNodeReady(self):
self.local_var.node_ready = True self.local_var.node_ready = True
......
...@@ -43,12 +43,11 @@ def _getPartitionTable(self): ...@@ -43,12 +43,11 @@ def _getPartitionTable(self):
self.master_conn = _getMasterConnection(self) self.master_conn = _getMasterConnection(self)
return self.pt return self.pt
def _waitMessage(self, conn=None, msg_id=None, handler=None): def _waitMessage(self, conn, msg_id, handler=None):
if conn is not None and handler is not None: if handler is None:
handler.dispatch(conn, conn.fakeReceived())
else:
raise NotImplementedError raise NotImplementedError
else:
handler.dispatch(conn, conn.fakeReceived())
class ClientApplicationTests(NeoTestBase): class ClientApplicationTests(NeoTestBase):
...@@ -218,7 +217,7 @@ class ClientApplicationTests(NeoTestBase): ...@@ -218,7 +217,7 @@ class ClientApplicationTests(NeoTestBase):
'getAddress': ('127.0.0.1', 0), 'getAddress': ('127.0.0.1', 0),
'fakeReceived': packet, 'fakeReceived': packet,
}) })
app.local_var.queue = Mock({'get_nowait' : (conn, None)}) app.local_var.queue = Mock({'get' : (conn, None)})
app.pt = Mock({ 'getCellListForOID': (cell, ), }) app.pt = Mock({ 'getCellListForOID': (cell, ), })
app.cp = Mock({ 'getConnForCell' : conn}) app.cp = Mock({ 'getConnForCell' : conn})
app.local_var.asked_object = -1 app.local_var.asked_object = -1
...@@ -776,11 +775,11 @@ class ClientApplicationTests(NeoTestBase): ...@@ -776,11 +775,11 @@ class ClientApplicationTests(NeoTestBase):
'getCellListForTID': ReturnValues([cell1], [cell2]), 'getCellListForTID': ReturnValues([cell1], [cell2]),
}) })
app.cp = Mock({ 'getConnForCell': conn}) app.cp = Mock({ 'getConnForCell': conn})
def _waitMessage(self, conn=None, msg_id=None, handler=None): def _waitAnyMessage(self):
self.local_var.node_tids = {uuid1: (tid1, ), uuid2: (tid2, )} self.local_var.node_tids = {uuid1: (tid1, ), uuid2: (tid2, )}
Application._waitMessage = _waitMessage_old Application._waitAnyMessage = _waitAnyMessage_old
_waitMessage_old = Application._waitMessage _waitAnyMessage_old = Application._waitAnyMessage
Application._waitMessage = _waitMessage Application._waitAnyMessage = _waitAnyMessage
def txn_filter(info): def txn_filter(info):
return info['id'] > '\x00' * 8 return info['id'] > '\x00' * 8
result = app.undoLog(0, 4, filter=txn_filter) result = app.undoLog(0, 4, filter=txn_filter)
...@@ -835,42 +834,42 @@ class ClientApplicationTests(NeoTestBase): ...@@ -835,42 +834,42 @@ class ClientApplicationTests(NeoTestBase):
# TODO: test more connection failure cases # TODO: test more connection failure cases
# Seventh packet : askNodeInformation succeeded # Seventh packet : askNodeInformation succeeded
all_passed = [] all_passed = []
def _waitMessage8(self, conn=None, msg_id=None, handler=None): def _waitMessage8(self, conn, msg_id, handler=None):
all_passed.append(1) all_passed.append(1)
# Sixth packet : askPartitionTable succeeded # Sixth packet : askPartitionTable succeeded
def _waitMessage7(self, conn=None, msg_id=None, handler=None): def _waitMessage7(self, conn, msg_id, handler=None):
app.pt = Mock({'operational': True}) app.pt = Mock({'operational': True})
Application._waitMessage = _waitMessage8 Application._waitMessage = _waitMessage8
# fifth packet : request node identification succeeded # fifth packet : request node identification succeeded
def _waitMessage6(self, conn=None, msg_id=None, handler=None): def _waitMessage6(self, conn, msg_id, handler=None):
conn.setUUID('D' * 16) conn.setUUID('D' * 16)
app.uuid = 'C' * 16 app.uuid = 'C' * 16
Application._waitMessage = _waitMessage7 Application._waitMessage = _waitMessage7
# fourth iteration : connection to primary master succeeded # fourth iteration : connection to primary master succeeded
def _waitMessage5(self, conn=None, msg_id=None, handler=None): def _waitMessage5(self, conn, msg_id, handler=None):
app.trying_master_node = app.primary_master_node = Mock({ app.trying_master_node = app.primary_master_node = Mock({
'getAddress': ('192.168.1.1', 10000), 'getAddress': ('192.168.1.1', 10000),
'__str__': 'Fake master node', '__str__': 'Fake master node',
}) })
Application._waitMessage = _waitMessage6 Application._waitMessage = _waitMessage6
# third iteration : node not ready # third iteration : node not ready
def _waitMessage4(app, conn=None, msg_id=None, handler=None): def _waitMessage4(app, conn, msg_id, handler=None):
app.setNodeNotReady() app.setNodeNotReady()
app.trying_master_node = None app.trying_master_node = None
Application._waitMessage = _waitMessage5 Application._waitMessage = _waitMessage5
# second iteration : master node changed # second iteration : master node changed
def _waitMessage3(app, conn=None, msg_id=None, handler=None): def _waitMessage3(app, conn, msg_id, handler=None):
app.primary_master_node = Mock({ app.primary_master_node = Mock({
'getAddress': ('192.168.1.1', 10000), 'getAddress': ('192.168.1.1', 10000),
'__str__': 'Fake master node', '__str__': 'Fake master node',
}) })
Application._waitMessage = _waitMessage4 Application._waitMessage = _waitMessage4
# first iteration : connection failed # first iteration : connection failed
def _waitMessage2(app, conn=None, msg_id=None, handler=None): def _waitMessage2(app, conn, msg_id, handler=None):
app.trying_master_node = None app.trying_master_node = None
Application._waitMessage = _waitMessage3 Application._waitMessage = _waitMessage3
# do nothing for the first call # do nothing for the first call
def _waitMessage1(app, conn=None, msg_id=None, handler=None): def _waitMessage1(app, conn, msg_id, handler=None):
Application._waitMessage = _waitMessage2 Application._waitMessage = _waitMessage2
_waitMessage_old = Application._waitMessage _waitMessage_old = Application._waitMessage
Application._waitMessage = _waitMessage1 Application._waitMessage = _waitMessage1
...@@ -892,7 +891,7 @@ class ClientApplicationTests(NeoTestBase): ...@@ -892,7 +891,7 @@ class ClientApplicationTests(NeoTestBase):
app.dispatcher = Mock() app.dispatcher = Mock()
conn = Mock() conn = Mock()
self.test_ok = False self.test_ok = False
def _waitMessage_hook(app, conn=None, msg_id=None, handler=None): def _waitMessage_hook(app, conn, msg_id, handler=None):
self.test_ok = True self.test_ok = True
_waitMessage_old = Application._waitMessage _waitMessage_old = Application._waitMessage
packet = Packets.AskBeginTransaction(None) packet = Packets.AskBeginTransaction(None)
...@@ -917,7 +916,7 @@ class ClientApplicationTests(NeoTestBase): ...@@ -917,7 +916,7 @@ class ClientApplicationTests(NeoTestBase):
app.master_conn = conn app.master_conn = conn
app.primary_handler = Mock() app.primary_handler = Mock()
self.test_ok = False self.test_ok = False
def _waitMessage_hook(app, conn=None, msg_id=None, handler=None): def _waitMessage_hook(app, conn, msg_id, handler=None):
self.assertTrue(handler is app.primary_handler) self.assertTrue(handler is app.primary_handler)
self.test_ok = True self.test_ok = True
_waitMessage_old = Application._waitMessage _waitMessage_old = Application._waitMessage
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment