Commit ca58ccd7 authored by Julien Muchembled

storage: ignore unassigned partitions when looking for last oids/tids

The MySQL implementation is written to work around the issue reported at
  https://jira.mariadb.org/browse/MDEV-12867
parent 3600e59c
...@@ -323,10 +323,22 @@ class DatabaseManager(object): ...@@ -323,10 +323,22 @@ class DatabaseManager(object):
Required only to import a DB using Importer backend. Required only to import a DB using Importer backend.
max_tid must be in unpacked format. max_tid must be in unpacked format.
Data from unassigned partitions must be ignored.
This is important because there may remain data from cells that are
being discarded (which is done in background because this is an
expensive operation).
XXX: Given the TODO comment in getLastIDs, getting ids
from readable partitions should be enough.
""" """
def _getLastIDs(self): def _getLastIDs(self):
"""""" """Return (trans, obj, max(oid)) where
both 'trans' and 'obj' are {partition: max(tid)}
Same as in getLastTID: data from unassigned partitions must be ignored.
"""
@requires(_getLastIDs) @requires(_getLastIDs)
def getLastIDs(self): def getLastIDs(self):
......
...@@ -313,21 +313,37 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -313,21 +313,37 @@ class MySQLDatabaseManager(DatabaseManager):
return self.query("SELECT rid, state FROM pt WHERE nid=%u" % nid) return self.query("SELECT rid, state FROM pt WHERE nid=%u" % nid)
return self.query("SELECT * FROM pt") return self.query("SELECT * FROM pt")
def _getAssignedPartitionList(self):
    """Return the 'rid' of every row of 'pt' whose 'nid' is this node

    The node id is the value returned by getUUID(). As long as it is
    None, no query is done and an empty tuple is returned.
    """
    node_id = self.getUUID()
    if node_id is not None:
        row_list = self.query("SELECT rid FROM pt WHERE nid=%s" % node_id)
        # Each row holds a single column: unpack it.
        return [rid for (rid,) in row_list]
    return ()
def _sqlmax(self, sql, arg_list):
    """Return the greatest value found by running 'sql' for each argument

    'sql' is a format string that is interpolated (with %) with each item
    of 'arg_list' in turn, and every resulting query must return rows of
    exactly 1 column. NULL values are skipped.

    Return None if there is no argument or if only NULLs were found.
    """
    run_query = self.query
    found = []
    for arg in arg_list:
        for value, in run_query(sql % arg):
            if value is not None:
                found.append(value)
    if not found:
        return None
    return max(found)
