Commit 87eb20e0 authored by Julien Muchembled's avatar Julien Muchembled

wip

parent c61072ab
...@@ -522,7 +522,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -522,7 +522,7 @@ class ImporterDatabaseManager(DatabaseManager):
" your configuration to use the native backend and restart.") " your configuration to use the native backend and restart.")
self._import = None self._import = None
for x in """getObject getReplicationTIDList getReplicationObjectList for x in """getObject getReplicationTIDList getReplicationObjectList
_fetchObject _getDataTID getLastObjectTID _fetchObject _getObjectHistoryForUndo getLastObjectTID
""".split(): """.split():
setattr(self, x, getattr(self.db, x)) setattr(self, x, getattr(self.db, x))
for zodb in self.zodb: for zodb in self.zodb:
...@@ -709,10 +709,12 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -709,10 +709,12 @@ class ImporterDatabaseManager(DatabaseManager):
raise AssertionError raise AssertionError
getLastObjectTID = Fallback.getLastObjectTID.__func__ getLastObjectTID = Fallback.getLastObjectTID.__func__
_getDataTID = Fallback._getDataTID.__func__
def getObjectHistory(self, *args, **kw): def _getObjectHistoryForUndo(self, *args, **kw):
raise BackendNotImplemented(self.getObjectHistory) raise BackendNotImplemented(self._getObjectHistoryForUndo)
def getObjectHistoryWithLength(self, *args, **kw):
raise BackendNotImplemented(self.getObjectHistoryWithLength)
def isReadyToStartPack(self): def isReadyToStartPack(self):
pass # disable pack pass # disable pack
......
...@@ -1003,25 +1003,12 @@ class DatabaseManager(object): ...@@ -1003,25 +1003,12 @@ class DatabaseManager(object):
if prune: if prune:
return self._pruneData(data_id_list) return self._pruneData(data_id_list)
@fallback def _getObjectHistoryForUndo(self, oid, undo_tid):
@requires(_getObject) """Return (undone_tid, history) where 'undone_tid' is the greatest tid
def _getDataTID(self, oid, tid=None, before_tid=None): before 'undo_tid' and 'history' is the list of (tid, value_tid) after
""" 'undo_tid'. If there's no record at 'undo_tid', return None."""
Return a 2-tuple:
tid (int)
tid corresponding to received parameters
serial
data tid of the found record
(None, None) is returned if requested object and transaction
could not be found.
This method only exists for performance reasons, by not returning data:
_getObject already returns these values but it is slower.
"""
r = self._getObject(oid, tid, before_tid)
return (r[0], r[-1]) if r else (None, None)
@requires(_getObjectHistoryForUndo)
def findUndoTID(self, oid, ltid, undo_tid, current_tid): def findUndoTID(self, oid, ltid, undo_tid, current_tid):
""" """
oid oid
...@@ -1049,29 +1036,62 @@ class DatabaseManager(object): ...@@ -1049,29 +1036,62 @@ class DatabaseManager(object):
is_current (bool) is_current (bool)
False if object was modified by later transaction (ie, data_tid is False if object was modified by later transaction (ie, data_tid is
not current), True otherwise. not current), True otherwise.
When undoing several times in such a way that several data_tid are
possible, the implementation guarantees to return the greatest one,
which makes undo compatible with pack without having to update the
value_tid of obj records. IOW, all records that are undo-identical
constitute a simply-linked list; after a pack, the value_tid of the
record with the smallest TID points to nowhere.
With a different implementation, it could fail as follows:
tid value_tid
10 -
20 10
30 10
40 20
After packing at 30, the DB would lose the information that 30 & 40
are undo-identical.
TODO: Since ZODB requires nothing about how undo-identical records are
linked, imported databases may not be packable without breaking
undo information. Same for existing databases because older NEO
implementation linked records differently. A background task to
fix value_tid should be implemented; for example, it would be
used automatically once Importer has finished, if it has seen
non-null value_tid.
""" """
u64 = util.u64 u64 = util.u64
oid = u64(oid)
undo_tid = u64(undo_tid) undo_tid = u64(undo_tid)
if self._getDataTID(oid, undo_tid)[0] is None: history = self._getObjectHistoryForUndo(u64(oid), undo_tid)
return if not history:
undone_tid = self._getDataTID(oid, before_tid=undo_tid)[0] return # nothing to undo for this oid at undo_tid
undone_tid, history = history
if current_tid: if current_tid:
tid = data_tid = u64(current_tid) current = u64(current_tid)
else:
ltid = u64(ltid) if ltid else float('inf')
for current, _ in reversed(history):
if current < ltid:
break
else: else:
if ltid: if ltid <= undo_tid:
ltid = u64(ltid)
tid, data_tid = self._getDataTID(oid, before_tid=ltid)
if tid is None:
return None, None, False return None, None, False
current_tid = util.p64(tid) current = undo_tid
if undo_tid < tid: current_tid = util.p64(current)
tid = data_tid is_current = current == undo_tid
while undo_tid < tid: for tid, data_tid in history:
tid = self._getDataTID(oid, tid)[1] if data_tid is not None:
if data_tid == undone_tid:
undone_tid = tid
elif data_tid == undo_tid:
if current == tid:
is_current = True
else:
undo_tid = tid
return (current_tid, return (current_tid,
None if undone_tid is None else util.p64(undone_tid), None if undone_tid is None else util.p64(undone_tid),
undo_tid == tid) is_current)
@abstract @abstract
def storePackOrder(self, tid, approved, partial, oid_list, pack_tid): def storePackOrder(self, tid, approved, partial, oid_list, pack_tid):
...@@ -1185,7 +1205,7 @@ class DatabaseManager(object): ...@@ -1185,7 +1205,7 @@ class DatabaseManager(object):
area as well.""" area as well."""
@abstract @abstract
def getObjectHistory(self, oid, offset, length): def getObjectHistoryWithLength(self, oid, offset, length):
"""Return a list of serials and sizes for a given object ID. """Return a list of serials and sizes for a given object ID.
The length specifies the maximum size of such a list. Result starts The length specifies the maximum size of such a list. Result starts
with latest serial, and the list must be sorted in descending order. with latest serial, and the list must be sorted in descending order.
...@@ -1220,7 +1240,12 @@ class DatabaseManager(object): ...@@ -1220,7 +1240,12 @@ class DatabaseManager(object):
@abstract @abstract
def _pack(self, offset, oid, tid, limit=None): def _pack(self, offset, oid, tid, limit=None):
"""""" """
The undo feature is implemented in such a way that value_tid does not
have to be updated. This is important for performance reasons, but also
because pack must be idempotent to guarantee that up-to-date replicas
are identical.
"""
@abstract @abstract
def checkTIDRange(self, partition, length, min_tid, max_tid): def checkTIDRange(self, partition, length, min_tid, max_tid):
......
...@@ -871,7 +871,19 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -871,7 +871,19 @@ class MySQLDatabaseManager(DatabaseManager):
None if pack_oids is None else splitOIDField(tid, pack_oids), None if pack_oids is None else splitOIDField(tid, pack_oids),
util.p64(pack_tid))) util.p64(pack_tid)))
def getObjectHistory(self, oid, offset, length): def _getObjectHistoryForUndo(self, oid, undo_tid):
q = self.query
args = self._getReadablePartition(oid), oid, undo_tid
undo = iter(q("SELECT tid FROM obj"
" WHERE `partition`=%s AND oid=%s AND tid<=%s"
" ORDER BY tid DESC LIMIT 2" % args))
if next(undo, (None,))[0] == undo_tid:
return next(undo, (None,))[0], q(
"SELECT tid, value_tid FROM obj"
" WHERE `partition`=%s AND oid=%s AND tid>%s"
% args)
def getObjectHistoryWithLength(self, oid, offset, length):
# FIXME: This method doesn't take client's current transaction id as # FIXME: This method doesn't take client's current transaction id as
# parameter, which means it can return transactions in the future of # parameter, which means it can return transactions in the future of
# client's transaction. # client's transaction.
......
...@@ -655,7 +655,19 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -655,7 +655,19 @@ class SQLiteDatabaseManager(DatabaseManager):
None if pack_oids is None else splitOIDField(tid, pack_oids), None if pack_oids is None else splitOIDField(tid, pack_oids),
util.p64(pack_tid))) util.p64(pack_tid)))
def getObjectHistory(self, oid, offset, length): def _getObjectHistoryForUndo(self, oid, undo_tid):
q = self.query
args = self._getReadablePartition(oid), oid, undo_tid
undo = q("SELECT tid FROM obj"
" WHERE partition=? AND oid=? AND tid<=?"
" ORDER BY tid DESC LIMIT 2", args).fetchall()
if undo and undo.pop(0)[0] == undo_tid:
return undo[0][0] if undo else None, q(
"SELECT tid, value_tid FROM obj"
" WHERE partition=? AND oid=? AND tid>?",
args).fetchall()
def getObjectHistoryWithLength(self, oid, offset, length):
# FIXME: This method doesn't take client's current transaction id as # FIXME: This method doesn't take client's current transaction id as
# parameter, which means it can return transactions in the future of # parameter, which means it can return transactions in the future of
# client's transaction. # client's transaction.
......
...@@ -199,7 +199,8 @@ class ClientOperationHandler(BaseHandler): ...@@ -199,7 +199,8 @@ class ClientOperationHandler(BaseHandler):
app = self.app app = self.app
if app.tm.loadLocked(oid): if app.tm.loadLocked(oid):
raise DelayEvent raise DelayEvent
history_list = app.dm.getObjectHistory(oid, first, last - first) history_list = app.dm.getObjectHistoryWithLength(
oid, first, last - first)
if history_list is None: if history_list is None:
p = Errors.OidNotFound(dump(oid)) p = Errors.OidNotFound(dump(oid))
else: else:
...@@ -300,5 +301,5 @@ class ClientReadOnlyOperationHandler(ClientOperationHandler): ...@@ -300,5 +301,5 @@ class ClientReadOnlyOperationHandler(ClientOperationHandler):
# (askObjectUndoSerial is used in undo() but itself is read-only query) # (askObjectUndoSerial is used in undo() but itself is read-only query)
# FIXME askObjectHistory to limit tid <= backup_tid # FIXME askObjectHistory to limit tid <= backup_tid
# TODO dm.getObjectHistory has to be first fixed for this # TODO dm.getObjectHistoryWithLength has to be first fixed for this
#def askObjectHistory(self, conn, oid, first, last): #def askObjectHistory(self, conn, oid, first, last):
...@@ -290,17 +290,17 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -290,17 +290,17 @@ class StorageDBTests(NeoUnitTestBase):
txn3, objs3 = self.getTransaction([oid]) txn3, objs3 = self.getTransaction([oid])
# one revision # one revision
self.db.storeTransaction(tid1, objs1, txn1, False) self.db.storeTransaction(tid1, objs1, txn1, False)
result = self.db.getObjectHistory(oid, 0, 3) result = self.db.getObjectHistoryWithLength(oid, 0, 3)
self.assertEqual(result, [(tid1, 0)]) self.assertEqual(result, [(tid1, 0)])
result = self.db.getObjectHistory(oid, 1, 1) result = self.db.getObjectHistoryWithLength(oid, 1, 1)
self.assertEqual(result, None) self.assertEqual(result, None)
# two revisions # two revisions
self.db.storeTransaction(tid2, objs2, txn2, False) self.db.storeTransaction(tid2, objs2, txn2, False)
result = self.db.getObjectHistory(oid, 0, 3) result = self.db.getObjectHistoryWithLength(oid, 0, 3)
self.assertEqual(result, [(tid2, 0), (tid1, 0)]) self.assertEqual(result, [(tid2, 0), (tid1, 0)])
result = self.db.getObjectHistory(oid, 1, 3) result = self.db.getObjectHistoryWithLength(oid, 1, 3)
self.assertEqual(result, [(tid1, 0)]) self.assertEqual(result, [(tid1, 0)])
result = self.db.getObjectHistory(oid, 2, 3) result = self.db.getObjectHistoryWithLength(oid, 2, 3)
self.assertEqual(result, None) self.assertEqual(result, None)
def _storeTransactions(self, count): def _storeTransactions(self, count):
......
...@@ -235,7 +235,8 @@ class ImporterTests(NEOThreadedTest): ...@@ -235,7 +235,8 @@ class ImporterTests(NEOThreadedTest):
storage._cache.clear() storage._cache.clear()
storage.loadBefore(r._p_oid, r._p_serial) storage.loadBefore(r._p_oid, r._p_serial)
## ##
self.assertRaisesRegexp(NotImplementedError, " getObjectHistory$", self.assertRaisesRegexp(NotImplementedError,
" getObjectHistoryWithLength$",
c.db().history, r._p_oid) c.db().history, r._p_oid)
h = random_tree.hashTree(r) h = random_tree.hashTree(r)
h(30) h(30)
......
...@@ -131,7 +131,7 @@ class PackTests(NEOThreadedTest): ...@@ -131,7 +131,7 @@ class PackTests(NEOThreadedTest):
self.checkReplicas(cluster) self.checkReplicas(cluster)
@with_cluster() @with_cluster()
def testValueSerialMultipleUndo(self, cluster): def _testValueSerialMultipleUndo(self, cluster, *undos):
t, c = cluster.getTransaction() t, c = cluster.getTransaction()
r = c.root() r = c.root()
ob = r[''] = PCounter() ob = r[''] = PCounter()
...@@ -144,17 +144,22 @@ class PackTests(NEOThreadedTest): ...@@ -144,17 +144,22 @@ class PackTests(NEOThreadedTest):
r._p_changed = 1 r._p_changed = 1
t.commit() t.commit()
db = c.db() db = c.db()
def undo(tid): def undo(i):
db.undo(tid, t.get()) db.undo(tids[i], t.get())
t.commit() t.commit()
tids.append(db.lastTransaction()) tids.append(db.lastTransaction())
undo(tids[1]) undo(-1)
undo(tids[0]) for i in undos:
undo(tids[-1]) undo(i)
cluster.client.pack(timeFromTID(r._p_serial)) cluster.client.pack(timeFromTID(r._p_serial))
self.tic() self.tic()
db.undo(tids[2], t.get()) undo(2)
t.commit()
def testValueSerialMultipleUndo1(self):
self._testValueSerialMultipleUndo(0, -1)
def testValueSerialMultipleUndo2(self):
self._testValueSerialMultipleUndo(-1, 1)
if __name__ == "__main__": if __name__ == "__main__":
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment