Commit fa7fbad6 authored by Julien Muchembled's avatar Julien Muchembled

Code clean-up, comment fixes

(cherry picked from commit 43029be2)
parent 011eba12
......@@ -669,7 +669,7 @@ class Application(ThreadedApplication):
txn_context = self._txn_container.pop(transaction)
if txn_context is None:
return
# We want that the involved nodes abort a transaction after any
# We want the involved nodes to abort a transaction after any
# other packet sent by the client for this transaction. IOW, if we
# already have a connection with a storage node, potentially with
# a pending write, aborting only via the master may lead to a race
......@@ -698,9 +698,8 @@ class Application(ThreadedApplication):
txn_context.conn_dict))
except ConnectionClosed:
pass
# We don't need to flush queue, as it won't be reused by future
# transactions (deleted on next line & indexed by transaction object
# instance).
# No need to flush queue, as it will be destroyed on return,
# along with txn_context.
self.dispatcher.forget_queue(txn_context.queue, flush_queue=False)
def tpc_finish(self, transaction, f=None):
......
......@@ -68,11 +68,10 @@ class PrimaryNotificationsHandler(MTEventHandler):
def answerTransactionFinished(self, conn, _, tid, callback, cache_dict):
app = self.app
app.last_tid = tid
# Update cache
cache = app._cache
invalidate = cache.invalidate
loading_get = app._loading.get
with app._cache_lock:
invalidate = app._cache.invalidate
loading_get = app._loading.get
for oid, data in cache_dict.iteritems():
# Update ex-latest value in cache
invalidate(oid, tid)
......
......@@ -50,7 +50,7 @@ class Transaction(object):
self.conflict_dict = {} # {oid: serial}
# resolved conflicts
self.resolved_dict = {} # {oid: serial}
# involved storage nodes; connection is None is connection was lost
# involved storage nodes; connection is None if connection was lost
self.conn_dict = {} # {node_id: connection}
def __repr__(self):
......
......@@ -587,8 +587,8 @@ class Application(BaseApplication):
node.send(Packets.StartOperation(self.backup_tid))
uuid = node.getUUID()
assert uuid not in self.storage_starting_set
if uuid not in self.storage_ready_dict:
self.storage_starting_set.add(uuid)
assert uuid not in self.storage_ready_dict
self.storage_starting_set.add(uuid)
def setStorageReady(self, uuid):
self.storage_starting_set.remove(uuid)
......
......@@ -34,7 +34,7 @@ class ClientOperationHandler(BaseHandler):
app = self.app
if app.operational:
# Even if in most cases, abortFor is called from both this method
# and BaseMasterHandler.notifyPartitionChanges (especially since
# and BaseMasterHandler.notifyNodeInformation (especially since
# storage nodes disconnects unknown clients on their own), these 2
# handlers also cover distinct scenarios, so neither of them is
# redundant:
......
......@@ -139,10 +139,11 @@ class TransactionManager(EventQueue):
def replicating(self, offset_list):
self._replicating.update(offset_list)
isdisjoint = set(offset_list).isdisjoint
assert isdisjoint(self._replicated), (offset_list, self._replicated)
assert isdisjoint(map(self.getPartition, self._store_lock_dict)), (
offset_list, self._store_lock_dict)
if __debug__:
isdisjoint = set(offset_list).isdisjoint
assert isdisjoint(self._replicated), (offset_list, self._replicated)
assert isdisjoint(map(self.getPartition, self._store_lock_dict)), (
offset_list, self._store_lock_dict)
p = Packets.AskUnfinishedTransactions(offset_list)
self._app.master_conn.ask(p, offset_list=offset_list)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment