Commit 5fb802c9 authored by Julien Muchembled's avatar Julien Muchembled

wip

parent 0e0a410f
......@@ -350,7 +350,7 @@ class ImporterDatabaseManager(DatabaseManager):
def __init__(self, *args, **kw):
super(ImporterDatabaseManager, self).__init__(*args, **kw)
implements(self, """_getNextTID checkSerialRange checkTIDRange
deleteObject deleteTransaction dropPartitions _getLastTID
deleteObject deleteTransaction _dropPartition _getLastTID
getReplicationObjectList _getTIDList nonempty""".split())
_getPartition = property(lambda self: self.db._getPartition)
......
......@@ -14,11 +14,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, errno, socket, sys, threading, time
import os, errno, socket, sys, threading
from collections import defaultdict
from contextlib import contextmanager
from copy import copy
from functools import wraps
from time import time
from neo.lib import logging, util
from neo.lib.exception import NonReadableCell
from neo.lib.interfaces import abstract, requires
......@@ -54,6 +55,8 @@ class DatabaseManager(object):
LOCKED = "error: database is locked"
_deferred = 0
_drop_stats = 0, 0
_dropping = None
_repairing = None
def __init__(self, database, engine=None, wait=None):
......@@ -213,7 +216,8 @@ class DatabaseManager(object):
self.setConfiguration("version", version)
def doOperation(self, app):
pass
if self._dropping:
self._dropPartitions(app)
def _close(self):
"""Backend-specific code to close the database"""
......@@ -560,7 +564,8 @@ class DatabaseManager(object):
if -x[1] in READABLE)
@requires(_changePartitionTable, _getLastIDs, _getLastTID)
def changePartitionTable(self, ptid, num_replicas, cell_list, reset=False):
def changePartitionTable(self, app, ptid, num_replicas, cell_list,
reset=False):
my_nid = self.getUUID()
pt = dict(self.iterAssignedCells())
# In backup mode, the last transactions of a readable cell may be
......@@ -568,23 +573,41 @@ class DatabaseManager(object):
backup_tid = self.getBackupTID()
if backup_tid:
backup_tid = util.u64(backup_tid)
def outofdate_tid(offset):
max_offset = -1
dropping = self._dropping or set()
assigned = []
cells = []
for offset, nid, state in cell_list:
if max_offset < offset:
max_offset = offset
if state == CellStates.DISCARDED:
if nid == my_nid:
dropping.add(offset)
tid = None
else:
if nid == my_nid:
assigned.append(offset)
if nid != my_nid or state != CellStates.OUT_OF_DATE:
tid = -state
else:
tid = pt.get(offset, 0)
if tid >= 0:
return tid
return -tid in READABLE and (backup_tid or
if tid < 0:
tid = -tid in READABLE and (backup_tid or
max(self._getLastIDs(offset)[0],
self._getLastTID(offset))) or 0
cell_list = [(offset, nid, (
None if state == CellStates.DISCARDED else
-state if nid != my_nid or state != CellStates.OUT_OF_DATE else
outofdate_tid(offset)))
for offset, nid, state in cell_list]
self._changePartitionTable(cell_list, reset)
cells.append((offset, nid, tid))
if reset:
dropping.update(xrange(max_offset + 1))
dropping.difference_update(assigned)
self._changePartitionTable(cells, reset)
self._updateReadable(reset)
assert isinstance(ptid, (int, long)), ptid
self._setConfiguration('ptid', str(ptid))
self._setConfiguration('replicas', str(num_replicas))
if dropping and not self._dropping:
self._dropping = dropping
if app.operational:
self._dropPartitions(app)
@requires(_changePartitionTable)
def updateCellTID(self, partition, tid):
......@@ -629,9 +652,70 @@ class DatabaseManager(object):
else:
yield offset, None
def _dropPartitions(self, app):
if app.disable_drop_partitions:
logging.info("don't drop data for partitions %r", self._dropping)
return
def dropPartitions():
dropping = self._dropping
before = drop_count, drop_time = self._drop_stats
commit = dropped = 0
while dropping:
offset = next(iter(dropping))
log = dropped
while True:
yield 1
if offset not in dropping:
break
start = time()
if 0 < commit < start:
self.commit()
logging.debug('drop: committed')
commit = 0
continue
data_id_list = self._dropPartition(offset,
# The efficiency drops when the number of lines to
# delete is too small so do not delete too few.
max(100, int(.1 * drop_count / drop_time))
if drop_time else 1000)
if data_id_list:
if not commit:
commit = time() + 1
if log == dropped:
dropped += 1
logging.info("dropping partition %s...", offset)
if type(data_id_list) is list:
try:
data_id_list.remove(None)
pass # XXX: not covered
except ValueError:
pass
logging.debug('drop: pruneData(%s)',
len(data_id_list))
drop_count += self._pruneData(data_id_list)
drop_time += time() - start
self._drop_stats = drop_count, drop_time
continue
dropping.remove(offset)
break
if dropped:
if commit:
self.commit()
logging.info("%s partition(s) dropped"
" (stats: count: %s/%s, time: %.4s/%.4s)",
dropped, drop_count - before[0], drop_count,
round(drop_time - before[1], 3), round(drop_time, 3))
app.newTask(dropPartitions())
@abstract
def dropPartitions(self, offset_list):
"""Delete all data for specified partitions"""
def _dropPartition(self, offset, count):
"""Delete rows for given partition
Delete at most 'count' rows of from obj:
- if there's no line to delete, purge trans and return
a boolean indicating if any row was deleted (from trans)
- else return data ids of deleted rows
"""
def _getUnfinishedDataIdList(self):
"""Drop any unfinished data from a database."""
......@@ -852,10 +936,10 @@ class DatabaseManager(object):
my_nid = self.getUUID()
commit = 0
for partition, state in self.iterAssignedCells():
if commit < time.time():
if commit < time():
if commit:
self.commit()
commit = time.time() + 10
commit = time() + 10
if state > tid:
cell_list.append((partition, my_nid, tid))
self._deleteRange(partition, tid)
......
......@@ -520,19 +520,20 @@ class MySQLDatabaseManager(DatabaseManager):
" ON DUPLICATE KEY UPDATE tid = %d"
% (offset, nid, tid, tid))
def dropPartitions(self, offset_list):
def _dropPartition(self, offset, count):
q = self.query
# XXX: these queries are inefficient (execution time increase with
# row count, although we use indexes) when there are rows to
# delete. It should be done as an idle task, by chunks.
for partition in offset_list:
where = " WHERE `partition`=%d" % partition
data_id_list = [x for x, in
q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)"
"%s AND data_id IS NOT NULL" % where)]
where = " WHERE `partition`=%s ORDER BY tid, oid LIMIT %s" % (
offset, count)
logging.debug("drop: select(%s)", count)
x = q("SELECT DISTINCT data_id FROM obj FORCE INDEX(tid)" + where)
if x:
logging.debug("drop: obj")
q("DELETE FROM obj" + where)
q("DELETE FROM trans" + where)
self._pruneData(data_id_list)
return [x for x, in x]
logging.debug("drop: trans")
q("DELETE FROM trans WHERE `partition`=%s" % offset)
(x,), = q('SELECT ROW_COUNT()')
return x
def _getUnfinishedDataIdList(self):
return [x for x, in self.query(
......
......@@ -364,17 +364,14 @@ class SQLiteDatabaseManager(DatabaseManager):
q("INSERT OR FAIL INTO pt VALUES (?,?,?)",
(offset, nid, int(state)))
def dropPartitions(self, offset_list):
where = " WHERE partition=?"
def _dropPartition(self, *args):
q = self.query
for partition in offset_list:
args = partition,
data_id_list = [x for x, in q(
"SELECT DISTINCT data_id FROM obj%s AND data_id IS NOT NULL"
% where, args)]
q("DELETE FROM obj" + where, args)
q("DELETE FROM trans" + where, args)
self._pruneData(data_id_list)
where = " FROM obj WHERE partition=? ORDER BY tid, oid LIMIT ?"
x = q("SELECT DISTINCT data_id" + where, args).fetchall()
if x:
q("DELETE" + where, args)
return [x for x, in x]
return q("DELETE FROM trans WHERE partition=?", args[:1]).rowcount
def _getUnfinishedDataIdList(self):
return [x for x, in self.query(
......
......@@ -71,7 +71,7 @@ class BaseMasterHandler(BaseHandler):
if ptid != 1 + app.pt.getID():
raise ProtocolError('wrong partition table id')
app.pt.update(ptid, num_replicas, cell_list, app.nm)
app.dm.changePartitionTable(ptid, num_replicas, cell_list)
app.dm.changePartitionTable(app, ptid, num_replicas, cell_list)
if app.operational:
app.replicator.notifyPartitionChanges(cell_list)
app.dm.commit()
......
......@@ -27,25 +27,11 @@ class InitializationHandler(BaseMasterHandler):
pt.load(ptid, num_replicas, row_list, app.nm)
if not pt.filled():
raise ProtocolError('Partial partition table received')
# Install the partition table into the database for persistence.
cell_list = []
unassigned = range(pt.getPartitions())
for offset in reversed(unassigned):
for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(), cell.getState()))
if cell.getUUID() == app.uuid:
unassigned.remove(offset)
# delete objects database
cell_list = [(offset, cell.getUUID(), cell.getState())
for offset in xrange(pt.getPartitions())
for cell in pt.getCellList(offset)]
dm = app.dm
if unassigned:
if app.disable_drop_partitions:
logging.info('partitions %r are discarded but actual deletion'
' of data is disabled', unassigned)
else:
logging.debug('drop data for partitions %r', unassigned)
dm.dropPartitions(unassigned)
dm.changePartitionTable(ptid, num_replicas, cell_list, reset=True)
dm.changePartitionTable(app, ptid, num_replicas, cell_list, reset=True)
dm.commit()
def truncate(self, conn, tid):
......
......@@ -91,7 +91,7 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
# dm call
calls = self.app.dm.mockGetNamedCalls('changePartitionTable')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(ptid, 1, cells)
calls[0].checkArgs(app, ptid, 1, cells)
if __name__ == "__main__":
unittest.main()
......@@ -53,7 +53,7 @@ class StorageDBTests(NeoUnitTestBase):
uuid = self.getStorageUUID()
db.setUUID(uuid)
self.assertEqual(uuid, db.getUUID())
db.changePartitionTable(1, 0,
db.changePartitionTable(None, 1, 0,
[(i, uuid, CellStates.UP_TO_DATE) for i in xrange(num_partitions)],
reset=True)
self.assertEqual(num_partitions, 1 + db._getMaxPartition())
......
......@@ -455,13 +455,10 @@ class ReplicationTests(NEOThreadedTest):
return isinstance(packet, delayed) and \
packet._args[0] == offset and \
conn in s1.getConnectionList(s0)
def changePartitionTable(orig, ptid, num_replicas, cell_list):
def changePartitionTable(orig, app, ptid, num_replicas, cell_list):
if (offset, s0.uuid, CellStates.DISCARDED) in cell_list:
connection_filter.remove(delayAskFetch)
# XXX: this is currently not done by
# default for performance reason
orig.im_self.dropPartitions((offset,))
return orig(ptid, num_replicas, cell_list)
return orig(app, ptid, num_replicas, cell_list)
np = cluster.num_partitions
s0, s1, s2 = cluster.storage_list
for delayed in Packets.AskFetchTransactions, Packets.AskFetchObjects:
......@@ -686,17 +683,7 @@ class ReplicationTests(NEOThreadedTest):
cluster.neoctl.tweakPartitionTable()
self.tic()
self.assertEqual(1, s1.sqlCount('obj'))
# Deletion should start as soon as the cell is discarded, as a
# background task, instead of doing it during initialization.
count = s0.sqlCount('obj')
s0.stop()
cluster.join((s0,))
s0.resetNode()
s0.start()
self.tic()
self.assertEqual(2, s0.sqlCount('obj'))
with self.expectedFailure(): \
self.assertEqual(2, count)
@with_cluster(replicas=1)
def testResumingReplication(self, cluster):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment