Commit b551f6bf authored by Grégory Wisniewski's avatar Grégory Wisniewski

Factorize request/answer couples in client application by introducing

_askStorage and _askPrimary methods.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@488 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent deb19696
...@@ -29,9 +29,7 @@ from neo.node import NodeManager, MasterNode, StorageNode ...@@ -29,9 +29,7 @@ from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import MTClientConnection from neo.connection import MTClientConnection
from neo import protocol from neo import protocol
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \ from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
INVALID_PTID, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ INVALID_PTID, CLIENT_NODE_TYPE, UP_TO_DATE_STATE, INVALID_SERIAL
RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, INVALID_SERIAL
from neo.client.handler import * from neo.client.handler import *
from neo.client.exception import NEOStorageError, NEOStorageConflictError, \ from neo.client.exception import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError NEOStorageNotFoundError
...@@ -100,8 +98,6 @@ class ConnectionPool(object): ...@@ -100,8 +98,6 @@ class ConnectionPool(object):
if app.isNodeReady(): if app.isNodeReady():
logging.info('connected to storage node %s', node) logging.info('connected to storage node %s', node)
# FIXME: remove this assertion
assert conn.getUUID() != INVALID_UUID
conn.setHandler(self.app.storage_handler) conn.setHandler(self.app.storage_handler)
return conn return conn
else: else:
...@@ -255,12 +251,6 @@ class Application(object): ...@@ -255,12 +251,6 @@ class Application(object):
self.local_var.queue = Queue(5) self.local_var.queue = Queue(5)
return self.local_var.queue return self.local_var.queue
def _waitStorageMessage(self, target_conn=None, msg_id=None):
self._waitMessage(target_conn, msg_id, self.storage_handler)
def _waitPrimaryMessage(self, target_conn=None, msg_id=None):
self._waitMessage(target_conn, msg_id, self.primary_handler)
def _waitMessage(self, target_conn = None, msg_id = None, handler=None): def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
"""Wait for a message returned by the dispatcher in queues.""" """Wait for a message returned by the dispatcher in queues."""
local_queue = self.getQueue() local_queue = self.getQueue()
...@@ -283,6 +273,29 @@ class Application(object): ...@@ -283,6 +273,29 @@ class Application(object):
and packet.getType() & 0x8000: and packet.getType() & 0x8000:
break break
def _askStorage(self, conn, packet, timeout=5, additional_timeout=30):
""" Send a request to a storage node and process it's answer """
try:
msg_id = conn.ask(packet, timeout, additional_timeout)
self.dispatcher.register(conn, msg_id, self.getQueue())
finally:
# assume that the connection was already locked
conn.unlock()
self._waitMessage(conn, msg_id, self.storage_handler)
def _askPrimary(self, packet, timeout=5, additional_timeout=30):
""" Send a request to the primary master and process it's answer """
if self.master_conn is None:
raise NEOStorageError("Connection to master node failed")
conn = self.master_conn
conn.lock()
try:
msg_id = conn.ask(packet, timeout, additional_timeout)
self.dispatcher.register(conn, msg_id, self.getQueue())
finally:
conn.unlock()
self._waitMessage(conn, msg_id, self.primary_handler)
def registerDB(self, db, limit): def registerDB(self, db, limit):
self._db = db self._db = db
...@@ -298,15 +311,7 @@ class Application(object): ...@@ -298,15 +311,7 @@ class Application(object):
# we manage a list of oid here to prevent # we manage a list of oid here to prevent
# from asking too many time new oid one by one # from asking too many time new oid one by one
# from master node # from master node
conn = self.master_conn self._askPrimary(protocol.askNewOIDs(25))
conn.lock()
try:
msg_id = conn.ask(protocol.askNewOIDs(25))
self.dispatcher.register(conn, msg_id, self.getQueue())
finally:
conn.unlock()
self._waitPrimaryMessage(conn, msg_id)
if len(self.new_oid_list) <= 0: if len(self.new_oid_list) <= 0:
raise NEOStorageError('new_oid failed') raise NEOStorageError('new_oid failed')
return self.new_oid_list.pop() return self.new_oid_list.pop()
...@@ -356,14 +361,9 @@ class Application(object): ...@@ -356,14 +361,9 @@ class Application(object):
if conn is None: if conn is None:
continue continue
try:
msg_id = conn.ask(protocol.askObject(oid, serial, tid))
self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.asked_object = 0 self.local_var.asked_object = 0
finally: self._askStorage(conn, protocol.askObject(oid, serial, tid))
conn.unlock()
self._waitStorageMessage(conn, msg_id)
if self.local_var.asked_object == -1: if self.local_var.asked_object == -1:
# OID not found # OID not found
break break
...@@ -454,17 +454,7 @@ class Application(object): ...@@ -454,17 +454,7 @@ class Application(object):
# Get a new transaction id if necessary # Get a new transaction id if necessary
if tid is None: if tid is None:
self.local_var.tid = None self.local_var.tid = None
conn = self.master_conn self._askPrimary(protocol.askNewTID())
if conn is None:
raise NEOStorageError("Connection to master node failed")
conn.lock()
try:
msg_id = conn.ask(protocol.askNewTID())
self.dispatcher.register(conn, msg_id, self.getQueue())
finally:
conn.unlock()
# Wait for answer
self._waitPrimaryMessage(conn, msg_id)
if self.local_var.tid is None: if self.local_var.tid is None:
raise NEOStorageError('tpc_begin failed') raise NEOStorageError('tpc_begin failed')
else: else:
...@@ -499,17 +489,12 @@ class Application(object): ...@@ -499,17 +489,12 @@ class Application(object):
if conn is None: if conn is None:
continue continue
try: self.local_var.object_stored = 0
p = protocol.askStoreObject(oid, serial, 1, p = protocol.askStoreObject(oid, serial, 1,
checksum, compressed_data, self.local_var.tid) checksum, compressed_data, self.local_var.tid)
msg_id = conn.ask(p) self._askStorage(conn, p)
self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.object_stored = 0
finally:
conn.unlock()
# Check we don't get any conflict # Check we don't get any conflict
self._waitStorageMessage(conn, msg_id)
if self.local_var.object_stored[0] == -1: if self.local_var.object_stored[0] == -1:
if self.local_var.data_dict.has_key(oid): if self.local_var.data_dict.has_key(oid):
# One storage already accept the object, is it normal ?? # One storage already accept the object, is it normal ??
...@@ -548,16 +533,11 @@ class Application(object): ...@@ -548,16 +533,11 @@ class Application(object):
if conn is None: if conn is None:
continue continue
try: self.local_var.txn_voted = False
p = protocol.askStoreTransaction(self.local_var.tid, p = protocol.askStoreTransaction(self.local_var.tid,
user, desc, ext, oid_list) user, desc, ext, oid_list)
msg_id = msg = conn.ask(p) self._askStorage(conn, p)
self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.txn_voted = False
finally:
conn.unlock()
self._waitStorageMessage(conn, msg_id)
if not self.isTransactionVoted(): if not self.isTransactionVoted():
raise NEOStorageError('tpc_vote failed') raise NEOStorageError('tpc_vote failed')
...@@ -621,17 +601,8 @@ class Application(object): ...@@ -621,17 +601,8 @@ class Application(object):
# Call finish on master # Call finish on master
oid_list = self.local_var.data_dict.keys() oid_list = self.local_var.data_dict.keys()
conn = self.master_conn
conn.lock()
try:
p = protocol.finishTransaction(oid_list, self.local_var.tid) p = protocol.finishTransaction(oid_list, self.local_var.tid)
msg_id = conn.ask(p) self._askPrimary(p)
self.dispatcher.register(conn, msg_id, self.getQueue())
finally:
conn.unlock()
# Wait for answer
self._waitPrimaryMessage(conn, msg_id)
if not self.isTransactionFinished(): if not self.isTransactionFinished():
raise NEOStorageError('tpc_finish failed') raise NEOStorageError('tpc_finish failed')
...@@ -667,16 +638,9 @@ class Application(object): ...@@ -667,16 +638,9 @@ class Application(object):
if conn is None: if conn is None:
continue continue
try:
p = protocol.askTransactionInformation(transaction_id)
msg_id = conn.ask(p)
self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.txn_info = 0 self.local_var.txn_info = 0
finally: self._askStorage(conn, protocol.askTransactionInformation(transaction_id))
conn.unlock()
# Wait for answer
self._waitStorageMessage(conn, msg_id)
if self.local_var.txn_info == -1: if self.local_var.txn_info == -1:
# Tid not found, try with next node # Tid not found, try with next node
continue continue
...@@ -751,7 +715,7 @@ class Application(object): ...@@ -751,7 +715,7 @@ class Application(object):
# Wait for answers from all storages. # Wait for answers from all storages.
# FIXME this is a busy loop. # FIXME this is a busy loop.
while True: while True:
self._waitStorageMessage() self._waitMessage(handler=self.storage_handler)
if len(self.local_var.node_tids.keys()) == len(storage_node_list): if len(self.local_var.node_tids.keys()) == len(storage_node_list):
break break
...@@ -777,16 +741,9 @@ class Application(object): ...@@ -777,16 +741,9 @@ class Application(object):
if conn is None: if conn is None:
continue continue
try:
p = protocol.askTransactionInformation(tid)
msg_id = conn.ask(p)
self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.txn_info = 0 self.local_var.txn_info = 0
finally: self._askStorage(conn, protocol.askTransactionInformation(tid))
conn.unlock()
# Wait for answer
self._waitStorageMessage(conn, msg_id)
if self.local_var.txn_info == -1: if self.local_var.txn_info == -1:
# TID not found, go on with next node # TID not found, go on with next node
continue continue
...@@ -830,15 +787,9 @@ class Application(object): ...@@ -830,15 +787,9 @@ class Application(object):
if conn is None: if conn is None:
continue continue
try:
p = protocol.askObjectHistory(oid, 0, length)
msg_id = conn.ask(p)
self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.history = None self.local_var.history = None
finally: self._askStorage(conn, protocol.askObjectHistory(oid, 0, length))
conn.unlock()
self._waitStorageMessage(conn, msg_id)
if self.local_var.history == -1: if self.local_var.history == -1:
# Not found, go on with next node # Not found, go on with next node
continue continue
...@@ -868,16 +819,10 @@ class Application(object): ...@@ -868,16 +819,10 @@ class Application(object):
if conn is None: if conn is None:
continue continue
try: # ask transaction information
p = protocol.askTransactionInformation(serial)
msg_id = conn.ask(p)
self.dispatcher.register(conn, msg_id, self.getQueue())
self.local_var.txn_info = None self.local_var.txn_info = None
finally: self._askStorage(conn, protocol.askTransactionInformation(serial))
conn.unlock()
# Wait for answer
self._waitStorageMessage(conn, msg_id)
if self.local_var.txn_info == -1: if self.local_var.txn_info == -1:
# TID not found # TID not found
continue continue
...@@ -904,7 +849,7 @@ class Application(object): ...@@ -904,7 +849,7 @@ class Application(object):
close = __del__ close = __del__
def sync(self): def sync(self):
self._waitStorageMessage() self._waitMessage(handler=self.storage_handler)
def connectToPrimaryMasterNode(self): def connectToPrimaryMasterNode(self):
self.master_conn = None self.master_conn = None
...@@ -922,7 +867,7 @@ class Application(object): ...@@ -922,7 +867,7 @@ class Application(object):
master_index = 0 master_index = 0
conn = None conn = None
# Make application execute remaining message if any # Make application execute remaining message if any
self._waitStorageMessage() self._waitMessage(handler=self.storage_handler)
while True: while True:
self.setNodeReady() self.setNodeReady()
if self.primary_master_node is None: if self.primary_master_node is None:
......
...@@ -853,6 +853,57 @@ class ClientApplicationTest(unittest.TestCase): ...@@ -853,6 +853,57 @@ class ClientApplicationTest(unittest.TestCase):
self.assertTrue(app.pt.operational()) self.assertTrue(app.pt.operational())
self.assertEquals(len(app.nm.getNodeList()), 3) self.assertEquals(len(app.nm.getNodeList()), 3)
def test_askStorage(self):
""" _askStorage is private but test it anyway """
app = self.getApp('')
app.dispatcher = Mock()
conn = Mock()
self.test_ok = False
def _waitMessage_hook(app, conn=None, msg_id=None, handler=None):
self.test_ok = True
_waitMessage_old = Application._waitMessage
Application._waitMessage = _waitMessage_hook
packet = protocol.askNewOIDs(10)
try:
app._askStorage(conn, packet)
finally:
Application._waitMessage = _waitMessage_old
# check packet sent, connection unlocked and dispatcher updated
self.assertEquals(len(conn.mockGetNamedCalls('ask')), 1)
self.assertEquals(len(conn.mockGetNamedCalls('unlock')), 1)
self.assertEquals(len(app.dispatcher.mockGetNamedCalls('register')), 1)
# and _waitMessage called
self.assertTrue(self.test_ok)
def test_askPrimary(self):
""" _askPrimary is private but test it anyway """
app = self.getApp('')
app.dispatcher = Mock()
conn = Mock()
app.master_conn = conn
app.primary_handler = object()
self.test_ok = False
def _waitMessage_hook(app, conn=None, msg_id=None, handler=None):
self.assertEquals(handler, app.primary_handler)
self.test_ok = True
_waitMessage_old = Application._waitMessage
Application._waitMessage = _waitMessage_hook
packet = protocol.askNewOIDs(10)
try:
app._askPrimary(packet)
finally:
Application._waitMessage = _waitMessage_old
# check packet sent, connection locked during process and dispatcher updated
self.assertEquals(len(conn.mockGetNamedCalls('ask')), 1)
self.assertEquals(len(conn.mockGetNamedCalls('lock')), 1)
self.assertEquals(len(conn.mockGetNamedCalls('unlock')), 1)
self.assertEquals(len(app.dispatcher.mockGetNamedCalls('register')), 1)
# and _waitMessage called
self.assertTrue(self.test_ok)
# check NEOStorageError is raised when the primary connection is lost
app.master_conn = None
self.assertRaises(NEOStorageError, app._askPrimary, packet)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment