Commit 22b0df17 authored by Vincent Pelletier

ERP5Type.BTreeData: Reduce fragmentation.

Reduce fragmentation by aggregating chunks when writing.
Aggregation is controlled by two new properties, chunk_size and
max_chunk_size.
Implement defragmentation.
parent 2a835360
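
In short: a write that touches or overlaps an existing chunk smaller than chunk_size is merged into that chunk, and max_chunk_size caps how much a single chunk may receive. An illustrative sketch of the resulting API (the values are arbitrary and not part of the diff below):

  data = BTreeData(chunk_size=16, max_chunk_size=4096)
  data.write('hello ', 0)  # creates a chunk at key 0
  data.write('world', 6)   # contiguous and below chunk_size: merged into key 0
  for checkpoint in data.defragment():
    pass  # checkpoint can be stored and passed back as resume_at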
 from BTrees.LOBTree import LOBTree
 from persistent import Persistent
+import itertools

 # Maximum memory to allocate for sparse-induced padding.
 MAX_PADDING_CHUNK = 2 ** 20

@@ -32,8 +33,42 @@ class BTreeData(Persistent):
   It supports sparse files, ie writing one byte at 10M offset will not use
   10MB on disk. Sparse bytes read as 0x00 (NULL-bytes).
   """
-  def __init__(self):
+  _chunk_size = None
+  _max_chunk_size = None
+
+  @property
+  def chunk_size(self):
+    """Aggregate consecutive writes up to this size."""
+    return self._chunk_size
+
+  @chunk_size.setter
+  def chunk_size(self, value):
+    if value is not None and (value <= 0 or int(value) != value):
+      raise ValueError('Invalid chunk_size')
+    self._chunk_size = value
+
+  @property
+  def max_chunk_size(self):
+    """Prevent chunks from exceeding this size."""
+    return self._max_chunk_size
+
+  @max_chunk_size.setter
+  def max_chunk_size(self, value):
+    if value is not None and (value <= 0 or int(value) != value):
+      raise ValueError('Invalid max_chunk_size')
+    self._max_chunk_size = value
+
+  def __init__(self, chunk_size=None, max_chunk_size=None):
+    """
+    chunk_size (int, None)
+      If non-None, aggregate consecutive writes up to this size.
+      Overlapping or larger writes may exceed this size, though.
+    max_chunk_size (int, None)
+      If non-None, prevent chunks from exceeding this size.
+    """
     self._tree = LOBTree()
+    self.chunk_size = chunk_size
+    self.max_chunk_size = max_chunk_size

   def __len__(self):
     """

@@ -71,7 +106,11 @@ class BTreeData(Persistent):
     # writes.
     if lower_key < offset:
       chunk = tree[lower_key]
-      if lower_key + len(chunk.value) > offset:
+      chunk_end = lower_key + len(chunk.value)
+      if chunk_end > offset or (
+            len(chunk.value) < self._chunk_size and
+            chunk_end == offset
+          ):
         key = lower_key
         buf = chunk.value[:offset - key] + buf
     try:
@@ -84,6 +123,7 @@ class BTreeData(Persistent):
     else:
       if not tree[eof].value:
         del tree[eof]
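+    # Cap how much may be written into any single chunk; unlimited when
+    # max_chunk_size is unset.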
+    max_to_write_len = self._max_chunk_size or float('inf')
     while buf or offset > len(self):
       try:
         next_key = tree.minKey(key + 1)
@@ -97,13 +137,15 @@ class BTreeData(Persistent):
       except KeyError:
         tree[key] = chunk = PersistentString('')
       entry_size = len(chunk.value)
-      if entry_size < to_write_len:
+      to_write_len = min(to_write_len, max_to_write_len)
       to_write = buf[:to_write_len]
       buf = buf[to_write_len:]
       if to_write_len < entry_size:
-        assert not buf, (key, to_write_len, entry_size)
+        assert not buf, (key, to_write_len, entry_size, repr(buf))
         to_write += chunk.value[to_write_len:]
       chunk.value = to_write
-      key = next_key
+      key = next_key or key + to_write_len
   def read(self, offset, size):
     """

@@ -201,6 +243,74 @@ class BTreeData(Persistent):
         del tree[key]
     self.write('', offset)

+  # XXX: Various batch_size values need to be benchmarked, and a saner
+  # default is likely to be applied.
+  def defragment(self, batch_size=100, resume_at=None):
+    """
+    Merge contiguous chunks up to max_chunk_size.
+
+    This method is a generator, allowing the caller to define a stop
+    condition (time, number of calls, ...). The yielded value is an opaque,
+    small and serialisable value whose only use is to be passed to the
+    resume_at parameter.
+
+    batch_size (int)
+      Yield after this many internal operations. Allows trading overhead
+      for precision. This value may be adjusted on-the-fly by passing a
+      new value to the generator's "send" method (see the Python
+      documentation on generators).
+    resume_at (opaque)
+      If provided, resume interrupted processing at that point.
+    """
+    chunk_size = self._max_chunk_size
+    key = resume_at or 0
+    tree = self._tree
+    for iteration in itertools.count(1):
+      try:
+        key = tree.minKey(key)
+      except ValueError:
+        return
+      if iteration % batch_size == 0:
+        new_batch_size = yield key
+        if new_batch_size:
+          batch_size = new_batch_size
+      chunk = tree[key]
+      chunk_len = len(chunk.value)
+      remainder = chunk_size - chunk_len
+      if remainder <= 0:
+        # Current entry is large enough, go to the next one.
+        key += 1
+        continue
+      end_offset = key + chunk_len
+      try:
+        next_key = tree.minKey(key + 1)
+      except ValueError:
+        # No next entry, defragmentation is over.
+        return
+      if next_key != end_offset:
+        # There is a hole between the current entry's end and the next
+        # one: do not concatenate, move on to the next entry.
+        assert next_key > end_offset, (key, chunk_len, next_key)
+        key = next_key
+        continue
+      next_chunk = tree[next_key]
+      next_chunk_len = len(next_chunk.value)
+      if next_chunk_len >= chunk_size:
+        # Next entry is larger than the target size: do not concatenate,
+        # go to the entry after it.
+        key = next_key + 1
+        continue
+      # Concatenate the current entry and the next one.
+      chunk.value += next_chunk.value[:remainder]
+      del tree[next_key]
+      if next_chunk_len > remainder:
+        key = next_key + remainder
+        # The concatenation result is larger than the target size: move
+        # the remainder into a new entry.
+        next_chunk.value = next_chunk.value[remainder:]
+        tree[key] = next_chunk

 if __name__ == '__main__':
   def check(tree, length, read_offset, read_length, data, keys=None):

@@ -248,17 +358,20 @@ if __name__ == '__main__':
   # Empty write inside existing chunk
   data.write('', 4)
   check(data, 7, 0, 10, '0VW3XY6', [0, 5])
+  # Aligned write
+  data.write('Z', 5)
+  check(data, 7, 0, 10, '0VW3XZ6', [0, 5])
   data.write('a', 10)
   data.write('8', 8)
-  check(data, 11, 0, 10, '0VW3XY6\x008\x00', [0, 5, 8, 10])
+  check(data, 11, 0, 10, '0VW3XZ6\x008\x00', [0, 5, 8, 10])
   check(data, 11, 7, 10, '\x008\x00a')
   data.write('ABCDE', 6)
-  check(data, 11, 0, 11, '0VW3XYABCDE', [0, 5, 8, 10])
+  check(data, 11, 0, 11, '0VW3XZABCDE', [0, 5, 8, 10])
   data.truncate(7)
-  check(data, 7, 0, 7, '0VW3XYA', [0, 5])
+  check(data, 7, 0, 7, '0VW3XZA', [0, 5])
   data.truncate(5)
   check(data, 5, 0, 5, '0VW3X', [0])
   data.truncate(3)

@@ -272,3 +385,55 @@ if __name__ == '__main__':
   check(data, 16, 0, 16, '\x00' * 15 + 'a', [15])
   data.write('bc', 9)
   check(data, 16, 0, 16, '\x00' * 9 + 'bc' + '\x00' * 4 + 'a', [9, 15])
+  data = BTreeData(chunk_size=4, max_chunk_size=10)
+  data.write('01', 0)
+  check(data, 2, 0, 10, '01', [0])
+  data.write('23', 2)
+  check(data, 4, 0, 10, '0123', [0])
+  data.write('AB4', 2)
+  check(data, 5, 0, 10, '01AB4', [0])
+  data.write('C56', 4)
+  check(data, 7, 0, 10, '01ABC56', [0])
+  data.write('7', 7)
+  check(data, 8, 0, 10, '01ABC567', [0, 7])
+  data.write('8', 8)
+  check(data, 9, 0, 10, '01ABC5678', [0, 7])
+  data.write('C', 12)
+  check(data, 13, 0, 13, '01ABC5678\x00\x00\x00C', [0, 7, 12])
+  data.write('9ABcDEFG', 9)
+  check(data, 17, 0, 17, '01ABC56789ABcDEFG', [0, 7, 12])
+  for _ in data.defragment():
+    pass
+  check(data, 17, 0, 17, '01ABC56789ABcDEFG', [0, 10])
+  data.write('HIJKL', len(data))
+  check(data, 22, 0, 22, '01ABC56789ABcDEFGHIJKL', [0, 10, 17])
+  for _ in data.defragment():
+    pass
+  check(data, 22, 0, 22, '01ABC56789ABcDEFGHIJKL', [0, 10, 20])
+  data.write('NOPQRSTUVWXYZ', 23)
+  check(data, 36, 0, 36, '01ABC56789ABcDEFGHIJKL\x00NOPQRSTUVWXYZ', [0, 10, 20, 23, 33])
+  for _ in data.defragment():
+    pass
+  check(data, 36, 0, 36, '01ABC56789ABcDEFGHIJKL\x00NOPQRSTUVWXYZ', [0, 10, 20, 23, 33])
+  data = BTreeData(max_chunk_size=10)
+  for x in xrange(255):
+    data.write('%02x' % x, x * 2)
+  check(data, 510, 0, 10, '0001020304', [x * 2 for x in xrange(255)])
+  defragment_generator = data.defragment(batch_size=2)
+  defragment_generator.next()
+  check(data, 510, 0, 10, '0001020304', [0] + [x * 2 for x in xrange(2, 255)])
+  opaque = defragment_generator.next()
+  defragment_generator.close()
+  check(data, 510, 0, 10, '0001020304', [0] + [x * 2 for x in xrange(4, 255)])
+  defragment_generator = data.defragment(batch_size=2, resume_at=opaque)
+  defragment_generator.next()
+  check(data, 510, 0, 10, '0001020304', [0] + [x * 2 for x in xrange(5, 255)])
+  defragment_generator.send(10)
+  check(data, 510, 0, 10, '0001020304', [0, 10, 20] + [x * 2 for x in xrange(13, 255)])
+  defragment_generator.next()
+  check(data, 510, 0, 10, '0001020304', [0, 10, 20, 30, 40] + [x * 2 for x in xrange(23, 255)])
+  for _ in defragment_generator:
+    pass
+  check(data, 510, 0, 10, '0001020304', [x * 10 for x in xrange(51)])
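
For reference, one way to drive the resumable generator under a time budget; a minimal sketch, assuming a BTreeData instance with max_chunk_size set (the helper name defragment_for is hypothetical, not part of this commit):

  import time

  def defragment_for(data, seconds, resume_at=None):
    # Run defragmentation until the time budget expires. Returns the opaque
    # checkpoint to pass back as resume_at later, or None once finished.
    deadline = time.time() + seconds
    for checkpoint in data.defragment(resume_at=resume_at):
      if time.time() > deadline:
        return checkpoint
    return None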