Commit 8668c909 authored by Vincent Pelletier's avatar Vincent Pelletier

Fix case where last replicated chunk of a partition is empty.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2466 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 28e5f7a8
...@@ -115,10 +115,13 @@ class ReplicationHandler(EventHandler): ...@@ -115,10 +115,13 @@ class ReplicationHandler(EventHandler):
deleteTransaction = app.dm.deleteTransaction deleteTransaction = app.dm.deleteTransaction
for tid in extra_tid_set: for tid in extra_tid_set:
deleteTransaction(tid) deleteTransaction(tid)
missing_tid_set = tid_set - my_tid_set if tid_list:
for tid in missing_tid_set: missing_tid_set = tid_set - my_tid_set
ask(Packets.AskTransactionInformation(tid), timeout=300) for tid in missing_tid_set:
ask(self._doAskCheckTIDRange(add64(tid_list[-1], 1), RANGE_LENGTH)) ask(Packets.AskTransactionInformation(tid), timeout=300)
ask(self._doAskCheckTIDRange(add64(tid_list[-1], 1), RANGE_LENGTH))
else:
ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID))
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerTransactionInformation(self, conn, tid, def answerTransactionInformation(self, conn, tid,
...@@ -135,30 +138,36 @@ class ReplicationHandler(EventHandler): ...@@ -135,30 +138,36 @@ class ReplicationHandler(EventHandler):
deleteObject = app.dm.deleteObject deleteObject = app.dm.deleteObject
my_object_dict = app.replicator.getObjectHistoryFromResult() my_object_dict = app.replicator.getObjectHistoryFromResult()
object_set = set() object_set = set()
max_oid = max(object_dict.iterkeys()) if object_dict:
max_serial = max(object_dict[max_oid]) max_oid = max(object_dict.iterkeys())
for oid, serial_list in object_dict.iteritems(): max_serial = max(object_dict[max_oid])
for serial in serial_list: for oid, serial_list in object_dict.iteritems():
object_set.add((oid, serial)) for serial in serial_list:
object_set.add((oid, serial))
else:
max_oid = None
my_object_set = set() my_object_set = set()
for oid, serial_list in my_object_dict.iteritems(): for oid, serial_list in my_object_dict.iteritems():
if oid > max_oid: filter = lambda x: True
continue if max_oid is not None:
elif oid == max_oid: if oid > max_oid:
filter = lambda x: x <= max_serial continue
else: elif oid == max_oid:
filter = lambda x: True filter = lambda x: x <= max_serial
for serial in serial_list: for serial in serial_list:
if filter(serial): if filter(serial):
my_object_set.add((oid, serial)) my_object_set.add((oid, serial))
extra_object_set = my_object_set - object_set extra_object_set = my_object_set - object_set
for oid, serial in extra_object_set: for oid, serial in extra_object_set:
deleteObject(oid, serial) deleteObject(oid, serial)
missing_object_set = object_set - my_object_set if object_dict:
for oid, serial in missing_object_set: missing_object_set = object_set - my_object_set
ask(Packets.AskObject(oid, serial, None), timeout=300) for oid, serial in missing_object_set:
ask(self._doAskCheckSerialRange(max_oid, add64(max_serial, 1), ask(Packets.AskObject(oid, serial, None), timeout=300)
RANGE_LENGTH)) ask(self._doAskCheckSerialRange(max_oid, add64(max_serial, 1),
RANGE_LENGTH))
else:
self.app.replicator.setReplicationDone()
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerObject(self, conn, oid, serial_start, def answerObject(self, conn, oid, serial_start,
......
...@@ -206,6 +206,13 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -206,6 +206,13 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
calls = app.dm.mockGetNamedCalls('deleteTransaction') calls = app.dm.mockGetNamedCalls('deleteTransaction')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(tid_list[0]) calls[0].checkArgs(tid_list[0])
# Peer has no transaction above requested min, go on with object
# replication after deleting local transactions
conn = self.getFakeConnection()
known_tid_list = [tid_list[0], ]
app = self.getApp(conn=conn, tid_result=known_tid_list)
ReplicationHandler(app).answerTIDsFrom(conn, [])
self.checkAskPacket(conn, Packets.AskCheckSerialRange)
def test_answerTransactionInformation(self): def test_answerTransactionInformation(self):
conn = self.getFakeConnection() conn = self.getFakeConnection()
...@@ -268,6 +275,22 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -268,6 +275,22 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
(oid_4, tid_list[4]), (oid_4, tid_list[4]),
)) ))
self.assertEqual(actual_deletes, expected_deletes) self.assertEqual(actual_deletes, expected_deletes)
# Peer has no object above requested min, replication is over for this
# transaction once we deleted local content.
oid_dict = FakeDict(())
conn = self.getFakeConnection()
app = self.getApp(conn=conn, history_result={
oid_1: [tid_list[2]],
})
ReplicationHandler(app).answerObjectHistoryFrom(conn, oid_dict)
calls = app.dm.mockGetNamedCalls('deleteObject')
actual_deletes = set(((x.getParam(0), x.getParam(1)) for x in calls))
expected_deletes = set((
(oid_1, tid_list[2]),
))
self.assertEqual(actual_deletes, expected_deletes)
calls = app.replicator.mockGetNamedCalls('setReplicationDone')
self.assertEqual(len(calls), 1)
def test_answerObject(self): def test_answerObject(self):
conn = self.getFakeConnection() conn = self.getFakeConnection()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment