Commit 0a0a8357 authored by Vincent Pelletier's avatar Vincent Pelletier Committed by wenjie.zheng

ERP5Type.BTreeData: Reduce fragmentation.

Reduce fragmentation by aggregating chunks when writing.
Aggregation is controlled by two new properties, chunk_size and
Implement defragmentation.
parent b35767ba
from BTrees.LOBTree import LOBTree
from persistent import Persistent
import itertools
# Maximum memory to allocate for sparse-induced padding.
......@@ -32,8 +33,42 @@ class BTreeData(Persistent):
It supports sparse files, ie writing one byte at 10M offset will not use
10MB on disk. Sparse bytes read as 0x00 (NULL-bytes).
def __init__(self):
_chunk_size = None
_max_chunk_size = None
def chunk_size(self):
"""Aggregate consecutive writes up to this size."""
return self._chunk_size
def chunk_size(self, value):
"""Prevent chunks from exceeding this size."""
if value is not None and (value <= 0 or int(value) != value):
raise ValueError('Invalid chunk_size')
self._chunk_size = value
def max_chunk_size(self):
return self._max_chunk_size
def max_chunk_size(self, value):
if value is not None and (value <= 0 or int(value) != value):
raise ValueError('Invalid max chunk_size')
self._max_chunk_size = value
def __init__(self, chunk_size=None, max_chunk_size=None):
chunk_size (int, None)
If non-None, aggregate consecutive writes up to this size.
Overlaping or larger writes may exceed this size, though.
max_chunk_size (int, None)
If non-None, prevent chunks from exceeding this size.
self._tree = LOBTree()
self.chunk_size = chunk_size
self.max_chunk_size = max_chunk_size
def __len__(self):
......@@ -71,7 +106,11 @@ class BTreeData(Persistent):
# writes.
if lower_key < offset:
chunk = tree[lower_key]
if lower_key + len(chunk.value) > offset:
chunk_end = lower_key + len(chunk.value)
if chunk_end > offset or (
len(chunk.value) < self._chunk_size and
chunk_end == offset
key = lower_key
buf = chunk.value[:offset - key] + buf
......@@ -84,6 +123,7 @@ class BTreeData(Persistent):
if not tree[eof].value:
del tree[eof]
max_to_write_len = self._max_chunk_size or float('inf')
while buf or offset > len(self):
next_key = tree.minKey(key + 1)
......@@ -97,13 +137,15 @@ class BTreeData(Persistent):
except KeyError:
tree[key] = chunk = PersistentString('')
entry_size = len(chunk.value)
if entry_size < to_write_len:
to_write_len = min(to_write_len, max_to_write_len)
to_write = buf[:to_write_len]
buf = buf[to_write_len:]
if to_write_len < entry_size:
assert not buf, (key, to_write_len, entry_size)
assert not buf, (key, to_write_len, entry_size, repr(buf))
to_write += chunk.value[to_write_len:]
chunk.value = to_write
key = next_key
key = next_key or key + to_write_len
def read(self, offset, size):
......@@ -201,6 +243,74 @@ class BTreeData(Persistent):
del tree[key]
self.write('', offset)
# XXX: Various batch_size values need to be benchmarked, and a saner
# default is likely to be applied.
def defragment(self, batch_size=100, resume_at=None):
Merge contiguous chunks up to max_chunk_size.
This method is a generator, allowing caller to define a stop condition
(time, number of calls, ...). Yield value is an opaque, small and
serialisable value which only use is to be passed to resume_at
batch_size (int)
Yield every this many internal operations. Allows trading overhead
for precision. This value may be adjusted on-the-fly by giving the
new value as parameter of the "send" method on generator (see
Python doc on generators).
resume_at (opaque)
If provided, resume interrupted processing at that point.
chunk_size = self._max_chunk_size
key = resume_at or 0
tree = self._tree
for iteration in itertools.count(1):
key = tree.minKey(key)
except ValueError:
if iteration % batch_size == 0:
new_batch_size = yield key
if new_batch_size:
batch_size = new_batch_size
chunk = tree[key]
chunk_len = len(chunk.value)
remainder = chunk_size - chunk_len
if remainder <= 0:
# Current entry is large enough, go to next one.
key += 1
end_offset = key + chunk_len
next_key = tree.minKey(key + 1)
except ValueError:
# No next entry, defrag is over.
if next_key != end_offset:
# There is a hole between current entry end and next one, do
# not concatenate and move on with next entry.
assert next_key > end_offset, (key, chunk_len, next_key)
key = next_key
next_chunk = tree[next_key]
next_chunk_len = len(next_chunk.value)
if next_chunk_len >= chunk_size:
# Next entry is larger than target size, do not concatenate and
# go to the entry after that.
key = next_key + 1
# Concatenate current entry and next one.
chunk.value += next_chunk.value[:remainder]
del tree[next_key]
if next_chunk_len > remainder:
key = next_key + remainder
# Concatenation result is larger than target size, split into
# a new entry.
next_chunk.value = next_chunk.value[remainder:]
tree[key] = next_chunk
if __name__ == '__main__':
def check(tree, length, read_offset, read_length, data, keys=None):
......@@ -248,17 +358,20 @@ if __name__ == '__main__':
# Empty write inside existing chunk
data.write('', 4)
check(data, 7, 0, 10, '0VW3XY6', [0, 5])
# Aligned write
data.write('Z', 5)
check(data, 7, 0, 10, '0VW3XZ6', [0, 5])
data.write('a', 10)
data.write('8', 8)
check(data, 11, 0, 10, '0VW3XY6\x008\x00', [0, 5, 8, 10])
check(data, 11, 0, 10, '0VW3XZ6\x008\x00', [0, 5, 8, 10])
check(data, 11, 7, 10, '\x008\x00a')
data.write('ABCDE', 6)
check(data, 11, 0, 11, '0VW3XYABCDE', [0, 5, 8, 10])
check(data, 11, 0, 11, '0VW3XZABCDE', [0, 5, 8, 10])
check(data, 7, 0, 7, '0VW3XYA', [0, 5])
check(data, 7, 0, 7, '0VW3XZA', [0, 5])
check(data, 5, 0, 5, '0VW3X', [0])
......@@ -272,3 +385,55 @@ if __name__ == '__main__':
check(data, 16, 0, 16, '\x00' * 15 + 'a', [15])
data.write('bc', 9)
check(data, 16, 0, 16, '\x00' * 9 + 'bc' + '\x00' * 4 + 'a', [9, 15])
data = BTreeData(chunk_size=4, max_chunk_size=10)
data.write('01', 0)
check(data, 2, 0, 10, '01', [0])
data.write('23', 2)
check(data, 4, 0, 10, '0123', [0])
data.write('AB4', 2)
check(data, 5, 0, 10, '01AB4', [0])
data.write('C56', 4)
check(data, 7, 0, 10, '01ABC56', [0])
data.write('7', 7)
check(data, 8, 0, 10, '01ABC567', [0, 7])
data.write('8', 8)
check(data, 9, 0, 10, '01ABC5678', [0, 7])
data.write('C', 12)
check(data, 13, 0, 13, '01ABC5678\x00\x00\x00C', [0, 7, 12])
data.write('9ABcDEFG', 9)
check(data, 17, 0, 17, '01ABC56789ABcDEFG', [0, 7, 12])
for _ in data.defragment():
check(data, 17, 0, 17, '01ABC56789ABcDEFG', [0, 10])
data.write('HIJKL', len(data))
check(data, 22, 0, 22, '01ABC56789ABcDEFGHIJKL', [0, 10, 17])
for _ in data.defragment():
check(data, 22, 0, 22, '01ABC56789ABcDEFGHIJKL', [0, 10, 20])
data.write('NOPQRSTUVWXYZ', 23)
check(data, 36, 0, 36, '01ABC56789ABcDEFGHIJKL\x00NOPQRSTUVWXYZ', [0, 10, 20, 23, 33])
for _ in data.defragment():
check(data, 36, 0, 36, '01ABC56789ABcDEFGHIJKL\x00NOPQRSTUVWXYZ', [0, 10, 20, 23, 33])
data = BTreeData(max_chunk_size=10)
for x in xrange(255):
data.write('%02x' % x, x * 2)
check(data, 510, 0, 10, '0001020304', [x * 2 for x in xrange(255)])
defragment_generator = data.defragment(batch_size=2)
check(data, 510, 0, 10, '0001020304', [0] + [x * 2 for x in xrange(2, 255)])
opaque =
check(data, 510, 0, 10, '0001020304', [0] + [x * 2 for x in xrange(4, 255)])
defragment_generator = data.defragment(batch_size=2, resume_at=opaque)
check(data, 510, 0, 10, '0001020304', [0] + [x * 2 for x in xrange(5, 255)])
check(data, 510, 0, 10, '0001020304', [0, 10, 20] + [x * 2 for x in xrange(13, 255)])
check(data, 510, 0, 10, '0001020304', [0, 10, 20, 30, 40] + [x * 2 for x in xrange(23, 255)])
for _ in defragment_generator:
check(data, 510, 0, 10, '0001020304', [x * 10 for x in xrange(51)])
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment