Commit 2c0b400e authored by Kirill Smelkov

Merge remote-tracking branch 'origin/master' into t

* origin/master:
  storage: start replicating the partition which is furthest behind
  master: fix possibly wrong knowledge of cells' backup_tid when resuming backup
  Minor comment/doc changes
parents d68e25a6 4d3f3723
@@ -42,7 +42,6 @@ Change History
 - Plus a few other changes for debugging and developers, as well as small
   optimizations.
-
 1.6.3 (2016-06-15)
 ------------------
...
@@ -57,9 +57,9 @@ for that partition. It means only this node will fetch data from the upstream
 cluster, to minimize bandwidth between clusters. Other replicas will
 synchronize from the primary node.
 
-There is no UUID conflict between the 2 clusters:
+There is no conflict of node id between the 2 clusters:
 - Storage nodes connect anonymously to upstream.
-- Master node receives a new from upstream master and uses it only when
+- The master node gets an id from the upstream master and uses it only when
   communicating with it.
 """
@@ -197,9 +197,8 @@ class BackupApplication(object):
     # NOTE called by backup_app.invalidateObjects() when it has info that
     # partitions in partition_set were updated in upstream cluster (up to `tid`)
-    def invalidatePartitions(self, tid, partition_set):
+    def invalidatePartitions(self, tid, prev_tid, partition_set):
         app = self.app
-        prev_tid = app.getLastTransaction()
         app.setLastTransaction(tid)
         pt = app.pt
         trigger_set = set()
@@ -315,8 +314,9 @@ class BackupApplication(object):
                 logging.debug("partition %u: updating backup_tid of %r to %s",
                               offset, cell, dump(tid))
                 cell.backup_tid = tid
-                # TODO provide invalidation feedback about new txns to read-only clients connected to backup cluster
-                # NOTE ^^^ not only here but also hooked to in-progress feedback from fetchObjects (storage)
+                # TODO: Provide invalidation feedback about new txns to read-only
+                #       clients connected to backup cluster. Not only here but also
+                #       hooked to in-progress feedback from fetchObjects (storage).
             # Forget tids we won't need anymore.
             cell_list = app.pt.getCellList(offset, readable=True)
             del tid_list[:bisect(tid_list, min(x.backup_tid for x in cell_list))] # XXX not only for primary Sb ?
...
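
For readers unfamiliar with this pattern, here is a minimal standalone sketch (toy classes and values, not the actual NEO code) of what the `del tid_list[:bisect(...)]` line above does: once every readable cell of a partition has replicated past some tid, all tids up to the slowest cell's backup_tid can be dropped with a single bisect.

```python
# Minimal sketch of the tid_list trimming above, with toy values.
from bisect import bisect

class Cell:  # stand-in for a partition-table cell
    def __init__(self, backup_tid):
        self.backup_tid = backup_tid

tid_list = [10, 20, 30, 40, 50]      # tids kept for one partition, sorted
cell_list = [Cell(30), Cell(40)]     # readable cells and their progress

# Drop every tid <= the slowest cell's backup_tid (here 30).
del tid_list[:bisect(tid_list, min(x.backup_tid for x in cell_list))]
assert tid_list == [40, 50]
```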
@@ -35,10 +35,18 @@ class BackupHandler(EventHandler):
     # NOTE invalidation from M -> Mb (all partitions)
     def answerLastTransaction(self, conn, tid):
         app = self.app
-        if tid != ZERO_TID:
-            app.invalidatePartitions(tid, set(xrange(app.pt.getPartitions())))
-        else: # upstream DB is empty
-            assert app.app.getLastTransaction() == tid
+        prev_tid = app.app.getLastTransaction()
+        if prev_tid < tid:
+            # Since we don't know which partitions were modified during our
+            # absence, we must force replication on all storages. As long as
+            # they haven't done this first check, our backup tid will remain
+            # inferior to this 'tid'. We don't know the real prev_tid, which is:
+            #   >= app.app.getLastTransaction()
+            #   < tid
+            # but passing 'tid' is good enough.
+            app.invalidatePartitions(tid, tid, xrange(app.pt.getPartitions()))
+        elif prev_tid != tid:
+            raise RuntimeError("upstream DB truncated")
         app.ignore_invalidations = False
 
     # NOTE invalidation from M -> Mb
@@ -49,4 +57,5 @@ class BackupHandler(EventHandler):
         getPartition = app.app.pt.getPartition
         partition_set = set(map(getPartition, oid_list))
         partition_set.add(getPartition(tid))
-        app.invalidatePartitions(tid, partition_set)
+        prev_tid = app.app.getLastTransaction()
+        app.invalidatePartitions(tid, prev_tid, partition_set)
...
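
A standalone restatement of the three-way comparison added in answerLastTransaction above (hypothetical function and toy return values, not the actual NEO code), showing how the backup master reacts to the last tid announced by the upstream master:

```python
# Toy restatement of the resume logic above; not the actual NEO code.
def resume_backup(backup_last_tid, upstream_last_tid):
    if backup_last_tid < upstream_last_tid:
        # Transactions were committed upstream while the backup was away and
        # we don't know which partitions they touched: replicate everything.
        return "invalidate all partitions"
    if backup_last_tid != upstream_last_tid:
        # The backup knows a tid newer than upstream's: upstream was truncated.
        raise RuntimeError("upstream DB truncated")
    return "already up to date"

assert resume_backup(10, 20) == "invalidate all partitions"
assert resume_backup(20, 20) == "already up to date"
```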
@@ -214,6 +214,10 @@ class Replicator(object):
             self.updateBackupTID()
             self._nextPartition()
 
+    def _nextPartitionSortKey(self, offset):
+        p = self.partition_dict[offset]
+        return p.next_obj, bool(p.max_ttid)
+
     def _nextPartition(self):
         # XXX: One connection to another storage may remain open forever.
         #      All other previous connections are automatically closed
@@ -227,12 +231,12 @@ class Replicator(object):
         if self.current_partition is not None or not self.replicate_dict:
             return
         app = self.app
-        # Choose a partition with no unfinished transaction if possible.
+        # Start replicating the partition which is furthest behind,
+        # to increase the overall backup_tid as soon as possible.
+        # Then prefer a partition with no unfinished transaction.
         # XXX: When leaving backup mode, we should only consider UP_TO_DATE
         #      cells.
-        for offset in self.replicate_dict:
-            if not self.partition_dict[offset].max_ttid:
-                break
+        offset = min(self.replicate_dict, key=self._nextPartitionSortKey)
         try:
             addr, name = self.source_dict[offset]
         except KeyError:
...
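
To see why the new `min()` call picks the right partition, here is a standalone sketch with made-up values (not the actual Replicator state): Python compares the key tuples element by element, so the smallest `next_obj` (the partition furthest behind) wins, and ties are broken in favour of a partition without unfinished transactions (`bool(p.max_ttid)` is False).

```python
# Standalone sketch of the new partition choice, with made-up values.
class Partition:  # stand-in for the Replicator's per-partition state
    def __init__(self, next_obj, max_ttid):
        self.next_obj = next_obj    # how far replication has progressed
        self.max_ttid = max_ttid    # set when there are unfinished transactions

partition_dict = {
    0: Partition(next_obj=500, max_ttid=None),   # well advanced
    1: Partition(next_obj=100, max_ttid=0x123),  # furthest behind, but busy
    2: Partition(next_obj=100, max_ttid=None),   # furthest behind, idle
}
replicate_dict = {0: None, 1: None, 2: None}     # offsets pending replication

def sort_key(offset):
    p = partition_dict[offset]
    return p.next_obj, bool(p.max_ttid)

# Tuples compare lexicographically: smallest next_obj first, then False < True.
assert min(replicate_dict, key=sort_key) == 2
```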
@@ -394,6 +394,29 @@ class ReplicationTests(NEOThreadedTest):
         finally:
             upstream.stop()
 
+    @backup_test()
+    def testBackupTid(self, backup):
+        """
+        Check that the backup cluster does not claim it has all the data just
+        after it came back whereas new transactions were committed during its
+        absence.
+        """
+        importZODB = backup.upstream.importZODB()
+        importZODB(1)
+        self.tic()
+        last_tid = backup.upstream.last_tid
+        self.assertEqual(last_tid, backup.backup_tid)
+        backup.stop()
+        importZODB(1)
+        backup.reset()
+        with ConnectionFilter() as f:
+            f.add(lambda conn, packet:
+                isinstance(packet, Packets.AskFetchTransactions))
+            backup.start()
+            self.assertEqual(last_tid, backup.backup_tid)
+        self.tic()
+        self.assertEqual(1, self.checkBackup(backup))
+
     def testSafeTweak(self):
         """
         Check that tweak always tries to keep a minimum of (replicas + 1)
...