Commit f4dd4bab authored by Julien Muchembled's avatar Julien Muchembled

storage: optimize storage layout of raw data for replication

# Previous status

The issue was that we had extreme storage fragmentation from the point of view
of the replication algorithm, which processes one partition at a time.

By using an autoincrement for the 'data' table, rows were ordered by the time
at which they were added:
- some ranges of rows are the result of replication and are therefore ordered
  by partition, tid, oid
- the other rows are globally sorted by tid

This means that when scanning a given partition, many rows had to be skipped
all the time:
- if readahead is big enough, the efficiency is 1/N for a node with N
  partitions assigned
- otherwise, it is even worse because the disk seeks all the time

For huge databases, the replication was horribly slow, in particular from HDD.

# Chosen solution

This commit changes how ids are generated so as to roughly split 'data'
per partition. The backend tracks one last id per assigned partition, and the
16 highest bits of an id contain the partition. Keep in mind that the value of
an id has no meaning: it is only chosen for performance reasons. In other
words, a row can be referred to by an oid whose partition differs from the 16
highest bits of its id:
- no migration is needed and the 16 highest bits of all existing rows are 0
- in case of deduplication, a row can still be shared by different partitions

Due to https://jira.mariadb.org/browse/MDEV-12836, we leave the autoincrement
on existing databases.
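
For illustration, the following is a minimal sketch (hypothetical names, not
the actual NEO code) of this allocation scheme: the partition occupies the 16
highest bits of the 64-bit id and the remaining 48 bits act as a per-partition
counter, mirroring the `p << 48` arithmetic in the diff below.

```python
# Hypothetical sketch of per-partition id allocation; only the 48-bit
# shift is taken from the diff below, everything else is illustrative.

def initial_next_ids(num_partitions, get_max_id):
    # get_max_id(p) is assumed to return the largest existing id in
    # [p << 48, (p + 1) << 48) or None, like the backends' SELECT MAX(id).
    next_ids = []
    for p in xrange(num_partitions):
        i = get_max_id(p)
        next_ids.append(p << 48 if i is None else i + 1)
    return next_ids

def allocate_id(next_ids, partition):
    # Reserve a new id for a row whose oid belongs to the given partition.
    i = next_ids[partition]
    next_ids[partition] = i + 1
    return i

# With no existing rows, the first id of partition 3 is 3 << 48.
next_ids = initial_next_ids(4, lambda p: None)
assert allocate_id(next_ids, 3) == 3 << 48
assert allocate_id(next_ids, 3) == (3 << 48) + 1
```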

## Downsides

Insertion now slows down significantly as the number of partitions grows: for
2 nodes using TokuDB, the slowdown is 4% for 180 partitions and 40% for 2000.
For 12 partitions, the difference remains negligible. The solution for this
issue will be to make it possible to increase the number of partitions
efficiently, so that nodes can keep a small number of them, even for DBs that
are expected to grow so much that many nodes are added over time: such a
feature was already considered so that users no longer have to worry about
this obscure setting at database creation.

Read performance only suffers for applications that read a lot of data that
was written contiguously but split into small blocks. A solution would be to
extend ZODB so that the application can tell it to choose new oids that will
end up in the same partition. As for insertion, the number of partitions
should not be too large.
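
To see why, recall that an object's partition is derived from its oid (in NEO,
a modulo on the number of partitions), so consecutive oids land in different
partitions and, with this commit, in distant id ranges of 'data'. A hedged
sketch of that mapping (the modulo rule is assumed here, not part of this
commit):

```python
def partition_of(oid, num_partitions):
    # Assumed NEO-style partitioning: 64-bit oid modulo the partition count.
    return oid % num_partitions

# Objects written with consecutive oids spread over partitions, hence over
# distinct id ranges of the 'data' table:
print [partition_of(oid, 12) for oid in (1, 2, 3, 4)]   # [1, 2, 3, 4]
```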

With RocksDB (MariaDB 10.2.10), it takes a significant amount of time to
collect all last ids at startup when there are many partitions.
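
The cost comes from issuing one MAX(id) range query per assigned partition at
startup, as in the `_getDataLastId` implementations below. A self-contained
sketch against SQLite (table and variable names are illustrative):

```python
import sqlite3

def get_data_last_id(conn, partition):
    # One range scan over the ids reserved for this partition:
    # [partition << 48, (partition + 1) << 48), as in _getDataLastId below.
    return conn.execute(
        "SELECT MAX(id) FROM data WHERE ? <= id AND id < ?",
        (partition << 48, (partition + 1) << 48)).fetchone()[0]

# Startup performs this query once per partition, so the time grows with
# the number of partitions:
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data (id INTEGER PRIMARY KEY, value BLOB)")
print [get_data_last_id(conn, p) for p in xrange(4)]  # [None, None, None, None]
```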

## Other advantages

- The storage layout of data is now always the same and does not depend on
  whether rows came from replication or commits.
- Efficient deletion of a partition to free space in-place will be possible.

# Considered alternative

The only serious alternative was to replicate as many partitions as possible
at the same time, ideally all assigned partitions, but that is not always
possible. For best performance, it would often require synchronizing new
nodes, or even all of them, so that the source nodes don't have to scan 'data'
several times.

If existing nodes are kept, all data that are not copied to the newly added
nodes have to be skipped. If the number of nodes is multiplied by N, the
efficiency is at best 1-1/N (synchronized nodes), otherwise it's even worse
because partitions are somewhat shuffled.

Checking/replacing a single node would remain slow when there are several
source nodes.

Finally, such an algorithm would be much more complex and would not bring the
other advantages listed above.
parent 7b497b8e
......@@ -406,7 +406,7 @@ class ImporterDatabaseManager(DatabaseManager):
if compression:
data = compressed_data
checksum = util.makeChecksum(data)
data_id = self.holdData(util.makeChecksum(data), data,
data_id = self.holdData(util.makeChecksum(data), oid, data,
compression)
data_id_list.append(data_id)
object_list.append((oid, data_id, data_tid))
......
......@@ -488,7 +488,11 @@ class DatabaseManager(object):
existing data is first thrown away.
"""
@requires(_changePartitionTable)
def _getDataLastId(self, partition):
"""
"""
@requires(_changePartitionTable, _getDataLastId)
def changePartitionTable(self, ptid, cell_list, reset=False):
readable_set = self._readable_set
if reset:
......@@ -503,6 +507,10 @@ class DatabaseManager(object):
raise NonReadableCell
self._getPartition = _getPartition
self._getReadablePartition = _getReadablePartition
d = self._data_last_ids = []
for p in xrange(np):
i = self._getDataLastId(p)
d.append(p << 48 if i is None else i + 1)
me = self.getUUID()
for offset, nid, state in cell_list:
if nid == me:
......@@ -570,7 +578,7 @@ class DatabaseManager(object):
"""
@abstract
def storeData(self, checksum, data, compression):
def storeData(self, checksum, oid, data, compression):
"""To be overridden by the backend to store object raw data
If same data was already stored, the storage only has to check there's
......
......@@ -249,7 +249,7 @@ class MySQLDatabaseManager(DatabaseManager):
# We'd like to have partial index on 'hash' column (e.g. hash(4))
# but 'UNIQUE' constraint would not work as expected.
schema_dict['data'] = """CREATE TABLE %%s (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
id BIGINT UNSIGNED NOT NULL PRIMARY KEY,
hash BINARY(20) NOT NULL,
compression TINYINT UNSIGNED NULL,
value MEDIUMBLOB NOT NULL%s
......@@ -363,6 +363,10 @@ class MySQLDatabaseManager(DatabaseManager):
" WHERE `partition`=%s", offset_list)
return trans, obj, None if oid is None else p64(oid)
def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
% (partition << 48, (partition + 1) << 48))[0][0]
def _getUnfinishedTIDDict(self):
q = self.query
return q("SELECT ttid, tid FROM ttrans"), (ttid
......@@ -454,8 +458,7 @@ class MySQLDatabaseManager(DatabaseManager):
where = " WHERE `partition`=%d" % partition
data_id_list = [x for x, in
q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)"
+ where)
if x]
"%s AND data_id IS NOT NULL" % where)]
if not self._use_partition:
q("DELETE FROM obj" + where)
q("DELETE FROM trans" + where)
......@@ -471,7 +474,8 @@ class MySQLDatabaseManager(DatabaseManager):
raise
def _getUnfinishedDataIdList(self):
return [x for x, in self.query("SELECT data_id FROM tobj") if x]
return [x for x, in self.query(
"SELECT data_id FROM tobj WHERE data_id IS NOT NULL")]
def dropPartitionsTemporary(self, offset_list=None):
where = "" if offset_list is None else \
......@@ -507,7 +511,9 @@ class MySQLDatabaseManager(DatabaseManager):
else:
value_serial = 'NULL'
value = "(%s,%s,%s,%s,%s)," % (
partition, oid, tid, data_id or 'NULL', value_serial)
partition, oid, tid,
'NULL' if data_id is None else data_id,
value_serial)
values_size += len(value)
# actually: max_values < values_size + EXTRA - len(final comma)
# (test_max_allowed_packet checks that EXTRA == 2)
......@@ -566,7 +572,7 @@ class MySQLDatabaseManager(DatabaseManager):
for i in xrange(bigdata_id,
bigdata_id + (length + 0x7fffff >> 23)))
def storeData(self, checksum, data, compression, _pack=_structLL.pack):
def storeData(self, checksum, oid, data, compression, _pack=_structLL.pack):
e = self.escape
checksum = e(checksum)
if 0x1000000 <= len(data): # 16M (MEDIUMBLOB limit)
......@@ -593,9 +599,11 @@ class MySQLDatabaseManager(DatabaseManager):
i = bigdata_id = self.conn.insert_id()
i += 1
data = _pack(bigdata_id, length)
p = self._getPartition(util.u64(oid))
r = self._data_last_ids[p]
try:
self.query("INSERT INTO data VALUES (NULL, '%s', %d, '%s')" %
(checksum, compression, e(data)))
self.query("INSERT INTO data VALUES (%s, '%s', %d, '%s')" %
(r, checksum, compression, e(data)))
except IntegrityError as e:
if e.args[0] == DUP_ENTRY:
(r, d), = self.query("SELECT id, value FROM data"
......@@ -604,7 +612,8 @@ class MySQLDatabaseManager(DatabaseManager):
if d == data:
return r
raise
return self.conn.insert_id()
self._data_last_ids[p] = r + 1
return r
def loadData(self, data_id):
compression, hash, value = self.query(
......@@ -643,7 +652,8 @@ class MySQLDatabaseManager(DatabaseManager):
u64 = util.u64
tid = u64(tid)
sql = " FROM tobj WHERE tid=%d" % u64(ttid)
data_id_list = [x for x, in q("SELECT data_id" + sql) if x]
data_id_list = [x for x, in q("SELECT data_id%s AND data_id IS NOT NULL"
% sql)]
q("INSERT INTO obj SELECT `partition`, oid, %d, data_id, value_tid %s"
% (tid, sql))
q("DELETE" + sql)
......@@ -670,7 +680,8 @@ class MySQLDatabaseManager(DatabaseManager):
if serial:
sql += ' AND tid=%d' % u64(serial)
q = self.query
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql)]
q("DELETE" + sql)
self._pruneData(data_id_list)
......@@ -683,7 +694,8 @@ class MySQLDatabaseManager(DatabaseManager):
q = self.query
q("DELETE FROM trans" + sql)
sql = " FROM obj" + sql
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql) if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql)]
q("DELETE" + sql)
self._pruneData(data_id_list)
......
......@@ -286,6 +286,10 @@ class SQLiteDatabaseManager(DatabaseManager):
" WHERE nid=? AND rid=partition", args).next()[0]
return trans, obj, None if oid is None else p64(oid)
def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
% (partition << 48, (partition + 1) << 48)).fetchone()[0]
def _getUnfinishedTIDDict(self):
q = self.query
return q("SELECT ttid, tid FROM ttrans"), (ttid
......@@ -358,14 +362,16 @@ class SQLiteDatabaseManager(DatabaseManager):
q = self.query
for partition in offset_list:
args = partition,
data_id_list = [x for x, in
q("SELECT DISTINCT data_id FROM obj" + where, args) if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id FROM obj%s AND data_id IS NOT NULL"
% where, args)]
q("DELETE FROM obj" + where, args)
q("DELETE FROM trans" + where, args)
self._pruneData(data_id_list)
def _getUnfinishedDataIdList(self):
return [x for x, in self.query("SELECT data_id FROM tobj") if x]
return [x for x, in self.query(
"SELECT data_id FROM tobj WHERE data_id IS NOT NULL")]
def dropPartitionsTemporary(self, offset_list=None):
where = "" if offset_list is None else \
......@@ -428,12 +434,14 @@ class SQLiteDatabaseManager(DatabaseManager):
return len(data_id_list)
return 0
def storeData(self, checksum, data, compression,
def storeData(self, checksum, oid, data, compression,
_dup=unique_constraint_message("data", "hash", "compression")):
H = buffer(checksum)
p = self._getPartition(util.u64(oid))
r = self._data_last_ids[p]
try:
return self.query("INSERT INTO data VALUES (NULL,?,?,?)",
(H, compression, buffer(data))).lastrowid
self.query("INSERT INTO data VALUES (?,?,?,?)",
(r, H, compression, buffer(data)))
except sqlite3.IntegrityError, e:
if e.args[0] == _dup:
(r, d), = self.query("SELECT id, value FROM data"
......@@ -442,10 +450,12 @@ class SQLiteDatabaseManager(DatabaseManager):
if str(d) == data:
return r
raise
self._data_last_ids[p] = r + 1
return r
def loadData(self, data_id):
return self.query("SELECT compression, hash, value"
" FROM data where id=?", (data_id,)).fetchone()
" FROM data WHERE id=?", (data_id,)).fetchone()
def _getDataTID(self, oid, tid=None, before_tid=None):
partition = self._getReadablePartition(oid)
......@@ -474,7 +484,8 @@ class SQLiteDatabaseManager(DatabaseManager):
tid = u64(tid)
ttid = u64(ttid)
sql = " FROM tobj WHERE tid=?"
data_id_list = [x for x, in q("SELECT data_id" + sql, (ttid,)) if x]
data_id_list = [x for x, in q("SELECT data_id%s AND data_id IS NOT NULL"
% sql, (ttid,))]
q("INSERT INTO obj SELECT partition, oid, ?, data_id, value_tid" + sql,
(tid, ttid))
q("DELETE" + sql, (ttid,))
......@@ -501,8 +512,8 @@ class SQLiteDatabaseManager(DatabaseManager):
sql += " AND tid=?"
args.append(util.u64(serial))
q = self.query
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args)
if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql, args)]
q("DELETE" + sql, args)
self._pruneData(data_id_list)
......@@ -518,8 +529,8 @@ class SQLiteDatabaseManager(DatabaseManager):
q = self.query
q("DELETE FROM trans" + sql, args)
sql = " FROM obj" + sql
data_id_list = [x for x, in q("SELECT DISTINCT data_id" + sql, args)
if x]
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql, args)]
q("DELETE" + sql, args)
self._pruneData(data_id_list)
......
......@@ -113,7 +113,7 @@ class StorageOperationHandler(EventHandler):
checksum, data, data_serial):
dm = self.app.dm
if data or checksum != ZERO_HASH:
data_id = dm.storeData(checksum, data, compression)
data_id = dm.storeData(checksum, oid, data, compression)
else:
data_id = None
# Directly store the transaction.
......
......@@ -470,7 +470,7 @@ class TransactionManager(EventQueue):
if data is None:
data_id = None
else:
data_id = self._app.dm.holdData(checksum, data, compression)
data_id = self._app.dm.holdData(checksum, oid, data, compression)
transaction.store(oid, data_id, value_serial)
def rebaseObject(self, ttid, oid):
......
......@@ -104,6 +104,7 @@ class StorageDBTests(NeoUnitTestBase):
def test_getPartitionTable(self):
db = self.getDB()
db.setNumPartitions(3)
uuid1, uuid2 = self.getStorageUUID(), self.getStorageUUID()
cell1 = (0, uuid1, CellStates.OUT_OF_DATE)
cell2 = (1, uuid1, CellStates.UP_TO_DATE)
......@@ -124,7 +125,7 @@ class StorageDBTests(NeoUnitTestBase):
self._last_ttid = ttid = add64(self._last_ttid, 1)
transaction = oid_list, 'user', 'desc', 'ext', False, ttid
H = "0" * 20
object_list = [(oid, self.db.holdData(H, '', 1), None)
object_list = [(oid, self.db.holdData(H, oid, '', 1), None)
for oid in oid_list]
return (transaction, object_list)
......@@ -203,6 +204,7 @@ class StorageDBTests(NeoUnitTestBase):
def test_setPartitionTable(self):
db = self.getDB()
db.setNumPartitions(3)
ptid = 1
uuid = self.getStorageUUID()
cell1 = 0, uuid, CellStates.OUT_OF_DATE
......@@ -452,8 +454,8 @@ class StorageDBTests(NeoUnitTestBase):
tid4 = self.getNextTID()
tid5 = self.getNextTID()
oid1 = p64(1)
foo = db.holdData("3" * 20, 'foo', 0)
bar = db.holdData("4" * 20, 'bar', 0)
foo = db.holdData("3" * 20, oid1, 'foo', 0)
bar = db.holdData("4" * 20, oid1, 'bar', 0)
db.releaseData((foo, bar))
db.storeTransaction(
tid1, (
......
......@@ -19,6 +19,7 @@ from MySQLdb import NotSupportedError, OperationalError
from MySQLdb.constants.ER import UNKNOWN_STORAGE_ENGINE
from ..mock import Mock
from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import ZERO_OID
from neo.lib.util import p64
from .. import DB_PREFIX, DB_SOCKET, DB_USER
from .testStorageDBTests import StorageDBTests
......@@ -114,7 +115,7 @@ class StorageMySQLdbTests(StorageDBTests):
self.assertEqual(2, max(len(self.db.escape(chr(x)))
for x in xrange(256)))
self.assertEqual(2, len(self.db.escape('\0')))
self.db.storeData('\0' * 20, '\0' * (2**24-1), 0)
self.db.storeData('\0' * 20, ZERO_OID, '\0' * (2**24-1), 0)
size, = query_list
max_allowed = self.db.__class__._max_allowed_packet
self.assertTrue(max_allowed - 1024 < size <= max_allowed, size)
......@@ -123,7 +124,7 @@ class StorageMySQLdbTests(StorageDBTests):
self.db._max_allowed_packet = max_allowed_packet
del query_list[:]
self.db.storeTransaction(p64(0),
((p64(1<<i),0,None) for i in xrange(10)), None)
((p64(1<<i),1234,None) for i in xrange(10)), None)
self.assertEqual(max(query_list), max_allowed_packet)
self.assertEqual(len(query_list), count)
......
......@@ -57,7 +57,7 @@ class TransactionManagerTests(NeoUnitTestBase):
self.manager.storeObject(locking_serial, ram_serial, oid, 0, "3" * 20,
'bar', None)
holdData = self.app.dm.mockGetNamedCalls('holdData')
self.assertEqual(holdData.pop(0).params, ("3" * 20, 'bar', 0))
self.assertEqual(holdData.pop(0).params, ("3" * 20, oid, 'bar', 0))
orig_object = self.manager.getObjectFromTransaction(locking_serial,
oid)
self.manager.updateObjectDataForPack(oid, orig_serial, None, checksum)
......
......@@ -1438,7 +1438,7 @@ class Test(NEOThreadedTest):
bad = []
ok = []
def data_args(value):
return makeChecksum(value), value, 0
return makeChecksum(value), ZERO_OID, value, 0
node_list = []
for i, s in enumerate(cluster.storage_list):
node_list.append(s.uuid)
......