Commit 4222ac8a authored by Julien Muchembled's avatar Julien Muchembled

Use Connection.send instead of answer when a packet id must be reused

It becomes possible to answer with several packets:
- the last is the usual associated answer packet
- all other (previously sent) packets are notifications

Connection.send does not return the packet id anymore. This is not useful
enough, and the caller can inspect the sent packet (getId).
parent ff4242d4
...@@ -548,14 +548,12 @@ class Connection(BaseConnection): ...@@ -548,14 +548,12 @@ class Connection(BaseConnection):
self.em.addWriter(self) self.em.addWriter(self)
logging.packet(self, packet, True) logging.packet(self, packet, True)
def send(self, packet): def send(self, packet, msg_id=None):
""" Then a packet with a new ID """ """ Then a packet with a new ID """
if self.isClosed(): if self.isClosed():
raise ConnectionClosed raise ConnectionClosed
msg_id = self._getNextId() packet.setId(self._getNextId() if msg_id is None else msg_id)
packet.setId(msg_id)
self._addPacket(packet) self._addPacket(packet)
return msg_id
def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None, **kw): def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None, **kw):
""" """
...@@ -576,13 +574,11 @@ class Connection(BaseConnection): ...@@ -576,13 +574,11 @@ class Connection(BaseConnection):
self.em.wakeup() self.em.wakeup()
return msg_id return msg_id
def answer(self, packet, msg_id=None): def answer(self, packet):
""" Answer to a packet by re-using its ID for the packet answer """ """ Answer to a packet by re-using its ID for the packet answer """
if self.isClosed(): if self.isClosed():
raise ConnectionClosed raise ConnectionClosed
if msg_id is None: packet.setId(self.peer_id)
msg_id = self.getPeerId()
packet.setId(msg_id)
assert packet.isResponse(), packet assert packet.isResponse(), packet
self._addPacket(packet) self._addPacket(packet)
......
...@@ -544,12 +544,11 @@ class Application(BaseApplication): ...@@ -544,12 +544,11 @@ class Application(BaseApplication):
transaction_node = txn.getNode() transaction_node = txn.getNode()
invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList()) invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
for client_node in self.nm.getClientList(only_identified=True): for client_node in self.nm.getClientList(only_identified=True):
c = client_node.getConnection()
if client_node is transaction_node: if client_node is transaction_node:
c.answer(Packets.AnswerTransactionFinished(ttid, tid), client_node.send(Packets.AnswerTransactionFinished(ttid, tid),
msg_id=txn.getMessageId()) msg_id=txn.getMessageId())
else: else:
c.send(invalidate_objects) client_node.send(invalidate_objects)
# Unlock Information to relevant storage nodes. # Unlock Information to relevant storage nodes.
notify_unlock = Packets.NotifyUnlockInformation(ttid) notify_unlock = Packets.NotifyUnlockInformation(ttid)
......
...@@ -107,5 +107,5 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -107,5 +107,5 @@ class StorageServiceHandler(BaseServiceHandler):
if not uid_set: if not uid_set:
app.packing = None app.packing = None
if not client.isClosed(): if not client.isClosed():
client.answer(Packets.AnswerPack(True), msg_id=msg_id) client.send(Packets.AnswerPack(True), msg_id)
...@@ -156,7 +156,7 @@ class StorageOperationHandler(EventHandler): ...@@ -156,7 +156,7 @@ class StorageOperationHandler(EventHandler):
def check(): def check():
r = app.dm.checkTIDRange(*args) r = app.dm.checkTIDRange(*args)
try: try:
conn.answer(Packets.AnswerCheckTIDRange(*r), msg_id) conn.send(Packets.AnswerCheckTIDRange(*r), msg_id)
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
yield yield
...@@ -172,7 +172,7 @@ class StorageOperationHandler(EventHandler): ...@@ -172,7 +172,7 @@ class StorageOperationHandler(EventHandler):
def check(): def check():
r = app.dm.checkSerialRange(*args) r = app.dm.checkSerialRange(*args)
try: try:
conn.answer(Packets.AnswerCheckSerialRange(*r), msg_id) conn.send(Packets.AnswerCheckSerialRange(*r), msg_id)
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
pass pass
yield yield
...@@ -206,14 +206,15 @@ class StorageOperationHandler(EventHandler): ...@@ -206,14 +206,15 @@ class StorageOperationHandler(EventHandler):
else: else:
t = dm.getTransaction(tid) t = dm.getTransaction(tid)
if t is None: if t is None:
conn.answer(Errors.ReplicationError( conn.send(Errors.ReplicationError(
"partition %u dropped" % partition)) "partition %u dropped"
% partition), msg_id)
return return
oid_list, user, desc, ext, packed, ttid = t oid_list, user, desc, ext, packed, ttid = t
conn.send(Packets.AddTransaction( conn.send(Packets.AddTransaction(
tid, user, desc, ext, packed, ttid, oid_list)) tid, user, desc, ext, packed, ttid, oid_list))
yield yield
conn.answer(Packets.AnswerFetchTransactions( conn.send(Packets.AnswerFetchTransactions(
pack_tid, next_tid, peer_tid_set), msg_id) pack_tid, next_tid, peer_tid_set), msg_id)
yield yield
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
...@@ -250,12 +251,13 @@ class StorageOperationHandler(EventHandler): ...@@ -250,12 +251,13 @@ class StorageOperationHandler(EventHandler):
continue continue
object = dm.getObject(oid, serial) object = dm.getObject(oid, serial)
if not object: if not object:
conn.answer(Errors.ReplicationError( conn.send(Errors.ReplicationError(
"partition %u dropped or truncated" % partition)) "partition %u dropped or truncated"
% partition), msg_id)
return return
conn.send(Packets.AddObject(oid, serial, *object[2:])) conn.send(Packets.AddObject(oid, serial, *object[2:]))
yield yield
conn.answer(Packets.AnswerFetchObjects( conn.send(Packets.AnswerFetchObjects(
pack_tid, next_tid, next_oid, object_dict), msg_id) pack_tid, next_tid, next_oid, object_dict), msg_id)
yield yield
except (weakref.ReferenceError, ConnectionClosed): except (weakref.ReferenceError, ConnectionClosed):
......
...@@ -71,7 +71,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -71,7 +71,7 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
self.checkNoPacketSent(client_conn) self.checkNoPacketSent(client_conn)
self.assertEqual(self.app.packing[2], {conn2.getUUID()}) self.assertEqual(self.app.packing[2], {conn2.getUUID()})
self.service.answerPack(conn2, False) self.service.answerPack(conn2, False)
packet = self.checkAnswerPacket(client_conn, Packets.AnswerPack) packet = self.checkNotifyPacket(client_conn, Packets.AnswerPack)
# TODO: verify packet peer id # TODO: verify packet peer id
self.assertTrue(packet.decode()[0]) self.assertTrue(packet.decode()[0])
self.assertEqual(self.app.packing, None) self.assertEqual(self.app.packing, None)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment