Commit 43029be2 authored by Julien Muchembled's avatar Julien Muchembled

Code clean-up, comment fixes

parent 82eea0cd
...@@ -674,7 +674,7 @@ class Application(ThreadedApplication): ...@@ -674,7 +674,7 @@ class Application(ThreadedApplication):
txn_context = self._txn_container.pop(transaction) txn_context = self._txn_container.pop(transaction)
if txn_context is None: if txn_context is None:
return return
# We want that the involved nodes abort a transaction after any # We want the involved nodes to abort a transaction after any
# other packet sent by the client for this transaction. IOW, if we # other packet sent by the client for this transaction. IOW, if we
# already have a connection with a storage node, potentially with # already have a connection with a storage node, potentially with
# a pending write, aborting only via the master may lead to a race # a pending write, aborting only via the master may lead to a race
...@@ -703,9 +703,8 @@ class Application(ThreadedApplication): ...@@ -703,9 +703,8 @@ class Application(ThreadedApplication):
list(txn_context.conn_dict))) list(txn_context.conn_dict)))
except ConnectionClosed: except ConnectionClosed:
pass pass
# We don't need to flush queue, as it won't be reused by future # No need to flush queue, as it will be destroyed on return,
# transactions (deleted on next line & indexed by transaction object # along with txn_context.
# instance).
self.dispatcher.forget_queue(txn_context.queue, flush_queue=False) self.dispatcher.forget_queue(txn_context.queue, flush_queue=False)
def tpc_finish(self, transaction, f=None): def tpc_finish(self, transaction, f=None):
......
...@@ -68,11 +68,10 @@ class PrimaryNotificationsHandler(MTEventHandler): ...@@ -68,11 +68,10 @@ class PrimaryNotificationsHandler(MTEventHandler):
def answerTransactionFinished(self, conn, _, tid, callback, cache_dict): def answerTransactionFinished(self, conn, _, tid, callback, cache_dict):
app = self.app app = self.app
app.last_tid = tid app.last_tid = tid
# Update cache
cache = app._cache cache = app._cache
invalidate = cache.invalidate
loading_get = app._loading.get
with app._cache_lock: with app._cache_lock:
invalidate = app._cache.invalidate
loading_get = app._loading.get
for oid, data in cache_dict.iteritems(): for oid, data in cache_dict.iteritems():
# Update ex-latest value in cache # Update ex-latest value in cache
invalidate(oid, tid) invalidate(oid, tid)
......
...@@ -50,7 +50,7 @@ class Transaction(object): ...@@ -50,7 +50,7 @@ class Transaction(object):
self.conflict_dict = {} # {oid: serial} self.conflict_dict = {} # {oid: serial}
# resolved conflicts # resolved conflicts
self.resolved_dict = {} # {oid: serial} self.resolved_dict = {} # {oid: serial}
# involved storage nodes; connection is None is connection was lost # involved storage nodes; connection is None if connection was lost
self.conn_dict = {} # {node_id: connection} self.conn_dict = {} # {node_id: connection}
def __repr__(self): def __repr__(self):
......
...@@ -604,8 +604,8 @@ class Application(BaseApplication): ...@@ -604,8 +604,8 @@ class Application(BaseApplication):
node.send(Packets.StartOperation(bool(self.backup_tid))) node.send(Packets.StartOperation(bool(self.backup_tid)))
uuid = node.getUUID() uuid = node.getUUID()
assert uuid not in self.storage_starting_set assert uuid not in self.storage_starting_set
if uuid not in self.storage_ready_dict: assert uuid not in self.storage_ready_dict
self.storage_starting_set.add(uuid) self.storage_starting_set.add(uuid)
def setStorageReady(self, uuid): def setStorageReady(self, uuid):
self.storage_starting_set.remove(uuid) self.storage_starting_set.remove(uuid)
......
...@@ -34,7 +34,7 @@ class ClientOperationHandler(BaseHandler): ...@@ -34,7 +34,7 @@ class ClientOperationHandler(BaseHandler):
app = self.app app = self.app
if app.operational: if app.operational:
# Even if in most cases, abortFor is called from both this method # Even if in most cases, abortFor is called from both this method
# and BaseMasterHandler.notifyPartitionChanges (especially since # and BaseMasterHandler.notifyNodeInformation (especially since
# storage nodes disconnects unknown clients on their own), these 2 # storage nodes disconnects unknown clients on their own), these 2
# handlers also cover distinct scenarios, so neither of them is # handlers also cover distinct scenarios, so neither of them is
# redundant: # redundant:
......
...@@ -139,10 +139,11 @@ class TransactionManager(EventQueue): ...@@ -139,10 +139,11 @@ class TransactionManager(EventQueue):
def replicating(self, offset_list): def replicating(self, offset_list):
self._replicating.update(offset_list) self._replicating.update(offset_list)
isdisjoint = set(offset_list).isdisjoint if __debug__:
assert isdisjoint(self._replicated), (offset_list, self._replicated) isdisjoint = set(offset_list).isdisjoint
assert isdisjoint(map(self.getPartition, self._store_lock_dict)), ( assert isdisjoint(self._replicated), (offset_list, self._replicated)
offset_list, self._store_lock_dict) assert isdisjoint(map(self.getPartition, self._store_lock_dict)), (
offset_list, self._store_lock_dict)
p = Packets.AskUnfinishedTransactions(offset_list) p = Packets.AskUnfinishedTransactions(offset_list)
self._app.master_conn.ask(p, offset_list=offset_list) self._app.master_conn.ask(p, offset_list=offset_list)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment