Commit a7314b16 authored by Kirill Smelkov

.

parent d14e3e7f
@@ -228,8 +228,7 @@ class Application(ThreadedApplication):
                     "Too many connection failures to the primary master")
             logging.info('Connected to %s', self.primary_master_node)
             try:
-                # NOTE
                 # Request identification and required informations to be
                 # operational. Might raise ConnectionClosed so that the new
                 # primary can be looked-up again.
                 logging.info('Initializing from master')
...
@@ -212,7 +212,7 @@ class PartitionTable(object):
         if offset >= self.getPartitions():
             raise IndexError
         for uuid, state in row:
-            node = nm.getByUUID(uuid)   # NOTE pt <-> nm interrelation
+            node = nm.getByUUID(uuid)
             # the node must be known by the node manager
             assert node is not None
             self._setCell(offset, node, state)
@@ -233,7 +233,7 @@ class PartitionTable(object):
                 break
             readable_list += row
         for offset, uuid, state in cell_list:
-            node = nm.getByUUID(uuid)   # NOTE pt <-> nm interrelation
+            node = nm.getByUUID(uuid)
             assert node is not None, 'No node found for uuid ' + uuid_str(uuid)
             self._setCell(offset, node, state)
         self.logUpdated()
...
@@ -140,7 +140,7 @@ class ThreadedApplication(BaseApplication):
         _handlePacket = self._handlePacket
         while True:
             qconn, qpacket, kw = get(True)
-            if conn is qconn and msg_id == qpacket.getId():  # NOTE selector on msg_id
+            if conn is qconn and msg_id == qpacket.getId():
                 _handlePacket(qconn, qpacket, kw, handler)
                 break
             _handlePacket(qconn, qpacket, kw)
...
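The "NOTE msg_id" / "NOTE selector on msg_id" annotations removed throughout this commit all point at the same mechanism: answers may be sent out of order, so a reply carries an explicit msg_id and the waiting side filters a shared packet queue by connection and message id, as the ThreadedApplication hunk above shows. The following standalone sketch only illustrates that dispatch pattern; it is not NEO code, and the names (Waiter, dispatch_queue, handle_*) are assumptions made for the example.

    # Illustrative sketch, not NEO code: matching an out-of-order answer
    # by its echoed msg_id.  All names here are hypothetical.
    try:
        from queue import Queue        # Python 3
    except ImportError:
        from Queue import Queue        # Python 2

    class Waiter(object):
        """Wait on a shared queue for the answer to one specific request."""

        def __init__(self, conn, msg_id, dispatch_queue):
            self.conn = conn             # connection the request was sent on
            self.msg_id = msg_id         # id the peer must echo in its answer
            self.queue = dispatch_queue  # (conn, packet) pairs from the poll loop

        def wait(self, handle_answer, handle_other):
            while True:
                qconn, qpacket = self.queue.get(True)
                # Answers can arrive interleaved with unrelated packets, so
                # select on both the connection and the echoed message id.
                if qconn is self.conn and qpacket.getId() == self.msg_id:
                    return handle_answer(qpacket)
                handle_other(qconn, qpacket)   # not ours: default handling

The same reasoning explains the explicit msg_id argument in the client.send(...) and conn.send(...) calls in the master and storage hunks below: the sender echoes the id of the original request rather than relying on reply ordering.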
@@ -520,7 +520,7 @@ class Application(BaseApplication):
         for client_node in self.nm.getClientList(only_identified=True):
             if client_node is transaction_node:
                 client_node.send(Packets.AnswerTransactionFinished(ttid, tid),
-                    msg_id=txn.getMessageId())  # NOTE msgid: out-of-order answer
+                    msg_id=txn.getMessageId())
             else:
                 # NOTE notifies all clients irregardless of whether who was subscribed
                 client_node.send(invalidate_objects)
...
@@ -63,7 +63,6 @@ class AdministrationHandler(MasterHandler):
                 raise ProtocolError('Cannot exit recovery without any '
                                     'storage node')
             for node in storage_list:
-                # XXX note vvv ^^^
                 assert node.isPending(), node
                 if node.getConnection().isPending():
                     # XXX: It's wrong to use ProtocolError here. We must reply
...
@@ -115,5 +115,5 @@ class StorageServiceHandler(BaseServiceHandler):
         if not uid_set:
             app.packing = None
             if not client.isClosed():
-                client.send(Packets.AnswerPack(True), msg_id)  # NOTE msg_id: out-of-order answer
+                client.send(Packets.AnswerPack(True), msg_id)
@@ -430,7 +430,6 @@ class PartitionTable(neo.lib.pt.PartitionTable):
         except AttributeError:
             pass
 
-    # NOTE
     def setBackupTidDict(self, backup_tid_dict):
         for row in self.partition_list:
             for cell in row:
...
@@ -80,7 +80,6 @@ class InitializationHandler(BaseMasterHandler):
             dm.unlockTransaction(tid, ttid)
         dm.commit()
 
-    # NOTE M -> S "start operational"
     def startOperation(self, conn, backup):
         self.app.operational = True
         # XXX: see comment in protocol
...
@@ -157,7 +157,7 @@ class StorageOperationHandler(EventHandler):
         def check():
             r = app.dm.checkTIDRange(*args)
             try:
-                conn.send(Packets.AnswerCheckTIDRange(*r), msg_id)  # NOTE msg_id: out-of-order answer
+                conn.send(Packets.AnswerCheckTIDRange(*r), msg_id)
             except (weakref.ReferenceError, ConnectionClosed):
                 pass
             # Splitting this task would cause useless overhead. However, a
@@ -176,7 +176,7 @@ class StorageOperationHandler(EventHandler):
         def check():
             r = app.dm.checkSerialRange(*args)
             try:
-                conn.send(Packets.AnswerCheckSerialRange(*r), msg_id)  # NOTE msg_id: out-of-order answer
+                conn.send(Packets.AnswerCheckSerialRange(*r), msg_id)
             except (weakref.ReferenceError, ConnectionClosed):
                 pass
             return; yield # same as in askCheckTIDRange
@@ -186,7 +186,6 @@ class StorageOperationHandler(EventHandler):
     def askFetchTransactions(self, conn, partition, length, min_tid, max_tid,
                              tid_list):
         app = self.app
-        # NOTE XXX
         if app.tm.isLockedTid(max_tid):
             # Wow, backup cluster is fast. Requested transactions are still in
             # ttrans/ttobj so wait a little.
@@ -219,7 +218,7 @@ class StorageOperationHandler(EventHandler):
                     # Sending such packet does not mark the connection
                     # for writing if there's too little data in the buffer.
                     conn.send(Packets.AddTransaction(tid, user,
-                        desc, ext, packed, ttid, oid_list), msg_id)  # NOTE msg_id: ooo answer
+                        desc, ext, packed, ttid, oid_list), msg_id)
                     # To avoid delaying several connections simultaneously,
                     # and also prevent the backend from scanning different
                     # parts of the DB at the same time, we ask the
@@ -228,7 +227,7 @@ class StorageOperationHandler(EventHandler):
                     # is flushing another one for a concurrent connection.
                     yield conn.buffering
                 conn.send(Packets.AnswerFetchTransactions(
-                    pack_tid, next_tid, peer_tid_set), msg_id)  # NOTE msg_id: out-of-order answer
+                    pack_tid, next_tid, peer_tid_set), msg_id)
                 yield
             except (weakref.ReferenceError, ConnectionClosed):
                 pass
...@@ -269,10 +268,10 @@ class StorageOperationHandler(EventHandler): ...@@ -269,10 +268,10 @@ class StorageOperationHandler(EventHandler):
% partition), msg_id) % partition), msg_id)
return return
# Same as in askFetchTransactions. # Same as in askFetchTransactions.
conn.send(Packets.AddObject(oid, *object), msg_id) # NOTE msg_id: ooo answer conn.send(Packets.AddObject(oid, *object), msg_id)
yield conn.buffering yield conn.buffering
conn.send(Packets.AnswerFetchObjects( conn.send(Packets.AnswerFetchObjects(
pack_tid, next_tid, next_oid, object_dict), msg_id) # NOTE msg_id: out-of-order answer pack_tid, next_tid, next_oid, object_dict), msg_id)
yield yield
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
......
@@ -57,7 +57,7 @@ transactions being committed and then it is expected to fully receive from the
 client any transaction that is started after this answer.
 Which has in turn other consequences:
-- The client must not fail to write to a storage node after the above request  <-- FIXME bad ?
+- The client must not fail to write to a storage node after the above request  <-- FIXME bad -> proper consensus protocol
   to the master: for this, the storage must have announced it is ready, and it
   must delay identification of unknown clients (those for which it hasn't
   received yet a notification from the master).
...
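The documentation hunk above explains why a storage node must delay identification of clients the master has not yet announced to it. A minimal sketch of that idea follows; it is illustrative only, not NEO's actual handler code, and the class and method names are assumptions made for the example.

    # Illustrative sketch, not NEO code: delaying identification of clients
    # the master has not announced yet.  All names are hypothetical.
    class ClientIdentification(object):
        def __init__(self):
            self.known_clients = set()  # client uuids announced by the master
            self.delayed = {}           # uuid -> callback answering the request

        def request_identification(self, uuid, answer):
            """Called when a client connects and asks to be identified."""
            if uuid in self.known_clients:
                answer()                     # already announced: accept now
            else:
                self.delayed[uuid] = answer  # unknown so far: hold the answer back

        def notify_client(self, uuid):
            """Called when the master announces a new client node."""
            self.known_clients.add(uuid)
            answer = self.delayed.pop(uuid, None)
            if answer is not None:
                answer()                     # flush the delayed identification

Holding the identification back (instead of rejecting it) is what lets the client assume that, once the master has answered, every ready storage node will accept its writes.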