Commit f9a8500d authored by Julien Muchembled

mysql: the largest value allowed by the TokuDB engine is 32 MB

Limiting the size of the data.value column to 16 MB saves 1 byte by switching
to MEDIUMBLOB, and it avoids the need for big redo logs in InnoDB.
parent 24b1d024
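
What the change does, in short: values of 16 MB and more no longer fit in data.value (now a MEDIUMBLOB), so they are cut into 8 MB rows of the new bigdata table; bit 0x80 is set in data.compression and data.value keeps only an 8-byte (bigdata_id, length) pointer. Below is a minimal sketch of that pointer encoding, assuming nothing beyond the constants visible in the diff (the helper names are mine, not part of the commit):

```python
# Sketch of the pointer encoding introduced by this commit; helper names
# are hypothetical, the constants (">LL", 0x80, 8 MB chunks) come from
# the diff below.
import struct

_structLL = struct.Struct(">LL")   # (bigdata_id, total length), big-endian

def pack_pointer(bigdata_id, length):
    # What data.value holds once compression bit 0x80 is set:
    # 8 bytes instead of the payload itself.
    return _structLL.pack(bigdata_id, length)

def bigdata_row_ids(pointer):
    # The payload occupies consecutive bigdata rows; the diff computes
    # their number as (length + 0x7fffff) >> 23, i.e. ceil(length / 8MB).
    bigdata_id, length = _structLL.unpack(pointer)
    return range(bigdata_id, bigdata_id + (length + 0x7fffff >> 23))

# A 16 MB value spans exactly two 8 MB rows:
assert len(bigdata_row_ids(pack_pointer(1, 0x1000000))) == 2
```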
@@ -23,6 +23,7 @@ from array import array
 from hashlib import sha1
 import re
 import string
+import struct
 import time
 from . import DatabaseManager, LOG_QUERIES
@@ -197,10 +198,15 @@ class MySQLDatabaseManager(DatabaseManager):
                  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
                  hash BINARY(20) NOT NULL,
                  compression TINYINT UNSIGNED NULL,
-                 value LONGBLOB NOT NULL,
+                 value MEDIUMBLOB NOT NULL,
                  UNIQUE (hash, compression)
              ) ENGINE=""" + engine)
+
+        q("""CREATE TABLE IF NOT EXISTS bigdata (
+                 id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
+                 value MEDIUMBLOB NOT NULL
+             ) ENGINE=""" + engine)
+
         # The table "ttrans" stores information on uncommitted transactions.
         q("""CREATE TABLE IF NOT EXISTS ttrans (
             `partition` SMALLINT UNSIGNED NOT NULL,
@@ -332,6 +338,9 @@ class MySQLDatabaseManager(DatabaseManager):
             serial, compression, checksum, data, value_serial = r[0]
         except IndexError:
             return None
+        if compression and compression & 0x80:
+            compression &= 0x7f
+            data = ''.join(self._bigData(data))
         return (serial, self._getNextTID(partition, oid, serial),
                 compression, checksum, data, value_serial)
@@ -428,17 +437,64 @@ class MySQLDatabaseManager(DatabaseManager):
         if temporary:
             self.commit()
 
+    _structLL = struct.Struct(">LL")
+    _unpackLL = _structLL.unpack
+
     def _pruneData(self, data_id_list):
         data_id_list = set(data_id_list).difference(self._uncommitted_data)
         if data_id_list:
-            self.query("DELETE data FROM data"
-                       " LEFT JOIN obj ON (id = data_id)"
-                       " WHERE id IN (%s) AND data_id IS NULL"
-                       % ",".join(map(str, data_id_list)))
+            q = self.query
+            id_list = []
+            bigid_list = []
+            for id, value in q("SELECT id, IF(compression < 128, NULL, value)"
+                               " FROM data LEFT JOIN obj ON (id = data_id)"
+                               " WHERE id IN (%s) AND data_id IS NULL"
+                               % ",".join(map(str, data_id_list))):
+                id_list.append(str(id))
+                if value:
+                    bigdata_id, length = self._unpackLL(value)
+                    bigid_list += xrange(bigdata_id,
+                        bigdata_id + (length + 0x7fffff >> 23))
+            if id_list:
+                q("DELETE FROM data WHERE id IN (%s)" % ",".join(id_list))
+            if bigid_list:
+                q("DELETE FROM bigdata WHERE id IN (%s)"
+                  % ",".join(map(str, bigid_list)))
+
+    def _bigData(self, value):
+        bigdata_id, length = self._unpackLL(value)
+        q = self.query
+        return (q("SELECT value FROM bigdata WHERE id=%s" % i)[0][0]
+            for i in xrange(bigdata_id,
+                            bigdata_id + (length + 0x7fffff >> 23)))
 
-    def storeData(self, checksum, data, compression):
+    def storeData(self, checksum, data, compression, _pack=_structLL.pack):
         e = self.escape
         checksum = e(checksum)
+        if 0x1000000 <= len(data): # 16M (MEDIUMBLOB limit)
+            compression |= 0x80
+            q = self.query
+            for r, d in q("SELECT id, value FROM data"
+                          " WHERE hash='%s' AND compression=%s"
+                          % (checksum, compression)):
+                i = 0
+                for d in self._bigData(d):
+                    j = i + len(d)
+                    if data[i:j] != d:
+                        raise IntegrityError(DUP_ENTRY)
+                    i = j
+                if j != len(data):
+                    raise IntegrityError(DUP_ENTRY)
+                return r
+            i = 'NULL'
+            length = len(data)
+            for j in xrange(0, length, 0x800000): # 8M
+                q("INSERT INTO bigdata VALUES (%s, '%s')"
+                  % (i, e(data[j:j+0x800000])))
+                if not j:
+                    i = bigdata_id = self.conn.insert_id()
+                i += 1
+            data = _pack(bigdata_id, length)
         try:
             self.query("INSERT INTO data VALUES (NULL, '%s', %d, '%s')" %
                 (checksum, compression, e(data)))
@@ -452,6 +508,8 @@ class MySQLDatabaseManager(DatabaseManager):
             raise
         return self.conn.insert_id()
 
+    del _structLL
+
     def _getDataTID(self, oid, tid=None, before_tid=None):
         sql = ('SELECT tid, value_tid FROM obj'
                ' WHERE `partition` = %d AND oid = %d'
@@ -547,10 +605,11 @@ class MySQLDatabaseManager(DatabaseManager):
         # client's transaction.
         oid = util.u64(oid)
         p64 = util.p64
-        r = self.query("""SELECT tid, LENGTH(value)
-                          FROM obj LEFT JOIN data ON (obj.data_id = data.id)
-                          WHERE `partition` = %d AND oid = %d AND tid >= %d
-                          ORDER BY tid DESC LIMIT %d, %d""" %
+        r = self.query("SELECT tid, IF(compression < 128, LENGTH(value),"
+            " CAST(CONV(HEX(SUBSTR(value, 5, 4)), 16, 10) AS INT))"
+            " FROM obj LEFT JOIN data ON (obj.data_id = data.id)"
+            " WHERE `partition` = %d AND oid = %d AND tid >= %d"
+            " ORDER BY tid DESC LIMIT %d, %d" %
             (self._getPartition(oid), oid, self._getPackTID(), offset, length))
         if r:
             return [(p64(tid), length or 0) for tid, length in r]
...
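A note on the rewritten history query above: for a chunked entry, data.value is struct.pack(">LL", bigdata_id, length), so the object size sits in bytes 5-8 in big-endian order, and CONV(HEX(SUBSTR(value, 5, 4)), 16, 10) decodes it without ever fetching the payload from bigdata. A quick check of that byte arithmetic (the packed values below are hypothetical):

```python
import struct

pointer = struct.pack(">LL", 42, 0x123456)  # hypothetical (bigdata_id, length)
# SQL's SUBSTR is 1-based, so SUBSTR(value, 5, 4) is pointer[4:8],
# i.e. exactly the big-endian length field.
assert struct.unpack(">L", pointer[4:8])[0] == 0x123456
```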
@@ -110,6 +110,8 @@ class ClientOperationHandler(EventHandler):
 
     def askStoreObject(self, conn, oid, serial,
             compression, checksum, data, data_serial, ttid, unlock):
+        if 1 < compression:
+            raise ProtocolError('invalid compression value')
         # register the transaction
         self.app.tm.register(conn.getUUID(), ttid)
         if data or checksum != ZERO_HASH:
...
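The new guard follows from the storage change: bit 7 of the compression field is now used internally by the MySQL backend to mark chunked values, so the only values a client may legitimately send over the wire are 0 (raw) and 1 (zlib).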
@@ -300,6 +300,9 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
         except StandardError: # AttributeError & ProgrammingError
             pass
 
+    def getAdapter(self):
+        return self._init_args['getAdapter']
+
     def switchTables(self):
         q = self.dm.query
         for table in 'trans', 'obj':
@@ -312,7 +315,11 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
         index = tuple(dm.query("SELECT id, hash, compression FROM data"))
         assert set(dm._uncommitted_data).issubset(x[0] for x in index)
         get = dm._uncommitted_data.get
-        return {(str(h), c): get(i, 0) for i, h, c in index}
+        return {(str(h), c & 0x7f): get(i, 0) for i, h, c in index}
+
+    def sqlCount(self, table):
+        (r,), = self.dm.query("SELECT COUNT(*) FROM " + table)
+        return r
 
 class ClientApplication(Node, neo.client.app.Application):
...
@@ -14,6 +14,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+import os
 import sys
 import threading
 import transaction
@@ -50,7 +51,18 @@ class Test(NEOThreadedTest):
             data_info = {}
             compressible = 'x' * 20
             compressed = compress(compressible)
-            for data in 'foo', '', 'foo', compressed, compressible:
+            oid_list = []
+            if cluster.storage.getAdapter() == 'SQLite':
+                big = None
+                data = 'foo', '', 'foo', compressed, compressible
+            else:
+                big = os.urandom(65536) * 600
+                assert len(big) < len(compress(big))
+                data = ('foo', big, '', 'foo', big[:2**24-1], big,
+                        compressed, compressible, big[:2**24])
+                self.assertFalse(cluster.storage.sqlCount('bigdata'))
+            self.assertFalse(cluster.storage.sqlCount('data'))
+            for data in data:
                 if data is compressible:
                     key = makeChecksum(compressed), 1
                 else:
@@ -69,6 +81,20 @@ class Test(NEOThreadedTest):
                 storage._cache.clear()
                 self.assertEqual((data, serial), storage.load(oid, ''))
                 self.assertEqual((data, serial), storage.load(oid, ''))
+                oid_list.append((oid, data, serial))
+            if big:
+                self.assertTrue(cluster.storage.sqlCount('bigdata'))
+            self.assertTrue(cluster.storage.sqlCount('data'))
+            for i, (oid, data, serial) in enumerate(oid_list, 1):
+                storage._cache.clear()
+                cluster.storage.dm.deleteObject(oid)
+                self.assertRaises(POSException.POSKeyError,
+                                  storage.load, oid, '')
+                for oid, data, serial in oid_list[i:]:
+                    self.assertEqual((data, serial), storage.load(oid, ''))
+            if big:
+                self.assertFalse(cluster.storage.sqlCount('bigdata'))
+            self.assertFalse(cluster.storage.sqlCount('data'))
         finally:
             cluster.stop()
@@ -97,6 +123,30 @@ class Test(NEOThreadedTest):
         finally:
             cluster.stop()
 
+    def testCreationUndoneHistory(self):
+        cluster = NEOCluster()
+        try:
+            cluster.start()
+            storage = cluster.getZODBStorage()
+            oid = storage.new_oid()
+            txn = transaction.Transaction()
+            storage.tpc_begin(txn)
+            storage.store(oid, None, 'foo', '', txn)
+            storage.tpc_vote(txn)
+            tid1 = storage.tpc_finish(txn)
+            storage.tpc_begin(txn)
+            storage.undo(tid1, txn)
+            tid2 = storage.tpc_finish(txn)
+            storage.tpc_begin(txn)
+            storage.undo(tid2, txn)
+            tid3 = storage.tpc_finish(txn)
+            expected = [(tid1, 3), (tid2, 0), (tid3, 3)]
+            for x in storage.history(oid, 10):
+                self.assertEqual((x['tid'], x['size']), expected.pop())
+            self.assertFalse(expected)
+        finally:
+            cluster.stop()
+
     def testStorageDataLock(self):
         cluster = NEOCluster()
         try:
...