def getLastTID(self, max_tid): def getLastTID(self, max_tid):
return self.query("SELECT MAX(t) FROM (SELECT MAX(tid) as t FROM trans" return self._sqlmax(
" WHERE tid<=%s GROUP BY `partition`) as t" % max_tid)[0][0] "SELECT MAX(tid) as t FROM trans FORCE INDEX (PRIMARY)"
" WHERE tid<=%s and `partition`=%%s" % max_tid,
self._getAssignedPartitionList())
def _getLastIDs(self): def _getLastIDs(self):
offset_list = self._getAssignedPartitionList()
p64 = util.p64 p64 = util.p64
q = self.query q = self.query
trans = {partition: p64(tid) sql = ("SELECT MAX(tid) FROM %s FORCE INDEX (PRIMARY)"
for partition, tid in q("SELECT `partition`, MAX(tid)" " WHERE `partition`=%s")
" FROM trans GROUP BY `partition`")} trans, obj = ({partition: p64(tid)
obj = {partition: p64(tid) for partition in offset_list
for partition, tid in q("SELECT `partition`, MAX(tid)" for tid, in q(sql % (t, partition))
" FROM obj GROUP BY `partition`")} if tid is not None}
oid = q("SELECT MAX(oid) FROM (SELECT MAX(oid) AS oid FROM obj" for t in ('trans', 'obj'))
" GROUP BY `partition`) as t")[0][0] oid = self._sqlmax(
"SELECT MAX(oid) FROM obj FORCE INDEX (`partition`)"
" WHERE `partition`=%s", offset_list)
return trans, obj, None if oid is None else p64(oid) return trans, obj, None if oid is None else p64(oid)
def _getUnfinishedTIDDict(self): def _getUnfinishedTIDDict(self):
......
...@@ -244,20 +244,25 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -244,20 +244,25 @@ class SQLiteDatabaseManager(DatabaseManager):
# each partition (and finish in Python with max() for getLastTID). # each partition (and finish in Python with max() for getLastTID).
def getLastTID(self, max_tid): def getLastTID(self, max_tid):
return self.query("SELECT MAX(tid) FROM trans WHERE tid<=?", return self.query(
(max_tid,)).next()[0] "SELECT MAX(tid) FROM pt, trans"
" WHERE nid=? AND rid=partition AND tid<=?",
(self.getUUID(), max_tid,)).next()[0]
def _getLastIDs(self): def _getLastIDs(self):
p64 = util.p64 p64 = util.p64
q = self.query q = self.query
args = self.getUUID(),
trans = {partition: p64(tid) trans = {partition: p64(tid)
for partition, tid in q("SELECT partition, MAX(tid)" for partition, tid in q(
" FROM trans GROUP BY partition")} "SELECT partition, MAX(tid) FROM pt, trans"
" WHERE nid=? AND rid=partition GROUP BY partition", args)}
obj = {partition: p64(tid) obj = {partition: p64(tid)
for partition, tid in q("SELECT partition, MAX(tid)" for partition, tid in q(
" FROM obj GROUP BY partition")} "SELECT partition, MAX(tid) FROM pt, obj"
oid = q("SELECT MAX(oid) FROM (SELECT MAX(oid) AS oid FROM obj" " WHERE nid=? AND rid=partition GROUP BY partition", args)}
" GROUP BY partition) as t").next()[0] oid = q("SELECT MAX(oid) oid FROM pt, obj"
" WHERE nid=? AND rid=partition", args).next()[0]
return trans, obj, None if oid is None else p64(oid) return trans, obj, None if oid is None else p64(oid)
def _getUnfinishedTIDDict(self): def _getUnfinishedTIDDict(self):
......
...@@ -35,7 +35,7 @@ from neo.lib.exception import DatabaseFailure, StoppedOperation ...@@ -35,7 +35,7 @@ from neo.lib.exception import DatabaseFailure, StoppedOperation
from neo.lib.handler import DelayEvent from neo.lib.handler import DelayEvent
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes, from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes,
Packets, Packet, uuid_str, ZERO_OID, ZERO_TID) Packets, Packet, uuid_str, ZERO_OID, ZERO_TID, MAX_TID)
from .. import expectedFailure, unpickle_state, Patch, TransactionalResource from .. import expectedFailure, unpickle_state, Patch, TransactionalResource
from . import ClientApplication, ConnectionFilter, LockLock, NEOThreadedTest, \ from . import ClientApplication, ConnectionFilter, LockLock, NEOThreadedTest, \
RandomConflictDict, ThreadId, with_cluster RandomConflictDict, ThreadId, with_cluster
...@@ -2321,6 +2321,34 @@ class Test(NEOThreadedTest): ...@@ -2321,6 +2321,34 @@ class Test(NEOThreadedTest):
self.assertFalse(m1.primary) self.assertFalse(m1.primary)
self.assertTrue(m1.is_alive()) self.assertTrue(m1.is_alive())
@with_cluster(partitions=2, storage_count=2)
def testStorageBackendLastIDs(self, cluster):
    """
    Check that getLastIDs/getLastTID ignore data from unassigned partitions.

    For each of the 2 storage nodes, 2 transactions are written directly
    to its backend, and after each write, the last ids reported by the
    backend are compared with an expected value:
    - first with the ids of the other node (1 - i): the result must be
      the same as before the write;
    - then with its own ids (i): the result must be these ids.

    XXX: this kind of test should not be reexecuted with SSL
    """
    cluster.sortStorageList()
    t, c = cluster.getTransaction()
    c.root()[''] = PCounter()
    t.commit()
    # One (oid, tid) pair per storage node.
    # NOTE(review): presumably pair i maps to the partition assigned to
    # storage i once the storage list is sorted - confirm.
    big_id_list = ('\x7c' * 8, '\x7e' * 8), ('\x7b' * 8, '\x7d' * 8)
    for i in 0, 1:
        dm = cluster.storage_list[i].dm
        oid, tid = big_id_list[i]
        # The whole tuple below is built before the first iteration, so
        # the first 'expected' is a snapshot taken before anything is
        # written, and the second one uses the (oid, tid) bound just
        # above, not the ones rebound inside the loop.
        for j, expected in (
                (1 - i, (dm.getLastTID(u64(MAX_TID)), dm.getLastIDs())),
                (i, (u64(tid), (tid, {}, {}, oid)))):
            oid, tid = big_id_list[j]
            # Somehow we abuse 'storeTransaction' because we ask it to
            # write data for unassigned partitions. This is not checked
            # so for the moment, the test works.
            dm.storeTransaction(tid, ((oid, None, None),),
                ((oid,), '', '', '', 0, tid), False)
            self.assertEqual(expected,
                (dm.getLastTID(u64(MAX_TID)), dm.getLastIDs()))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment