Commit ad43dcd3 authored by Julien Muchembled

storage: discard answers from aborted replications

This fixes a bug that could lead to data corruption or crashes.
parent 4222ac8a
@@ -13,22 +13,6 @@ had an up-to-date partition table (and retry if useful).
 
 In the case of undoLog(), incomplete results may be returned.
 
-(N) Storage does not discard answers from aborted replications
---------------------------------------------------------------
-
-In some cases, this can lead to data corruption (wrong AnswerFetch*) or crashes
-(e.g. KeyError because self.current_partition is None at the beginning of
-Replicator.fetchObjects).
-
-The assumption that aborting the replication of a partition implies the closure
-of the connection turned out to be wrong, e.g. when a partition is aborted by a
-third party, like CORRUPTED/DISCARDED event from the master.
-
-Workaround: do not replicate or tweak while checking replicas.
-
-Currently, this can be reproduced by running testBackupNodeLost
-(neo.tests.threaded.testReplication.ReplicationTests) many times.
-
 (N) A backup cell may be wrongly marked as corrupted while checking replicas
 ----------------------------------------------------------------------------
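The known-bug entry above can be removed because replication answers are now matched against the outstanding request before being handled. A minimal, self-contained sketch of the pattern implemented by the changes below (plain Python with illustrative names, not the actual NEO classes):

```python
# Sketch of the discard pattern this commit introduces (illustrative, not
# NEO code): each fetch request records its message id, and an incoming
# packet is handled only if it arrives on the current connection with a
# matching peer id. Anything else belongs to an aborted replication.
import itertools

class Connection(object):
    _counter = itertools.count(1)

    def __init__(self):
        self.peer_id = None  # id carried by the next incoming packet

    def ask(self, request):
        msg_id = next(self._counter)
        # the request would be sent here; answers will echo msg_id
        return msg_id

    def getPeerId(self):
        return self.peer_id

class Replicator(object):
    _conn_msg_id = None
    current_conn = None

    def fetch(self, conn, request):
        self.current_conn = conn
        self._conn_msg_id = conn.ask(request)  # remember the request id

    def isReplicatingConnection(self, conn):
        # Same test as Replicator.isReplicatingConnection in the diff below.
        return conn is self.current_conn and \
            conn.getPeerId() == self._conn_msg_id

    def abort(self):
        # The connection may stay open (partition aborted by a third party,
        # e.g. the master), so only the request id is forgotten: any late
        # answers no longer match and are discarded.
        self._conn_msg_id = None

conn = Connection()
r = Replicator()
r.fetch(conn, 'AskFetchObjects')
conn.peer_id = r._conn_msg_id       # a genuine answer echoes the request id
assert r.isReplicatingConnection(conn)
r.abort()
assert not r.isReplicatingConnection(conn)  # stale answer: discarded
```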
@@ -23,7 +23,7 @@ from neo.lib.protocol import Errors, NodeStates, Packets, ProtocolError, \
 
 
 def checkConnectionIsReplicatorConnection(func):
     def wrapper(self, conn, *args, **kw):
-        if self.app.replicator.getCurrentConnection() is conn:
+        if self.app.replicator.isReplicatingConnection(conn):
             return func(self, conn, *args, **kw)
     return wraps(func)(wrapper)
@@ -211,8 +211,8 @@ class StorageOperationHandler(EventHandler):
                     % partition), msg_id)
                 return
             oid_list, user, desc, ext, packed, ttid = t
-            conn.send(Packets.AddTransaction(
-                tid, user, desc, ext, packed, ttid, oid_list))
+            conn.send(Packets.AddTransaction(tid, user,
+                desc, ext, packed, ttid, oid_list), msg_id)
             yield
         conn.send(Packets.AnswerFetchTransactions(
             pack_tid, next_tid, peer_tid_set), msg_id)
@@ -255,7 +255,8 @@ class StorageOperationHandler(EventHandler):
                     "partition %u dropped or truncated"
                     % partition), msg_id)
                 return
-            conn.send(Packets.AddObject(oid, serial, *object[2:]))
+            conn.send(Packets.AddObject(oid, serial, *object[2:]),
+                msg_id)
             yield
         conn.send(Packets.AnswerFetchObjects(
             pack_tid, next_tid, next_oid, object_dict), msg_id)
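For the peer-id check to work, the feeding side must tag every streamed packet with the msg_id of the original AskFetch* request, which is what the send() changes above do. A hedged sketch of the effect (stub objects, not NEO's real protocol layer):

```python
# Why the feeder now passes msg_id to conn.send() for every
# AddTransaction/AddObject packet (illustrative stubs): the replicating
# side keeps only packets whose id matches the outstanding request, so
# the stream of an aborted fetch is ignored end to end.
class Packet(object):
    def __init__(self, name, msg_id):
        self.name, self.msg_id = name, msg_id

def feed(send, msg_id, tids):
    for tid in tids:
        send(Packet('AddTransaction %s' % tid, msg_id))  # tagged
    send(Packet('AnswerFetchTransactions', msg_id))      # final answer

received = []
feed(received.append, 41, ('stale',))   # stream from an aborted fetch
feed(received.append, 42, ('t1', 't2'))
current_msg_id = 42
kept = [p.name for p in received if p.msg_id == current_msg_id]
assert kept == ['AddTransaction t1', 'AddTransaction t2',
                'AnswerFetchTransactions']
```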
@@ -111,6 +111,12 @@ class Partition(object):
 
 
 class Replicator(object):
 
+    # When the replication of a partition is aborted, the connection to the
+    # feeding node may still be open, e.g. on PT update from the master. In
+    # such case, replication is also aborted on the other side but there may
+    # be a few incoming packets that must be discarded.
+    _conn_msg_id = None
+
     current_node = None
     current_partition = None
@@ -122,6 +128,10 @@ class Replicator(object):
         if node is not None and node.isConnected(True):
             return node.getConnection()
 
+    def isReplicatingConnection(self, conn):
+        return conn is self.getCurrentConnection() and \
+            conn.getPeerId() == self._conn_msg_id
+
     def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list):
         """This is a callback from MasterOperationHandler."""
         assert self.ttid_set.issubset(ttid_list), (self.ttid_set, ttid_list)
@@ -353,7 +363,7 @@ class Replicator(object):
             max_tid = self.replicate_tid
         tid_list = self.app.dm.getReplicationTIDList(min_tid, max_tid,
             FETCH_COUNT, offset)
-        self.current_node.getConnection().ask(Packets.AskFetchTransactions(
+        self._conn_msg_id = self.current_node.ask(Packets.AskFetchTransactions(
             offset, FETCH_COUNT, min_tid, max_tid, tid_list))
 
     def fetchObjects(self, min_tid=None, min_oid=ZERO_OID):
@@ -372,13 +382,13 @@ class Replicator(object):
                 object_dict[serial].append(oid)
             except KeyError:
                 object_dict[serial] = [oid]
-        self.current_node.getConnection().ask(Packets.AskFetchObjects(
+        self._conn_msg_id = self.current_node.ask(Packets.AskFetchObjects(
             offset, FETCH_COUNT, min_tid, max_tid, min_oid, object_dict))
 
     def finish(self):
         offset = self.current_partition
         tid = self.replicate_tid
-        del self.current_partition, self.replicate_tid
+        del self.current_partition, self._conn_msg_id, self.replicate_tid
         p = self.partition_dict[offset]
         p.next_obj = add64(tid, 1)
         self.updateBackupTID()
@@ -397,6 +407,7 @@ class Replicator(object):
         if offset is None:
             return
         del self.current_partition
+        self._conn_msg_id = None
         logging.warning('replication aborted for partition %u%s',
             offset, message and ' (%s)' % message)
         if offset in self.partition_dict:
@@ -413,24 +424,19 @@ class Replicator(object):
         else: # partition removed
             self._nextPartition()
 
-    def cancel(self):
-        offset = self.current_partition
-        if offset is not None:
-            logging.info('cancel replication of partition %u', offset)
-            del self.current_partition
-            try:
-                self.replicate_dict.setdefault(offset, self.replicate_tid)
-                del self.replicate_tid
-            except AttributeError:
-                pass
-            self.getCurrentConnection().close()
-
     def stop(self):
         # Close any open connection to an upstream storage,
         # possibly aborting current replication.
         node = self.current_node
         if node is not None is node.getUUID():
-            self.cancel()
+            offset = self.current_partition
+            if offset is not None:
+                logging.info('cancel replication of partition %u', offset)
+                del self.current_partition
+                if self._conn_msg_id is not None:
+                    self.replicate_dict.setdefault(offset, self.replicate_tid)
+                    del self._conn_msg_id, self.replicate_tid
+                self.getCurrentConnection().close()
         # Cancel all replication orders from upstream cluster.
         for offset in self.replicate_dict.keys():
             addr, name = self.source_dict.get(offset, (None, None))
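The _conn_msg_id attribute added above goes through a small lifecycle: set by fetchTransactions/fetchObjects, deleted by finish(), reset by abort(), and tested by stop() to decide whether the partition must be re-queued in replicate_dict. An illustrative summary (an assumed simplification, not NEO code):

```python
# Lifecycle of _conn_msg_id as seen in the hunks above (illustrative):
# set when a fetch is asked, deleted on normal finish (falling back to
# the class default), reset to None on abort, and checked by stop().
class Lifecycle(object):
    _conn_msg_id = None

    def fetch(self, msg_id):
        self._conn_msg_id = msg_id   # AskFetch* sent

    def finish(self):
        del self._conn_msg_id        # back to the class default (None)

    def abort(self):
        self._conn_msg_id = None     # connection may remain open

    def stop_needs_reschedule(self):
        # stop() only re-queues the partition if a fetch was in flight.
        return self._conn_msg_id is not None

lc = Lifecycle()
lc.fetch(7)
assert lc.stop_needs_reschedule()
lc.finish()
assert not lc.stop_needs_reschedule()
```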
@@ -36,6 +36,7 @@ from neo.lib.util import p64, u64
 from .. import expectedFailure, Patch
 from . import ConnectionFilter, NEOCluster, NEOThreadedTest, \
     predictable_random, with_cluster
+from .test import PCounter # XXX
 
 
 def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
@@ -82,6 +83,16 @@ class ReplicationTests(NEOThreadedTest):
                 checked += 1
         return checked
 
+    def checkReplicas(self, cluster):
+        pt = cluster.primary_master.pt
+        storage_dict = {x.uuid: x for x in cluster.storage_list}
+        for offset in xrange(pt.getPartitions()):
+            checksum_list = [
+                self.checksumPartition(storage_dict[x.getUUID()], offset)
+                for x in pt.getCellList(offset)]
+            self.assertEqual(1, len(set(checksum_list)),
+                             (offset, checksum_list))
+
     def testBackupNormalCase(self):
         np = 7
         nr = 2
@@ -424,6 +435,30 @@ class ReplicationTests(NEOThreadedTest):
             self.tic()
             self.assertEqual('foo', storage.load(oid)[0])
 
+    @with_cluster(start_cluster=False, storage_count=3, partitions=3)
+    def testAbortingReplication(self, cluster):
+        s1, s2, s3 = cluster.storage_list
+        cluster.start((s1, s2))
+        t, c = cluster.getTransaction()
+        r = c.root()
+        for x in 'ab':
+            r[x] = PCounter()
+        t.commit()
+        cluster.stop(replicas=1)
+        cluster.start((s1, s2))
+        with ConnectionFilter() as f:
+            f.delayAddObject()
+            cluster.neoctl.tweakPartitionTable()
+            s3.start()
+            self.tic()
+            cluster.neoctl.enableStorageList((s3.uuid,))
+            cluster.neoctl.tweakPartitionTable()
+            self.tic()
+        self.tic()
+        for s in cluster.storage_list:
+            self.assertTrue(s.is_alive())
+        self.checkReplicas(cluster)
+
     @with_cluster(start_cluster=0, replicas=1)
     def testResumingReplication(self, cluster):
         if 1:
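To exercise only the new test, something like the following should work with the stock unittest machinery (hedged: NEO also ships its own test runner with extra setup, and the package must be importable):

```python
# Hedged example: run only testAbortingReplication via plain unittest.
import unittest
from neo.tests.threaded.testReplication import ReplicationTests

suite = unittest.TestSuite([ReplicationTests('testAbortingReplication')])
unittest.TextTestRunner(verbosity=2).run(suite)
```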