Commit ad7986b8 authored by Vincent Pelletier's avatar Vincent Pelletier

Move on to next chunk after having received row list.

Otherwise, we would check the same chunk multiple times, if last range
check was successful: at this point, we wouldn't know criterion to find
the first row after replicated chunk.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2411 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 68e483de
...@@ -118,6 +118,7 @@ class ReplicationHandler(EventHandler): ...@@ -118,6 +118,7 @@ class ReplicationHandler(EventHandler):
missing_tid_set = tid_set - my_tid_set missing_tid_set = tid_set - my_tid_set
for tid in missing_tid_set: for tid in missing_tid_set:
ask(Packets.AskTransactionInformation(tid), timeout=300) ask(Packets.AskTransactionInformation(tid), timeout=300)
ask(self._doAskCheckTIDRange(add64(tid_list[-1], 1), RANGE_LENGTH))
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerTransactionInformation(self, conn, tid, def answerTransactionInformation(self, conn, tid,
...@@ -157,6 +158,8 @@ class ReplicationHandler(EventHandler): ...@@ -157,6 +158,8 @@ class ReplicationHandler(EventHandler):
missing_serial_set = serial_list missing_serial_set = serial_list
for serial in missing_serial_set: for serial in missing_serial_set:
ask(Packets.AskObject(oid, serial, None), timeout=300) ask(Packets.AskObject(oid, serial, None), timeout=300)
ask(self._doAskCheckSerialRange(max_oid, add64(max_serial, 1),
RANGE_LENGTH))
@checkConnectionIsReplicatorConnection @checkConnectionIsReplicatorConnection
def answerObject(self, conn, oid, serial_start, def answerObject(self, conn, oid, serial_start,
...@@ -249,10 +252,7 @@ class ReplicationHandler(EventHandler): ...@@ -249,10 +252,7 @@ class ReplicationHandler(EventHandler):
if action == CHECK_REPLICATE: if action == CHECK_REPLICATE:
(min_tid, ) = params (min_tid, ) = params
ask(self._doAskTIDsFrom(min_tid, count)) ask(self._doAskTIDsFrom(min_tid, count))
if length == count: if length != count:
action = CHECK_CHUNK
params = (next_tid, RANGE_LENGTH)
else:
action = CHECK_DONE action = CHECK_DONE
if action == CHECK_CHUNK: if action == CHECK_CHUNK:
(min_tid, count) = params (min_tid, count) = params
...@@ -279,10 +279,7 @@ class ReplicationHandler(EventHandler): ...@@ -279,10 +279,7 @@ class ReplicationHandler(EventHandler):
if action == CHECK_REPLICATE: if action == CHECK_REPLICATE:
((min_oid, min_serial), ) = params ((min_oid, min_serial), ) = params
ask(self._doAskObjectHistoryFrom(min_oid, min_serial, count)) ask(self._doAskObjectHistoryFrom(min_oid, min_serial, count))
if length == count: if length != count:
action = CHECK_CHUNK
params = (next_params, RANGE_LENGTH)
else:
action = CHECK_DONE action = CHECK_DONE
if action == CHECK_CHUNK: if action == CHECK_CHUNK:
((min_oid, min_serial), count) = params ((min_oid, min_serial), count) = params
......
...@@ -111,8 +111,19 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -111,8 +111,19 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(min_tid, length, partition) calls[0].checkArgs(min_tid, length, partition)
def _checkPacketTIDList(self, conn, tid_list): def _checkPacketTIDList(self, conn, tid_list, next_tid, app):
packet_list = [x.getParam(0) for x in conn.mockGetNamedCalls('ask')] packet_list = [x.getParam(0) for x in conn.mockGetNamedCalls('ask')]
packet_list, next_range = packet_list[:-1], packet_list[-1]
self.assertEqual(next_range.getType(), Packets.AskCheckTIDRange)
pmin_tid, plength, ppartition = next_range.decode()
self.assertEqual(pmin_tid, add64(next_tid, 1))
self.assertEqual(plength, RANGE_LENGTH)
self.assertEqual(ppartition, app.replicator.getCurrentRID())
calls = app.replicator.mockGetNamedCalls('checkTIDRange')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_tid, plength, ppartition)
self.assertEqual(len(packet_list), len(tid_list)) self.assertEqual(len(packet_list), len(tid_list))
for packet in packet_list: for packet in packet_list:
self.assertEqual(packet.getType(), self.assertEqual(packet.getType(),
...@@ -126,9 +137,21 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -126,9 +137,21 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
raise AssertionFailed, '%s not found in %r' % (dump(ptid), raise AssertionFailed, '%s not found in %r' % (dump(ptid),
[dump(x) for x in tid_list]) [dump(x) for x in tid_list])
def _checkPacketSerialList(self, conn, object_list): def _checkPacketSerialList(self, conn, object_list, next_oid, next_serial, app):
packet_list = [x.getParam(0) for x in conn.mockGetNamedCalls('ask')] packet_list = [x.getParam(0) for x in conn.mockGetNamedCalls('ask')]
self.assertEqual(len(packet_list), len(object_list)) packet_list, next_range = packet_list[:-1], packet_list[-1]
self.assertEqual(next_range.getType(), Packets.AskCheckSerialRange)
pmin_oid, pmin_serial, plength, ppartition = next_range.decode()
self.assertEqual(pmin_oid, next_oid)
self.assertEqual(pmin_serial, add64(next_serial, 1))
self.assertEqual(plength, RANGE_LENGTH)
self.assertEqual(ppartition, app.replicator.getCurrentRID())
calls = app.replicator.mockGetNamedCalls('checkSerialRange')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
self.assertEqual(len(packet_list), len(object_list), packet_list)
for packet, (oid, serial) in zip(packet_list, object_list): for packet, (oid, serial) in zip(packet_list, object_list):
self.assertEqual(packet.getType(), self.assertEqual(packet.getType(),
Packets.AskObject) Packets.AskObject)
...@@ -166,14 +189,14 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -166,14 +189,14 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
app = self.getApp(conn=conn, tid_result=[]) app = self.getApp(conn=conn, tid_result=[])
# With no known TID # With no known TID
ReplicationHandler(app).answerTIDsFrom(conn, tid_list) ReplicationHandler(app).answerTIDsFrom(conn, tid_list)
self._checkPacketTIDList(conn, tid_list[:]) self._checkPacketTIDList(conn, tid_list[:], tid_list[-1], app)
# With first TID known # With first TID known
conn = self.getFakeConnection() conn = self.getFakeConnection()
known_tid_list = [tid_list[0], ] known_tid_list = [tid_list[0], ]
unknown_tid_list = [tid_list[1], ] unknown_tid_list = [tid_list[1], ]
app = self.getApp(conn=conn, tid_result=known_tid_list) app = self.getApp(conn=conn, tid_result=known_tid_list)
ReplicationHandler(app).answerTIDsFrom(conn, tid_list) ReplicationHandler(app).answerTIDsFrom(conn, tid_list)
self._checkPacketTIDList(conn, unknown_tid_list) self._checkPacketTIDList(conn, unknown_tid_list, tid_list[-1], app)
def test_answerTransactionInformation(self): def test_answerTransactionInformation(self):
conn = self.getFakeConnection() conn = self.getFakeConnection()
...@@ -209,7 +232,8 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -209,7 +232,8 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
app = self.getApp(conn=conn, history_result={}) app = self.getApp(conn=conn, history_result={})
# With no known OID/Serial # With no known OID/Serial
ReplicationHandler(app).answerObjectHistoryFrom(conn, oid_dict) ReplicationHandler(app).answerObjectHistoryFrom(conn, oid_dict)
self._checkPacketSerialList(conn, flat_oid_list) self._checkPacketSerialList(conn, flat_oid_list, oid_3, tid_list[5],
app)
# With some known OID/Serials # With some known OID/Serials
# For test to be realist, history_result should contain the same # For test to be realist, history_result should contain the same
# number of serials as oid_dict, otherise it just tests the special # number of serials as oid_dict, otherise it just tests the special
...@@ -224,7 +248,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -224,7 +248,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
self._checkPacketSerialList(conn, ( self._checkPacketSerialList(conn, (
(oid_1, oid_dict[oid_1][1]), (oid_1, oid_dict[oid_1][1]),
(oid_2, oid_dict[oid_2][0]), (oid_2, oid_dict[oid_2][0]),
)) ), oid_3, tid_list[5], app)
calls = app.dm.mockGetNamedCalls('deleteObject') calls = app.dm.mockGetNamedCalls('deleteObject')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(oid_3, tid_list[4]) calls[0].checkArgs(oid_3, tid_list[4])
...@@ -382,10 +406,8 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -382,10 +406,8 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
handler.answerCheckTIDRange(conn, min_tid, length, length, 0, max_tid) handler.answerCheckTIDRange(conn, min_tid, length, length, 0, max_tid)
# Result: ask tid list, and ask next chunk # Result: ask tid list, and ask next chunk
calls = conn.mockGetNamedCalls('ask') calls = conn.mockGetNamedCalls('ask')
self.assertEqual(len(calls), 2) self.assertEqual(len(calls), 1)
tid_call, next_call = calls tid_packet = calls[0].getParam(0)
tid_packet = tid_call.getParam(0)
next_packet = next_call.getParam(0)
self.assertEqual(tid_packet.getType(), Packets.AskTIDsFrom) self.assertEqual(tid_packet.getType(), Packets.AskTIDsFrom)
pmin_tid, pmax_tid, plength, ppartition = tid_packet.decode() pmin_tid, pmax_tid, plength, ppartition = tid_packet.decode()
self.assertEqual(pmin_tid, min_tid) self.assertEqual(pmin_tid, min_tid)
...@@ -395,14 +417,6 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -395,14 +417,6 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
calls = app.replicator.mockGetNamedCalls('getTIDsFrom') calls = app.replicator.mockGetNamedCalls('getTIDsFrom')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_tid, pmax_tid, plength, ppartition) calls[0].checkArgs(pmin_tid, pmax_tid, plength, ppartition)
self.assertEqual(next_packet.getType(), Packets.AskCheckTIDRange)
pmin_tid, plength, ppartition = next_packet.decode()
self.assertEqual(pmin_tid, add64(max_tid, 1))
self.assertEqual(plength, RANGE_LENGTH)
self.assertEqual(ppartition, rid)
calls = app.replicator.mockGetNamedCalls('checkTIDRange')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_tid, plength, ppartition)
def test_answerCheckTIDRangeDifferentSmallChunkWithoutNext(self): def test_answerCheckTIDRangeDifferentSmallChunkWithoutNext(self):
min_tid = self.getNextTID() min_tid = self.getNextTID()
...@@ -420,9 +434,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -420,9 +434,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
# Result: ask tid list, and start replicating object range # Result: ask tid list, and start replicating object range
calls = conn.mockGetNamedCalls('ask') calls = conn.mockGetNamedCalls('ask')
self.assertEqual(len(calls), 2) self.assertEqual(len(calls), 2)
tid_call, next_call = calls tid_packet = calls[0].getParam(0)
tid_packet = tid_call.getParam(0)
next_packet = next_call.getParam(0)
self.assertEqual(tid_packet.getType(), Packets.AskTIDsFrom) self.assertEqual(tid_packet.getType(), Packets.AskTIDsFrom)
pmin_tid, pmax_tid, plength, ppartition = tid_packet.decode() pmin_tid, pmax_tid, plength, ppartition = tid_packet.decode()
self.assertEqual(pmin_tid, min_tid) self.assertEqual(pmin_tid, min_tid)
...@@ -432,15 +444,6 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -432,15 +444,6 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
calls = app.replicator.mockGetNamedCalls('getTIDsFrom') calls = app.replicator.mockGetNamedCalls('getTIDsFrom')
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_tid, pmax_tid, plength, ppartition) calls[0].checkArgs(pmin_tid, pmax_tid, plength, ppartition)
self.assertEqual(next_packet.getType(), Packets.AskCheckSerialRange)
pmin_oid, pmin_serial, plength, ppartition = next_packet.decode()
self.assertEqual(pmin_oid, ZERO_OID)
self.assertEqual(pmin_serial, ZERO_TID)
self.assertEqual(plength, RANGE_LENGTH)
self.assertEqual(ppartition, rid)
calls = app.replicator.mockGetNamedCalls('checkSerialRange')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
# CheckSerialRange # CheckSerialRange
def test_answerCheckSerialFullRangeIdenticalChunkWithNext(self): def test_answerCheckSerialFullRangeIdenticalChunkWithNext(self):
...@@ -553,10 +556,8 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -553,10 +556,8 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
length, 0, max_oid, 1, max_serial) length, 0, max_oid, 1, max_serial)
# Result: ask serial list, and ask next chunk # Result: ask serial list, and ask next chunk
calls = conn.mockGetNamedCalls('ask') calls = conn.mockGetNamedCalls('ask')
self.assertEqual(len(calls), 2) self.assertEqual(len(calls), 1)
serial_call, next_call = calls serial_packet = calls[0].getParam(0)
serial_packet = serial_call.getParam(0)
next_packet = next_call.getParam(0)
self.assertEqual(serial_packet.getType(), Packets.AskObjectHistoryFrom) self.assertEqual(serial_packet.getType(), Packets.AskObjectHistoryFrom)
pmin_oid, pmin_serial, pmax_serial, plength, ppartition = \ pmin_oid, pmin_serial, pmax_serial, plength, ppartition = \
serial_packet.decode() serial_packet.decode()
...@@ -569,15 +570,6 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -569,15 +570,6 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
self.assertEqual(len(calls), 1) self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_oid, pmin_serial, pmax_serial, plength, calls[0].checkArgs(pmin_oid, pmin_serial, pmax_serial, plength,
ppartition) ppartition)
self.assertEqual(next_packet.getType(), Packets.AskCheckSerialRange)
pmin_oid, pmin_serial, plength, ppartition = next_packet.decode()
self.assertEqual(pmin_oid, max_oid)
self.assertEqual(pmin_serial, add64(max_serial, 1))
self.assertEqual(plength, RANGE_LENGTH)
self.assertEqual(ppartition, rid)
calls = app.replicator.mockGetNamedCalls('checkSerialRange')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
def test_answerCheckSerialRangeDifferentSmallChunkWithoutNext(self): def test_answerCheckSerialRangeDifferentSmallChunkWithoutNext(self):
min_oid = self.getOID(1) min_oid = self.getOID(1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment