Commit d0ee6041 authored by Julien Muchembled's avatar Julien Muchembled

storage: fix replication when clients already stored some objects

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2777 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 4b618101
...@@ -453,11 +453,13 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -453,11 +453,13 @@ class BTreeDatabaseManager(DatabaseManager):
prune(obj[oid]) prune(obj[oid])
del obj[oid] del obj[oid]
def deleteObjectsAbove(self, num_partitions, partition, oid, serial): def deleteObjectsAbove(self, num_partitions, partition, oid, serial,
max_tid):
obj = self._obj obj = self._obj
u64 = util.u64 u64 = util.u64
oid = u64(oid) oid = u64(oid)
serial = u64(serial) serial = u64(serial)
max_tid = u64(max_tid)
if oid % num_partitions == partition: if oid % num_partitions == partition:
try: try:
tserial = obj[oid] tserial = obj[oid]
...@@ -465,11 +467,12 @@ class BTreeDatabaseManager(DatabaseManager): ...@@ -465,11 +467,12 @@ class BTreeDatabaseManager(DatabaseManager):
pass pass
else: else:
batchDelete(tserial, lambda _, __: True, batchDelete(tserial, lambda _, __: True,
iter_kw={'min': serial}) iter_kw={'min': serial, 'max': max_tid})
def same_partition(key, _): def same_partition(key, _):
return key % num_partitions == partition return key % num_partitions == partition
batchDelete(obj, same_partition, batchDelete(obj, same_partition,
iter_kw={'min': oid, 'excludemin': True}, recycle_subtrees=True) iter_kw={'min': oid, 'excludemin': True, 'max': max_tid},
recycle_subtrees=True)
def getTransaction(self, tid, all=False): def getTransaction(self, tid, all=False):
tid = util.u64(tid) tid = util.u64(tid)
......
...@@ -401,9 +401,11 @@ class DatabaseManager(object): ...@@ -401,9 +401,11 @@ class DatabaseManager(object):
given oid.""" given oid."""
raise NotImplementedError raise NotImplementedError
def deleteObjectsAbove(self, num_partitions, partition, oid, serial): def deleteObjectsAbove(self, num_partitions, partition, oid, serial,
max_tid):
"""Delete all objects above given OID and serial (inclued) in given """Delete all objects above given OID and serial (inclued) in given
partition.""" partition, but never above max_tid (in case objects are stored during
replication)"""
raise NotImplementedError raise NotImplementedError
def getTransaction(self, tid, all = False): def getTransaction(self, tid, all = False):
......
...@@ -574,13 +574,16 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -574,13 +574,16 @@ class MySQLDatabaseManager(DatabaseManager):
raise raise
self.commit() self.commit()
def deleteObjectsAbove(self, num_partitions, partition, oid, serial): def deleteObjectsAbove(self, num_partitions, partition, oid, serial,
max_tid):
u64 = util.u64 u64 = util.u64
self.begin() self.begin()
try: try:
self.objQuery('DELETE FROM %%(table)s WHERE partition=%(partition)d AND (' self.objQuery('DELETE FROM %%(table)s WHERE partition=%(partition)d'
' AND serial <= %(max_tid)d AND ('
'oid > %(oid)d OR (oid = %(oid)d AND serial >= %(serial)d))' % { 'oid > %(oid)d OR (oid = %(oid)d AND serial >= %(serial)d))' % {
'partition': partition, 'partition': partition,
'max_tid': u64(max_tid),
'oid': u64(oid), 'oid': u64(oid),
'serial': u64(serial), 'serial': u64(serial),
}) })
......
...@@ -21,7 +21,7 @@ import neo.lib ...@@ -21,7 +21,7 @@ import neo.lib
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import Packets, ZERO_TID, ZERO_OID from neo.lib.protocol import Packets, ZERO_TID, ZERO_OID
from neo.lib.util import add64 from neo.lib.util import add64, u64
# TODO: benchmark how different values behave # TODO: benchmark how different values behave
RANGE_LENGTH = 4000 RANGE_LENGTH = 4000
...@@ -315,8 +315,16 @@ class ReplicationHandler(EventHandler): ...@@ -315,8 +315,16 @@ class ReplicationHandler(EventHandler):
# Delete all objects we might have which are beyond what peer # Delete all objects we might have which are beyond what peer
# knows. # knows.
((last_oid, last_serial), ) = params ((last_oid, last_serial), ) = params
offset = replicator.getCurrentOffset()
max_tid = replicator.getCurrentCriticalTID()
neo.lib.logging.debug("Serial range checked (offset=%s, min_oid=%x,"
" min_serial=%x, length=%s, count=%s, max_oid=%x,"
" max_serial=%x, last_oid=%x, last_serial=%x, critical_tid=%x)",
offset, u64(min_oid), u64(min_serial), length, count,
u64(max_oid), u64(max_serial), u64(last_oid), u64(last_serial),
u64(max_tid))
app.dm.deleteObjectsAbove(app.pt.getPartitions(), app.dm.deleteObjectsAbove(app.pt.getPartitions(),
replicator.getCurrentOffset(), last_oid, last_serial) offset, last_oid, last_serial, max_tid)
# Nothing remains, so the replication for this partition is # Nothing remains, so the replication for this partition is
# finished. # finished.
replicator.setReplicationDone() replicator.setReplicationDone()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment