Commit c1b8ffda authored by Kirill Smelkov

Merge tag 'v1.9' into t

NEO 1.9

* tag 'v1.9':
  Release version 1.9
  master: fix resumption of backup replication (internal or not)
  master: fix/simplify generation of TID
  master: fix possible failure when reading data in a backup cluster with replicas
parents 23894124 1b57a7ae
Change History
==============
1.9 (2018-03-13)
----------------
A lot of performance improvements have been made on storage nodes for this
release, and some of them required changes in the storage format. In short,
the migration is done automatically, but you may want to read the UPGRADE
notes for more details.
Performance:
- Speed up replication by sending bigger network packets,
  and by not fetching the object next_serial when it is not needed.
- Speed up reads by indexing 'obj' primarily by 'oid' (instead of 'tid').
- Optimize storage layout of raw data for replication.
Other storage changes:
- Disable data deduplication by default; a new --dedup option re-enables it.
- importer: do not crash if a backup cluster tries to replicate.
- importer: preserve 'packed' flag.
Master:
- Fix possible failure when reading data in a backup cluster with replicas.
- Fix generation of TID.
- Fix resumption of backup replication (internal or not).
Client:
- New 'cache-size' Storage option.
- Cache hit/miss statistics.
- Fix accounting of cache size.
- Preserve 'packed' flag on iteration.
- Fix a case where, at startup or after nodes came back, full load balancing
  could be prevented until some data were written.
Other:
- neolog: --from option now also tries to parse with `dateutil`_.
- neolog: add support for xz-compressed logs, using external xzcat commands.
.. _dateutil: https://dateutil.readthedocs.io/
1.8.1 (2017-11-07)
------------------
......
NEO 1.9
=======
The `obj` table in MySQL/SQLite backends is automatically upgraded at startup.
Note however that rewriting this table can take a long time if you have many
object records.
This migration is reversible, but you would have to dig into the migration code
(``DatabaseManager._migrate2()``) and manually run the opposite SQL queries.
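For illustration only, here is a minimal sketch of what "running the opposite
SQL queries" could look like, assuming a deliberately simplified ``obj`` schema
(the real columns and the actual statements used by
``DatabaseManager._migrate2()`` may differ); it only shows the general pattern
of rebuilding a SQLite table to change the order of its primary-key columns::

    # Hypothetical sketch: simplified 'obj' schema, not NEO's real layout.
    import sqlite3

    con = sqlite3.connect(':memory:')
    con.executescript("""
        -- stand-in for the 1.9 layout: keyed primarily by oid
        CREATE TABLE obj (partition INTEGER, oid INTEGER, tid INTEGER,
                          data_id INTEGER,
                          PRIMARY KEY (partition, oid, tid));
        INSERT INTO obj VALUES (0, 1, 10, NULL), (0, 1, 20, NULL);
    """)
    # "Opposite" migration: rebuild the table keyed primarily by tid again,
    # i.e. roughly what going back to the 1.8.1 ordering would require.
    con.executescript("""
        CREATE TABLE obj_old (partition INTEGER, oid INTEGER, tid INTEGER,
                              data_id INTEGER,
                              PRIMARY KEY (partition, tid, oid));
        INSERT INTO obj_old SELECT partition, oid, tid, data_id FROM obj;
        DROP TABLE obj;
        ALTER TABLE obj_old RENAME TO obj;
    """)
    print(con.execute("SELECT * FROM obj ORDER BY tid, oid").fetchall())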
No migration is done for the change in the `data` table (as for NEO 1.4, it is
optional and would cost too much). For optimal performance, you may prefer to
set up new nodes (and drop the old ones once the replication is finished).
Because there is no change in the protocol, nodes running versions 1.8.1 and
1.9 can actually be mixed, avoiding any downtime due to the migration.
NEO 1.6
=======
......
......@@ -209,12 +209,21 @@ class BackupApplication(object):
except IndexError:
last_max_tid = prev_tid
if offset in partition_set:
self.tid_list[offset].append(tid) # XXX check tid is ↑
primary_list = []
node_list = []
for cell in pt.getCellList(offset, readable=True):
cell_list = pt.getCellList(offset, readable=True)
for cell in cell_list:
node = cell.getNode()
assert node.isConnected(), node
if cell.backup_tid == prev_tid:
if prev_tid == tid:
# Connecting to upstream: any node that is
# up-to-date wrt upstream is a candidate for being
# primary.
assert self.ignore_invalidations
if app.isStorageReady(node.getUUID()):
primary_list.append(node)
continue
# Given 4 TIDs t0, t1, t2, t3: if a cell is only
# modified by t0 & t3 and has all data for t0, 4 values
# are possible for its 'backup_tid' until it replicates
......@@ -234,12 +243,19 @@ class BackupApplication(object):
cell.backup_tid, last_max_tid, prev_tid, tid)
if app.isStorageReady(node.getUUID()):
node_list.append(node)
assert node_list
trigger_set.update(node_list)
# Make sure we have a primary storage for this partition.
if offset not in self.primary_partition_dict:
self.primary_partition_dict[offset] = \
random.choice(node_list)
random.choice(primary_list or node_list)
if node_list:
self.tid_list[offset].append(tid) # XXX check tid is ↑
if primary_list:
# Resume replication to secondary cells.
self._triggerSecondary(
self.primary_partition_dict[offset],
offset, tid, cell_list)
else:
trigger_set.update(node_list)
else:
# Partition not touched, so increase 'backup_tid' of all
# "up-to-date" replicas, without having to replicate.
......@@ -339,15 +355,18 @@ class BackupApplication(object):
if app.getClusterState() == ClusterStates.BACKINGUP:
self.triggerBackup(node)
if primary:
# Notify secondary storages that they can replicate from
# primary ones, even if they are already replicating.
p = Packets.Replicate(tid, '', {offset: node.getAddress()})
for cell in cell_list:
if max(cell.backup_tid, cell.replicating) < tid:
cell.replicating = tid
logging.debug(
"ask %s to replicate partition %u up to %s from %s",
uuid_str(cell.getUUID()), offset,
dump(tid), uuid_str(node.getUUID()))
cell.getNode().send(p)
self._triggerSecondary(node, offset, tid, cell_list)
return result
def _triggerSecondary(self, node, offset, tid, cell_list):
# Notify secondary storages that they can replicate from
# primary ones, even if they are already replicating.
p = Packets.Replicate(tid, '', {offset: node.getAddress()})
for cell in cell_list:
if max(cell.backup_tid, cell.replicating) < tid:
cell.replicating = tid
logging.debug(
"ask %s to replicate partition %u up to %s from %s",
uuid_str(cell.getUUID()), offset,
dump(tid), uuid_str(node.getUUID()))
cell.getNode().send(p)
......@@ -36,7 +36,7 @@ class BackupHandler(EventHandler):
def answerLastTransaction(self, conn, tid):
app = self.app
prev_tid = app.app.getLastTransaction()
if prev_tid < tid:
if prev_tid <= tid:
# Since we don't know which partitions were modified during our
# absence, we must force replication on all storages. As long as
# they haven't done this first check, our backup tid will remain
......@@ -44,8 +44,12 @@ class BackupHandler(EventHandler):
# >= app.app.getLastTransaction()
# < tid
# but passing 'tid' is good enough.
# A special case is when prev_tid == tid: even in this case, we
# must restore the state of the backup app so that any interrupted
# replication (internal or not) is resumed, otherwise the global
# backup_tid could remain stuck to an old tid if upstream is idle.
app.invalidatePartitions(tid, tid, xrange(app.pt.getPartitions()))
elif prev_tid != tid:
else:
raise RuntimeError("upstream DB truncated")
app.ignore_invalidations = False
......
......@@ -150,5 +150,5 @@ class ClientReadOnlyServiceHandler(ClientServiceHandler):
# like in MasterHandler but returns backup_tid instead of last_tid
def askLastTransaction(self, conn):
assert self.app.backup_tid is not None # we are in BACKUPING mode
backup_tid = self.app.pt.getBackupTid()
backup_tid = self.app.pt.getBackupTid(min)
conn.answer(Packets.AnswerLastTransaction(backup_tid))
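As an aside, an illustrative sketch (not NEO's ``PartitionTable`` API, and the
values are made up) of why the aggregation function matters when there are
replicas: the TID answered to a read-only client should not exceed what every
readable cell has already replicated, hence aggregating with a minimum rather
than a maximum::

    # Hypothetical per-cell backup_tid values for 2 partitions with 2
    # replicas each; answering anything above the overall minimum could
    # direct a client to a cell that has not replicated that TID yet.
    backup_tids = {0: [0x125, 0x123], 1: [0x125, 0x125]}
    safe_last_tid = min(min(cells) for cells in backup_tids.values())
    assert safe_last_tid == 0x123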
......@@ -234,29 +234,16 @@ class TransactionManager(EventQueue):
tid = tidFromTime(time())
min_tid = self._last_tid
if tid <= min_tid:
tid = addTID(min_tid, 1)
# We know we won't have room to adjust by decreasing.
try_decrease = False
else:
try_decrease = True
tid = addTID(min_tid, 1)
if ttid is not None:
assert isinstance(ttid, basestring), repr(ttid)
assert isinstance(divisor, (int, long)), repr(divisor)
ref_remainder = u64(ttid) % divisor
remainder = u64(tid) % divisor
if ref_remainder != remainder:
if try_decrease:
new_tid = addTID(tid, ref_remainder - divisor - remainder)
assert u64(new_tid) % divisor == ref_remainder, (dump(new_tid),
ref_remainder)
if new_tid <= min_tid:
new_tid = addTID(new_tid, divisor)
else:
if ref_remainder > remainder:
ref_remainder += divisor
new_tid = addTID(tid, ref_remainder - remainder)
assert min_tid < new_tid, (dump(min_tid), dump(tid), dump(new_tid))
tid = new_tid
remainder = u64(ttid) % divisor
delta_remainder = remainder - u64(tid) % divisor
if delta_remainder:
tid = addTID(tid, delta_remainder)
if tid <= min_tid:
tid = addTID(tid, divisor)
assert u64(tid) % divisor == remainder, (dump(tid), remainder)
assert min_tid < tid, (dump(min_tid), dump(tid))
self._last_tid = tid
return self._last_tid
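To make the simplified adjustment above easier to follow, here is a standalone
sketch using plain integers instead of NEO's packed 8-byte TIDs (``addTID`` and
``u64`` are replaced by ordinary arithmetic; the function name and signature
are illustrative, not NEO's API)::

    def next_tid(clock_tid, last_tid, ttid=None, divisor=None):
        # Never go backwards: the returned TID must be strictly greater
        # than the last one delivered.
        tid = clock_tid
        if tid <= last_tid:
            tid = last_tid + 1
        if ttid is not None:
            # Give the TID the same remainder modulo 'divisor' as the
            # client-chosen ttid, shifting by less than one divisor, then
            # bump by a full divisor if that pushed it at or below last_tid.
            remainder = ttid % divisor
            delta_remainder = remainder - tid % divisor
            if delta_remainder:
                tid += delta_remainder
                if tid <= last_tid:
                    tid += divisor
            assert tid % divisor == remainder and last_tid < tid
        return tid

    # With divisor=4 and a ttid whose remainder is 3:
    print(next_tid(clock_tid=100, last_tid=102, ttid=7, divisor=4))  # 103
    print(next_tid(clock_tid=100, last_tid=103, ttid=7, divisor=4))  # 107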
......
......@@ -147,6 +147,7 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(backup.neoctl.getClusterState(),
ClusterStates.RUNNING)
# Restart and switch to BACKINGUP mode again.
backup.stop()
# U -> B propagation with Mb -> Sb' (secondary, Replicate from primary Sb) delayed
......@@ -158,6 +159,9 @@ class ReplicationTests(NEOThreadedTest):
u_last_tid0 = upstream.last_tid
self.assertEqual(backup.backup_tid, u_last_tid0)
self.assertEqual(backup.last_tid, u_last_tid0)
# Leave BACKINGUP mode when 1 replica is late. The cluster
# remains in STOPPING_BACKUP state until it catches up.
with backup.master.filterConnection(*backup.storage_list) as f:
f.add(delaySecondary)
while not f.filtered_count:
......@@ -182,6 +186,9 @@ class ReplicationTests(NEOThreadedTest):
max_tid=backup.last_tid))
self.assertEqual(np*nr, self.checkBackup(backup, max_tid=backup.last_tid))
# Again but leave BACKINGUP mode when a storage node is
# receiving data from the upstream cluster.
backup.stop()
# S -> Sb, Sb -> Sb' (AddObject) delayed
......@@ -208,6 +215,48 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(backup.last_tid, u_last_tid1) # truncated after recovery
self.assertEqual(np*nr, self.checkBackup(backup, max_tid=backup.last_tid))
storage = upstream.getZODBStorage()
# Check that replication from upstream is resumed even if
# upstream is idle.
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic()
x = backup.master.backup_app.primary_partition_dict
new_oid_storage = x[0]
with upstream.moduloTID(next(p for p, n in x.iteritems()
if n is not new_oid_storage)), \
ConnectionFilter() as f:
f.delayAddObject()
# Transaction that touches 2 primary cells on 2 different
# nodes.
txn = transaction.Transaction()
tid = storage.load(ZERO_OID)[1]
storage.tpc_begin(txn)
storage.store(ZERO_OID, tid, '', '', txn)
storage.tpc_vote(txn)
storage.tpc_finish(txn)
self.tic()
# Stop when exactly 1 of the 2 cells is synced with
# upstream.
backup.stop()
backup.start()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.last_tid))
# Check that replication to secondary cells is resumed even if
# upstream is idle.
with backup.master.filterConnection(*backup.storage_list) as f:
f.add(delaySecondary)
txn = transaction.Transaction()
storage.tpc_begin(txn)
storage.tpc_finish(txn)
self.tic()
backup.stop()
backup.start()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.last_tid))
@predictable_random()
def testBackupNodeLost(self):
"""Check backup cluster can recover after random connection loss
......
......@@ -60,7 +60,7 @@ else:
setup(
name = 'neoppod',
version = '1.8.1',
version = '1.9',
description = __doc__.strip(),
author = 'Nexedi SA',
author_email = 'neo-dev@erp5.org',
......