Commit 2851a274 authored by Julien Muchembled

client: simplify connection management in transaction contexts

With the previous commit, there is no longer any point in distinguishing
storage nodes for which we only check serials.
parent ab435b28
...@@ -482,9 +482,8 @@ class Application(ThreadedApplication): ...@@ -482,9 +482,8 @@ class Application(ThreadedApplication):
' with new locking TID %s', dump(ttid), dump(serial)) ' with new locking TID %s', dump(ttid), dump(serial))
txn_context.locking_tid = serial txn_context.locking_tid = serial
packet = Packets.AskRebaseTransaction(ttid, serial) packet = Packets.AskRebaseTransaction(ttid, serial)
for uuid, status in txn_context.involved_nodes.iteritems(): for uuid in txn_context.conn_dict:
if status < 2: self._askStorageForWrite(txn_context, uuid, packet)
self._askStorageForWrite(txn_context, uuid, packet)
else: else:
if data is CHECKED_SERIAL: if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid, raise ReadConflictError(oid=oid,
...@@ -518,9 +517,11 @@ class Application(ThreadedApplication): ...@@ -518,9 +517,11 @@ class Application(ThreadedApplication):
conn = txn_context.conn_dict[uuid] conn = txn_context.conn_dict[uuid]
try: try:
return conn.ask(packet, queue=txn_context.queue) return conn.ask(packet, queue=txn_context.queue)
except AttributeError:
if conn is not None:
raise
except ConnectionClosed: except ConnectionClosed:
txn_context.involved_nodes[uuid] = 2 txn_context.conn_dict[uuid] = None
del txn_context.conn_dict[uuid]
def waitResponses(self, queue): def waitResponses(self, queue):
"""Wait for all requests to be answered (or their connection to be """Wait for all requests to be answered (or their connection to be
...@@ -551,21 +552,21 @@ class Application(ThreadedApplication): ...@@ -551,21 +552,21 @@ class Application(ThreadedApplication):
packet = Packets.AskStoreTransaction(ttid, str(transaction.user), packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), ext, txn_context.cache_dict) str(transaction.description), ext, txn_context.cache_dict)
queue = txn_context.queue queue = txn_context.queue
involved_nodes = txn_context.involved_nodes conn_dict = txn_context.conn_dict
# Ask in parallel all involved storage nodes to commit object metadata. # Ask in parallel all involved storage nodes to commit object metadata.
# Nodes that store the transaction metadata get a special packet. # Nodes that store the transaction metadata get a special packet.
trans_nodes = txn_context.write(self, packet, ttid) trans_nodes = txn_context.write(self, packet, ttid)
packet = Packets.AskVoteTransaction(ttid) packet = Packets.AskVoteTransaction(ttid)
for uuid, status in involved_nodes.iteritems(): for uuid in conn_dict:
if status < 2 and uuid not in trans_nodes: if uuid not in trans_nodes:
self._askStorageForWrite(txn_context, uuid, packet) self._askStorageForWrite(txn_context, uuid, packet)
self.waitStoreResponses(txn_context) self.waitStoreResponses(txn_context)
if 2 in involved_nodes.itervalues(): # unlikely if None in conn_dict.itervalues(): # unlikely
# If some writes failed, we must first check whether # If some writes failed, we must first check whether
# all oids have been locked by at least one node. # all oids have been locked by at least one node.
failed = {node.getUUID(): node.isRunning() failed = {node.getUUID(): node.isRunning()
for node in self.nm.getStorageList() for node in self.nm.getStorageList()
if involved_nodes.get(node.getUUID()) == 2} if conn_dict.get(node.getUUID(), 0) is None}
if txn_context.lockless_dict: if txn_context.lockless_dict:
getCellList = self.pt.getCellList getCellList = self.pt.getCellList
for offset, uuid_set in txn_context.lockless_dict.iteritems(): for offset, uuid_set in txn_context.lockless_dict.iteritems():
...@@ -616,10 +617,11 @@ class Application(ThreadedApplication): ...@@ -616,10 +617,11 @@ class Application(ThreadedApplication):
# forever. # forever.
p = Packets.AbortTransaction(txn_context.ttid, ()) p = Packets.AbortTransaction(txn_context.ttid, ())
for conn in txn_context.conn_dict.itervalues(): for conn in txn_context.conn_dict.itervalues():
try: if conn is not None:
conn.send(p) try:
except ConnectionClosed: conn.send(p)
pass except ConnectionClosed:
pass
# Because we want to be sure that the involved nodes are notified, # Because we want to be sure that the involved nodes are notified,
# we still have to send the full list to the master. Most of the # we still have to send the full list to the master. Most of the
# time, the storage nodes get 2 AbortTransaction packets, and the # time, the storage nodes get 2 AbortTransaction packets, and the
...@@ -633,7 +635,7 @@ class Application(ThreadedApplication): ...@@ -633,7 +635,7 @@ class Application(ThreadedApplication):
else: else:
try: try:
notify(Packets.AbortTransaction(txn_context.ttid, notify(Packets.AbortTransaction(txn_context.ttid,
txn_context.involved_nodes)) txn_context.conn_dict))
except ConnectionClosed: except ConnectionClosed:
pass pass
# We don't need to flush queue, as it won't be reused by future # We don't need to flush queue, as it won't be reused by future
...@@ -742,7 +744,7 @@ class Application(ThreadedApplication): ...@@ -742,7 +744,7 @@ class Application(ThreadedApplication):
# conflicts. For example, if a network failure happened # conflicts. For example, if a network failure happened
# only between the client and the storage, the latter would # only between the client and the storage, the latter would
# still be readable until we commit. # still be readable until we commit.
if txn_context.involved_nodes.get(cell.getUUID(), 0) < 2] if txn_context.conn_dict.get(cell.getUUID(), 0) is not None]
storage_conn = getConnForNode( storage_conn = getConnForNode(
min(cell_list, key=getCellSortKey).getNode()) min(cell_list, key=getCellSortKey).getNode())
storage_conn.ask(Packets.AskObjectUndoSerial(ttid, storage_conn.ask(Packets.AskObjectUndoSerial(ttid,
...@@ -949,6 +951,6 @@ class Application(ThreadedApplication): ...@@ -949,6 +951,6 @@ class Application(ThreadedApplication):
assert oid not in txn_context.data_dict, oid assert oid not in txn_context.data_dict, oid
packet = Packets.AskCheckCurrentSerial(ttid, oid, serial) packet = Packets.AskCheckCurrentSerial(ttid, oid, serial)
txn_context.data_dict[oid] = CHECKED_SERIAL, serial, txn_context.write( txn_context.data_dict[oid] = CHECKED_SERIAL, serial, txn_context.write(
self, packet, oid, 0, oid=oid) self, packet, oid, oid=oid)
self._waitAnyTransactionMessage(txn_context, False) self._waitAnyTransactionMessage(txn_context, False)
...@@ -99,9 +99,7 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -99,9 +99,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
conn.ask(Packets.AskRebaseObject(ttid, oid), conn.ask(Packets.AskRebaseObject(ttid, oid),
queue=queue, oid=oid) queue=queue, oid=oid)
except ConnectionClosed: except ConnectionClosed:
uuid = conn.getUUID() txn_context.conn_dict[conn.getUUID()] = None
txn_context.involved_nodes[uuid] = 2
del txn_context.conn_dict[uuid]
def answerRebaseObject(self, conn, conflict, oid): def answerRebaseObject(self, conn, conflict, oid):
if conflict: if conflict:
...@@ -116,7 +114,7 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -116,7 +114,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
# We should still be waiting for an answer from this node, # We should still be waiting for an answer from this node,
# unless we lost connection. # unless we lost connection.
assert conn.uuid in txn_context.data_dict[oid][2] or \ assert conn.uuid in txn_context.data_dict[oid][2] or \
txn_context.involved_nodes[conn.uuid] == 2 txn_context.conn_dict[conn.uuid] is None
return return
assert oid in txn_context.data_dict assert oid in txn_context.data_dict
if serial <= txn_context.conflict_dict.get(oid, ''): if serial <= txn_context.conflict_dict.get(oid, ''):
......
...@@ -49,36 +49,26 @@ class Transaction(object): ...@@ -49,36 +49,26 @@ class Transaction(object):
self.conflict_dict = {} # {oid: serial} self.conflict_dict = {} # {oid: serial}
# resolved conflicts # resolved conflicts
self.resolved_dict = {} # {oid: serial} self.resolved_dict = {} # {oid: serial}
# Keys are node ids instead of Node objects because a node may # involved storage nodes; connection is None if connection was lost
# disappear from the cluster. In any case, we always have to check
# if the id is still known by the NodeManager.
# status: 0 -> check only, 1 -> store, 2 -> failed
self.involved_nodes = {} # {node_id: status}
self.conn_dict = {} # {node_id: connection} self.conn_dict = {} # {node_id: connection}
def wakeup(self, conn): def wakeup(self, conn):
self.queue.put((conn, _WakeupPacket, {})) self.queue.put((conn, _WakeupPacket, {}))
def write(self, app, packet, object_id, store=1, **kw): def write(self, app, packet, object_id, **kw):
uuid_list = [] uuid_list = []
pt = app.pt pt = app.pt
involved = self.involved_nodes conn_dict = self.conn_dict
object_id = pt.getPartition(object_id) object_id = pt.getPartition(object_id)
for cell in pt.getCellList(object_id): for cell in pt.getCellList(object_id):
node = cell.getNode() node = cell.getNode()
uuid = node.getUUID() uuid = node.getUUID()
status = involved.get(uuid, -1) try:
if status < store:
involved[uuid] = store
elif status > 1:
continue
if status < 0:
conn = self.conn_dict[uuid] = app.cp.getConnForNode(node)
else:
conn = self.conn_dict[uuid]
if conn is not None:
try: try:
if status < 0 and self.locking_tid and 'oid' in kw: conn = conn_dict[uuid]
except KeyError:
conn = conn_dict[uuid] = app.cp.getConnForNode(node)
if self.locking_tid and 'oid' in kw:
# A deadlock happened but this node is not aware of it. # A deadlock happened but this node is not aware of it.
# Tell it to write-lock with the same locking tid as # Tell it to write-lock with the same locking tid as
# for the other nodes. The condition on kw is to # for the other nodes. The condition on kw is to
...@@ -86,13 +76,13 @@ class Transaction(object): ...@@ -86,13 +76,13 @@ class Transaction(object):
# transaction metadata. # transaction metadata.
conn.ask(Packets.AskRebaseTransaction( conn.ask(Packets.AskRebaseTransaction(
self.ttid, self.locking_tid), queue=self.queue) self.ttid, self.locking_tid), queue=self.queue)
conn.ask(packet, queue=self.queue, **kw) conn.ask(packet, queue=self.queue, **kw)
uuid_list.append(uuid) uuid_list.append(uuid)
continue except AttributeError:
except ConnectionClosed: if conn is not None:
pass raise
del self.conn_dict[uuid] except ConnectionClosed:
involved[uuid] = 2 conn_dict[uuid] = None
if uuid_list: if uuid_list:
return uuid_list return uuid_list
raise NEOStorageError( raise NEOStorageError(
...@@ -146,9 +136,9 @@ class Transaction(object): ...@@ -146,9 +136,9 @@ class Transaction(object):
self.cache_dict[oid] = data self.cache_dict[oid] = data
def nodeLost(self, app, uuid): def nodeLost(self, app, uuid):
# The following 2 lines are sometimes redundant with the 2 in write(). # The following line is sometimes redundant
self.involved_nodes[uuid] = 2 # with the one in `except ConnectionClosed:` clauses.
self.conn_dict.pop(uuid, None) self.conn_dict[uuid] = None
for oid in list(self.data_dict): for oid in list(self.data_dict):
self.written(app, uuid, oid) self.written(app, uuid, oid)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment