Commit db9ef1a5 authored by Yoni Fogel's avatar Yoni Fogel

Refs Tokutek/ft-index#46 Comments, minor renames, reworked...

Refs Tokutek/ft-index#46 Comments, minor renames, reworked bn_data::move_leafentries s.t. splits do not significantly increase memory usage of basement nodes and to be cleaner
parent 7f2e8e72
...@@ -111,9 +111,9 @@ void bn_data::add_key(uint32_t keylen) { ...@@ -111,9 +111,9 @@ void bn_data::add_key(uint32_t keylen) {
m_disksize_of_keys += sizeof(keylen) + keylen; m_disksize_of_keys += sizeof(keylen) + keylen;
} }
void bn_data::add_keys(uint32_t n_keys, uint32_t combined_keylen) { void bn_data::add_keys(uint32_t n_keys, uint32_t combined_klpair_len) {
invariant(n_keys * sizeof(uint32_t) <= combined_keylen); invariant(n_keys * sizeof(uint32_t) <= combined_klpair_len);
m_disksize_of_keys += combined_keylen; m_disksize_of_keys += combined_klpair_len;
} }
void bn_data::remove_key(uint32_t keylen) { void bn_data::remove_key(uint32_t keylen) {
...@@ -124,7 +124,7 @@ void bn_data::remove_key(uint32_t keylen) { ...@@ -124,7 +124,7 @@ void bn_data::remove_key(uint32_t keylen) {
// Currently only supports fixed-length keys. // Currently only supports fixed-length keys.
void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version UU(), void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version UU(),
uint32_t key_data_size, uint32_t val_data_size, bool all_keys_same_length, uint32_t key_data_size, uint32_t val_data_size, bool all_keys_same_length,
uint32_t fixed_key_length) { uint32_t fixed_klpair_length) {
paranoid_invariant(version >= FT_LAYOUT_VERSION_25); // Support was added @25 paranoid_invariant(version >= FT_LAYOUT_VERSION_25); // Support was added @25
uint32_t ndone_before = rb->ndone; uint32_t ndone_before = rb->ndone;
init_zero(); init_zero();
...@@ -133,7 +133,7 @@ void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struc ...@@ -133,7 +133,7 @@ void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struc
rbuf_literal_bytes(rb, &keys_src, key_data_size); rbuf_literal_bytes(rb, &keys_src, key_data_size);
//Generate dmt //Generate dmt
this->m_buffer.create_from_sorted_memory_of_fixed_size_elements( this->m_buffer.create_from_sorted_memory_of_fixed_size_elements(
keys_src, num_entries, key_data_size, fixed_key_length); keys_src, num_entries, key_data_size, fixed_klpair_length);
toku_mempool_construct(&this->m_buffer_mempool, val_data_size); toku_mempool_construct(&this->m_buffer_mempool, val_data_size);
bytevec vals_src; bytevec vals_src;
...@@ -145,7 +145,7 @@ void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struc ...@@ -145,7 +145,7 @@ void bn_data::initialize_from_separate_keys_and_vals(uint32_t num_entries, struc
memcpy(vals_dest, vals_src, val_data_size); memcpy(vals_dest, vals_src, val_data_size);
} }
add_keys(num_entries, num_entries * fixed_key_length); add_keys(num_entries, num_entries * fixed_klpair_length);
toku_note_deserialized_basement_node(all_keys_same_length); toku_note_deserialized_basement_node(all_keys_same_length);
...@@ -168,7 +168,7 @@ void bn_data::serialize_header(struct wbuf *wb) const { ...@@ -168,7 +168,7 @@ void bn_data::serialize_header(struct wbuf *wb) const {
wbuf_nocrc_uint(wb, m_disksize_of_keys); wbuf_nocrc_uint(wb, m_disksize_of_keys);
//val_data_size //val_data_size
wbuf_nocrc_uint(wb, toku_mempool_get_used_space(&m_buffer_mempool)); wbuf_nocrc_uint(wb, toku_mempool_get_used_space(&m_buffer_mempool));
//fixed_key_length //fixed_klpair_length
wbuf_nocrc_uint(wb, m_buffer.get_fixed_length()); wbuf_nocrc_uint(wb, m_buffer.get_fixed_length());
// all_keys_same_length // all_keys_same_length
wbuf_nocrc_uint8_t(wb, fixed); wbuf_nocrc_uint8_t(wb, fixed);
...@@ -200,14 +200,14 @@ void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint3 ...@@ -200,14 +200,14 @@ void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint3
bool all_keys_same_length = false; bool all_keys_same_length = false;
bool keys_vals_separate = false; bool keys_vals_separate = false;
uint32_t fixed_key_length = 0; uint32_t fixed_klpair_length = 0;
// In version 24 and older there is no header. Skip reading header for old version. // In version 24 and older there is no header. Skip reading header for old version.
if (version >= FT_LAYOUT_VERSION_25) { if (version >= FT_LAYOUT_VERSION_25) {
uint32_t ndone_before = rb->ndone; uint32_t ndone_before = rb->ndone;
key_data_size = rbuf_int(rb); key_data_size = rbuf_int(rb);
val_data_size = rbuf_int(rb); val_data_size = rbuf_int(rb);
fixed_key_length = rbuf_int(rb); // 0 if !all_keys_same_length fixed_klpair_length = rbuf_int(rb); // 0 if !all_keys_same_length
all_keys_same_length = rbuf_char(rb); all_keys_same_length = rbuf_char(rb);
keys_vals_separate = rbuf_char(rb); keys_vals_separate = rbuf_char(rb);
invariant(all_keys_same_length == keys_vals_separate); // Until we support this invariant(all_keys_same_length == keys_vals_separate); // Until we support this
...@@ -217,7 +217,7 @@ void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint3 ...@@ -217,7 +217,7 @@ void bn_data::deserialize_from_rbuf(uint32_t num_entries, struct rbuf *rb, uint3
if (keys_vals_separate) { if (keys_vals_separate) {
initialize_from_separate_keys_and_vals(num_entries, rb, data_size, version, initialize_from_separate_keys_and_vals(num_entries, rb, data_size, version,
key_data_size, val_data_size, all_keys_same_length, key_data_size, val_data_size, all_keys_same_length,
fixed_key_length); fixed_klpair_length);
return; return;
} }
} }
...@@ -350,7 +350,7 @@ void bn_data::delete_leafentry ( ...@@ -350,7 +350,7 @@ void bn_data::delete_leafentry (
{ {
remove_key(keylen); remove_key(keylen);
m_buffer.delete_at(idx); m_buffer.delete_at(idx);
toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size); // Must pass nullptr, since le is no good any more. toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size);
} }
/* mempool support */ /* mempool support */
...@@ -389,7 +389,8 @@ void bn_data::dmt_compress_kvspace(size_t added_size, void **maybe_free, bool fo ...@@ -389,7 +389,8 @@ void bn_data::dmt_compress_kvspace(size_t added_size, void **maybe_free, bool fo
return; return;
} }
struct mempool new_kvspace; struct mempool new_kvspace;
toku_mempool_construct(&new_kvspace, 2*total_size_needed); size_t requested_size = force_compress ? total_size_needed : 2*total_size_needed;
toku_mempool_construct(&new_kvspace, requested_size);
struct dmt_compressor_state oc = { &new_kvspace, this}; struct dmt_compressor_state oc = { &new_kvspace, this};
m_buffer.iterate_ptr< decltype(oc), move_it >(&oc); m_buffer.iterate_ptr< decltype(oc), move_it >(&oc);
...@@ -422,14 +423,15 @@ void bn_data::get_space_for_overwrite( ...@@ -422,14 +423,15 @@ void bn_data::get_space_for_overwrite(
uint32_t old_le_size, uint32_t old_le_size,
uint32_t new_size, uint32_t new_size,
LEAFENTRY* new_le_space LEAFENTRY* new_le_space
//TODO(yoni): add maybe_free return (caller will free it)
) )
{ {
void* maybe_free = nullptr; void* maybe_free = nullptr;
LEAFENTRY new_le = mempool_malloc_and_update_dmt( LEAFENTRY new_le = mempool_malloc_and_update_dmt(new_size, &maybe_free);
new_size, if (maybe_free) {
&maybe_free toku_free(maybe_free);
); }
toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size); // Must pass nullptr, since le is no good any more. toku_mempool_mfree(&m_buffer_mempool, nullptr, old_le_size);
klpair_struct* klp = nullptr; klpair_struct* klp = nullptr;
uint32_t klpair_len; uint32_t klpair_len;
int r = m_buffer.fetch(idx, &klpair_len, &klp); int r = m_buffer.fetch(idx, &klpair_len, &klp);
...@@ -437,7 +439,7 @@ void bn_data::get_space_for_overwrite( ...@@ -437,7 +439,7 @@ void bn_data::get_space_for_overwrite(
paranoid_invariant(klp!=nullptr); paranoid_invariant(klp!=nullptr);
// Key never changes. // Key never changes.
paranoid_invariant(keylen_from_klpair_len(klpair_len) == keylen); paranoid_invariant(keylen_from_klpair_len(klpair_len) == keylen);
paranoid_invariant(!memcmp(klp->key, keyp, keylen)); // TODO: can keyp be pointing to the old space? If so this could fail paranoid_invariant(!memcmp(klp->key, keyp, keylen));
size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le); size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);
paranoid_invariant(new_le_offset <= UINT32_MAX - new_size); // Not using > 4GB paranoid_invariant(new_le_offset <= UINT32_MAX - new_size); // Not using > 4GB
...@@ -445,11 +447,6 @@ void bn_data::get_space_for_overwrite( ...@@ -445,11 +447,6 @@ void bn_data::get_space_for_overwrite(
paranoid_invariant(new_le == get_le_from_klpair(klp)); paranoid_invariant(new_le == get_le_from_klpair(klp));
*new_le_space = new_le; *new_le_space = new_le;
// free at end, so that the keyp and keylen
// passed in is still valid
if (maybe_free) {
toku_free(maybe_free);
}
} }
void bn_data::get_space_for_insert( void bn_data::get_space_for_insert(
...@@ -458,103 +455,124 @@ void bn_data::get_space_for_insert( ...@@ -458,103 +455,124 @@ void bn_data::get_space_for_insert(
uint32_t keylen, uint32_t keylen,
size_t size, size_t size,
LEAFENTRY* new_le_space LEAFENTRY* new_le_space
//TODO(yoni): add maybe_free return (caller will free it). Also make callers (and tests) use it
) )
{ {
add_key(keylen); add_key(keylen);
void* maybe_free = nullptr; void* maybe_free = nullptr;
LEAFENTRY new_le = mempool_malloc_and_update_dmt( LEAFENTRY new_le = mempool_malloc_and_update_dmt(size, &maybe_free);
size, if (maybe_free) {
&maybe_free toku_free(maybe_free);
); }
size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le); size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);
klpair_dmtwriter kl(keylen, new_le_offset, keyp); klpair_dmtwriter kl(keylen, new_le_offset, keyp);
m_buffer.insert_at(kl, idx); m_buffer.insert_at(kl, idx);
*new_le_space = new_le; *new_le_space = new_le;
// free at end, so that the keyp and keylen
// passed in is still valid (you never know if
// it was part of the old mempool, this is just
// safer).
if (maybe_free) {
toku_free(maybe_free);
}
} }
class move_leafentry_extra { class split_klpairs_extra {
bn_data *const m_src_bn; bn_data *const m_left_bn;
bn_data *const m_dest_bn; bn_data *const m_right_bn;
klpair_dmt_t::builder *const m_builder; klpair_dmt_t::builder *const m_left_builder;
klpair_dmt_t::builder *const m_right_builder;
struct mempool *const m_left_dest_mp;
uint32_t m_split_at;
struct mempool *src_mp() const { return &m_src_bn->m_buffer_mempool; } struct mempool *left_dest_mp(void) const { return m_left_dest_mp; }
struct mempool *dest_mp() const { return &m_dest_bn->m_buffer_mempool; } struct mempool *right_dest_mp(void) const { return &m_right_bn->m_buffer_mempool; }
int move_leafentry(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t UU(idx)) { void copy_klpair(const uint32_t klpair_len, const klpair_struct &klpair,
LEAFENTRY old_le = m_src_bn->get_le_from_klpair(&klpair); klpair_dmt_t::builder *const builder,
struct mempool *const dest_mp,
bn_data *const bn) {
LEAFENTRY old_le = m_left_bn->get_le_from_klpair(&klpair);
size_t le_size = leafentry_memsize(old_le); size_t le_size = leafentry_memsize(old_le);
void *new_le = toku_mempool_malloc(dest_mp(), le_size, 1);
void *new_le = toku_mempool_malloc(dest_mp, le_size, 1);
paranoid_invariant_notnull(new_le); paranoid_invariant_notnull(new_le);
memcpy(new_le, old_le, le_size); memcpy(new_le, old_le, le_size);
size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(dest_mp(), new_le); size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(dest_mp, new_le);
size_t keylen = keylen_from_klpair_len(klpair_len); size_t keylen = keylen_from_klpair_len(klpair_len);
m_builder->append(klpair_dmtwriter(keylen, le_offset, klpair.key)); builder->append(klpair_dmtwriter(keylen, le_offset, klpair.key));
// TODO(yoni): change these to "note_removed_key" bn->add_key(keylen);
m_src_bn->remove_key(keylen); }
m_dest_bn->add_key(keylen);
toku_mempool_mfree(src_mp(), old_le, le_size); int move_leafentry(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx) {
m_left_bn->remove_key(keylen_from_klpair_len(klpair_len));
if (idx < m_split_at) {
copy_klpair(klpair_len, klpair, m_left_builder, left_dest_mp(), m_left_bn);
} else {
copy_klpair(klpair_len, klpair, m_right_builder, right_dest_mp(), m_right_bn);
}
return 0; return 0;
} }
public: public:
move_leafentry_extra(bn_data *const src_bn, bn_data *const dest_bn, split_klpairs_extra(bn_data *const left_bn, bn_data *const right_bn,
klpair_dmt_t::builder *const builder) klpair_dmt_t::builder *const left_builder,
: m_src_bn(src_bn), klpair_dmt_t::builder *const right_builder,
m_dest_bn(dest_bn), struct mempool *const left_new_mp,
m_builder(builder) {} uint32_t split_at)
static int cb(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx, move_leafentry_extra *const thisp) { : m_left_bn(left_bn),
m_right_bn(right_bn),
m_left_builder(left_builder),
m_right_builder(right_builder),
m_left_dest_mp(left_new_mp),
m_split_at(split_at) {}
static int cb(const uint32_t klpair_len, const klpair_struct &klpair, const uint32_t idx, split_klpairs_extra *const thisp) {
return thisp->move_leafentry(klpair_len, klpair, idx); return thisp->move_leafentry(klpair_len, klpair, idx);
} }
}; };
void bn_data::move_leafentries_to( void bn_data::split_klpairs(
bn_data* dest_bd, bn_data* right_bd,
uint32_t lbi, //lower bound inclusive uint32_t split_at //lower bound inclusive for right_bd
uint32_t ube //upper bound exclusive
) )
{ {
// We use move_leafentries_to during a split, and the split algorithm should never call this // We use move_leafentries_to during a split, and the split algorithm should never call this
// if it's splitting on a boundary, so there must be some leafentries in the range to move. // if it's splitting on a boundary, so there must be some leafentries in the range to move.
paranoid_invariant(lbi < ube); paranoid_invariant(split_at < dmt_size());
paranoid_invariant(ube <= dmt_size());
dest_bd->init_zero(); right_bd->init_zero();
size_t mpsize = toku_mempool_get_used_space(&m_buffer_mempool); // overkill, but safe size_t mpsize = toku_mempool_get_used_space(&m_buffer_mempool); // overkill, but safe
struct mempool *dest_mp = &dest_bd->m_buffer_mempool;
toku_mempool_construct(dest_mp, mpsize);
klpair_dmt_t::builder dest_dmt_builder; struct mempool new_left_mp;
dest_dmt_builder.create(ube-lbi, m_disksize_of_keys); // overkill, but safe (builder will realloc at the end) toku_mempool_construct(&new_left_mp, mpsize);
struct mempool *right_mp = &right_bd->m_buffer_mempool;
toku_mempool_construct(right_mp, mpsize);
klpair_dmt_t::builder left_dmt_builder;
left_dmt_builder.create(split_at, m_disksize_of_keys); // overkill, but safe (builder will realloc at the end)
klpair_dmt_t::builder right_dmt_builder;
right_dmt_builder.create(dmt_size() - split_at, m_disksize_of_keys); // overkill, but safe (builder will realloc at the end)
move_leafentry_extra extra(this, dest_bd, &dest_dmt_builder); split_klpairs_extra extra(this, right_bd, &left_dmt_builder, &right_dmt_builder, &new_left_mp, split_at);
int r = m_buffer.iterate_on_range<move_leafentry_extra, move_leafentry_extra::cb>(lbi, ube, &extra); int r = m_buffer.iterate<split_klpairs_extra, split_klpairs_extra::cb>(&extra);
invariant_zero(r); invariant_zero(r);
dest_dmt_builder.build(&dest_bd->m_buffer); m_buffer.destroy();
toku_mempool_destroy(&m_buffer_mempool);
// now remove the elements from src_dmt m_buffer_mempool = new_left_mp;
for (uint32_t i=ube-1; i >= lbi; i--) {
m_buffer.delete_at(i); left_dmt_builder.build(&m_buffer);
} right_dmt_builder.build(&right_bd->m_buffer);
// Potentially shrink memory pool for destination. // Potentially shrink memory pool for destination.
// We overallocated ("overkill") above // We overallocated ("overkill") above
paranoid_invariant_zero(toku_mempool_get_frag_size(dest_mp)); struct mempool *const left_mp = &m_buffer_mempool;
toku_mempool_realloc_larger(dest_mp, toku_mempool_get_used_space(dest_mp)); paranoid_invariant_zero(toku_mempool_get_frag_size(left_mp));
toku_mempool_realloc_larger(left_mp, toku_mempool_get_used_space(left_mp));
paranoid_invariant_zero(toku_mempool_get_frag_size(right_mp));
toku_mempool_realloc_larger(right_mp, toku_mempool_get_used_space(right_mp));
} }
uint64_t bn_data::get_disk_size() { uint64_t bn_data::get_disk_size() {
...@@ -605,11 +623,11 @@ void bn_data::set_contents_as_clone_of_sorted_array( ...@@ -605,11 +623,11 @@ void bn_data::set_contents_as_clone_of_sorted_array(
uint32_t num_les, uint32_t num_les,
const void** old_key_ptrs, const void** old_key_ptrs,
uint32_t* old_keylens, uint32_t* old_keylens,
LEAFENTRY* old_les, LEAFENTRY* old_les,
size_t *le_sizes, size_t *le_sizes,
size_t total_key_size, size_t total_key_size,
size_t total_le_size size_t total_le_size
) )
{ {
//Enforce "just created" invariant. //Enforce "just created" invariant.
paranoid_invariant_zero(m_disksize_of_keys); paranoid_invariant_zero(m_disksize_of_keys);
...@@ -624,9 +642,9 @@ void bn_data::set_contents_as_clone_of_sorted_array( ...@@ -624,9 +642,9 @@ void bn_data::set_contents_as_clone_of_sorted_array(
klpair_dmt_t::builder dmt_builder; klpair_dmt_t::builder dmt_builder;
dmt_builder.create(num_les, total_key_size); dmt_builder.create(num_les, total_key_size);
//TODO: speed this up with some form of mass create dmt
for (uint32_t idx = 0; idx < num_les; idx++) { for (uint32_t idx = 0; idx < num_les; idx++) {
void* new_le = toku_mempool_malloc(&m_buffer_mempool, le_sizes[idx], 1); void* new_le = toku_mempool_malloc(&m_buffer_mempool, le_sizes[idx], 1);
paranoid_invariant_notnull(new_le);
memcpy(new_le, old_les[idx], le_sizes[idx]); memcpy(new_le, old_les[idx], le_sizes[idx]);
size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(&m_buffer_mempool, new_le); size_t le_offset = toku_mempool_get_offset_from_pointer_and_base(&m_buffer_mempool, new_le);
dmt_builder.append(klpair_dmtwriter(old_keylens[idx], le_offset, old_key_ptrs[idx])); dmt_builder.append(klpair_dmtwriter(old_keylens[idx], le_offset, old_key_ptrs[idx]));
......
...@@ -193,9 +193,11 @@ public: ...@@ -193,9 +193,11 @@ public:
void verify_mempool(void); void verify_mempool(void);
// size() of key dmt // size() of key dmt
// TODO(yoni): maybe rename to something like num_klpairs
uint32_t dmt_size(void) const; uint32_t dmt_size(void) const;
// iterate() on key dmt (and associated leafentries) // iterate() on key dmt (and associated leafentries)
// TODO(yoni): rename to iterate
template<typename iterate_extra_t, template<typename iterate_extra_t,
int (*f)(const void * key, const uint32_t keylen, const LEAFENTRY &, const uint32_t, iterate_extra_t *const)> int (*f)(const void * key, const uint32_t keylen, const LEAFENTRY &, const uint32_t, iterate_extra_t *const)>
int dmt_iterate(iterate_extra_t *const iterate_extra) const { int dmt_iterate(iterate_extra_t *const iterate_extra) const {
...@@ -203,6 +205,7 @@ public: ...@@ -203,6 +205,7 @@ public:
} }
// iterate_on_range() on key dmt (and associated leafentries) // iterate_on_range() on key dmt (and associated leafentries)
// TODO(yoni): rename to iterate_on_range
template<typename iterate_extra_t, template<typename iterate_extra_t,
int (*f)(const void * key, const uint32_t keylen, const LEAFENTRY &, const uint32_t, iterate_extra_t *const)> int (*f)(const void * key, const uint32_t keylen, const LEAFENTRY &, const uint32_t, iterate_extra_t *const)>
int dmt_iterate_on_range(const uint32_t left, const uint32_t right, iterate_extra_t *const iterate_extra) const { int dmt_iterate_on_range(const uint32_t left, const uint32_t right, iterate_extra_t *const iterate_extra) const {
...@@ -272,10 +275,7 @@ public: ...@@ -272,10 +275,7 @@ public:
// Move leafentries (and associated key/keylens) from this basement node to dest_bd // Move leafentries (and associated key/keylens) from this basement node to dest_bd
// Moves indexes [lbi-ube) // Moves indexes [lbi-ube)
__attribute__((__nonnull__)) __attribute__((__nonnull__))
void move_leafentries_to(bn_data* dest_bd, void split_klpairs(bn_data* dest_bd, uint32_t first_index_for_dest);
uint32_t lbi, //lower bound inclusive
uint32_t ube //upper bound exclusive
);
// Destroy this basement node and free memory. // Destroy this basement node and free memory.
void destroy(void); void destroy(void);
...@@ -321,6 +321,8 @@ public: ...@@ -321,6 +321,8 @@ public:
// Between calling prepare_to_serialize and actually serializing, the basement node may not be modified // Between calling prepare_to_serialize and actually serializing, the basement node may not be modified
void prepare_to_serialize(void); void prepare_to_serialize(void);
//TODO(yoni): go to serialize_ftnode_partition and move prepare/header/etc (and wbufwriteleafentry) into here and add just one external function: serialize_to_wbuf()
// Serialize the basement node header to a wbuf // Serialize the basement node header to a wbuf
// Requires prepare_to_serialize() to have been called first. // Requires prepare_to_serialize() to have been called first.
void serialize_header(struct wbuf *wb) const; void serialize_header(struct wbuf *wb) const;
...@@ -344,10 +346,10 @@ public: ...@@ -344,10 +346,10 @@ public:
+ 0; + 0;
private: private:
// move_leafentry_extra should be a local class in move_leafentries_to, but // split_klpairs_extra should be a local class in split_klpairs, but
// the dmt template parameter for iterate needs linkage, so it has to be a // the dmt template parameter for iterate needs linkage, so it has to be a
// separate class, but we want it to be able to call e.g. add_key // separate class, but we want it to be able to call e.g. add_key
friend class move_leafentry_extra; friend class split_klpairs_extra;
// Allocates space in the mempool. // Allocates space in the mempool.
// If there is insufficient space, the mempool is enlarged and leafentries may be shuffled to reduce fragmentation. // If there is insufficient space, the mempool is enlarged and leafentries may be shuffled to reduce fragmentation.
...@@ -364,7 +366,7 @@ private: ...@@ -364,7 +366,7 @@ private:
void add_key(uint32_t keylen); void add_key(uint32_t keylen);
// Note that multiple keys were added (for maintaining disk-size of this basement node) // Note that multiple keys were added (for maintaining disk-size of this basement node)
void add_keys(uint32_t n_keys, uint32_t combined_keylen); void add_keys(uint32_t n_keys, uint32_t combined_klpair_len);
// Note that a key was removed (for maintaining disk-size of this basement node) // Note that a key was removed (for maintaining disk-size of this basement node)
void remove_key(uint32_t keylen); void remove_key(uint32_t keylen);
...@@ -375,7 +377,7 @@ private: ...@@ -375,7 +377,7 @@ private:
friend class bndata_bugfix_test; friend class bndata_bugfix_test;
// Get the serialized size of a klpair. // Get the serialized size of a klpair.
// As of Jan 14, 2014, serialized size of a klpair is independent of if this basement node has fixed-length keys. // As of Jan 14, 2014, serialized size of a klpair is independent of whether this basement node has fixed-length keys.
uint32_t klpair_disksize(const uint32_t klpair_len, const klpair_struct *klpair) const; uint32_t klpair_disksize(const uint32_t klpair_len, const klpair_struct *klpair) const;
// The disk/memory size of all keys. (Note that the size of memory for the leafentries is maintained by m_buffer_mempool) // The disk/memory size of all keys. (Note that the size of memory for the leafentries is maintained by m_buffer_mempool)
...@@ -385,6 +387,6 @@ private: ...@@ -385,6 +387,6 @@ private:
// all keys will be first followed by all leafentries (both in sorted order) // all keys will be first followed by all leafentries (both in sorted order)
void initialize_from_separate_keys_and_vals(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version, void initialize_from_separate_keys_and_vals(uint32_t num_entries, struct rbuf *rb, uint32_t data_size, uint32_t version,
uint32_t key_data_size, uint32_t val_data_size, bool all_keys_same_length, uint32_t key_data_size, uint32_t val_data_size, bool all_keys_same_length,
uint32_t fixed_key_length); uint32_t fixed_klpair_length);
}; };
...@@ -754,7 +754,8 @@ move_leafentries( ...@@ -754,7 +754,8 @@ move_leafentries(
) )
//Effect: move leafentries in the range [lbi, upe) from src_omt to newly created dest_omt //Effect: move leafentries in the range [lbi, upe) from src_omt to newly created dest_omt
{ {
src_bn->data_buffer.move_leafentries_to(&dest_bn->data_buffer, lbi, ube); invariant(ube == src_bn->data_buffer.dmt_size());
src_bn->data_buffer.split_klpairs(&dest_bn->data_buffer, lbi);
} }
static void ftnode_finalize_split(FTNODE node, FTNODE B, MSN max_msn_applied_to_node) { static void ftnode_finalize_split(FTNODE node, FTNODE B, MSN max_msn_applied_to_node) {
......
...@@ -327,7 +327,7 @@ serialize_ftnode_partition_size (FTNODE node, int i) ...@@ -327,7 +327,7 @@ serialize_ftnode_partition_size (FTNODE node, int i)
return result; return result;
} }
#define FTNODE_PARTITION_OMT_LEAVES 0xaa #define FTNODE_PARTITION_DMT_LEAVES 0xaa
#define FTNODE_PARTITION_FIFO_MSG 0xbb #define FTNODE_PARTITION_FIFO_MSG 0xbb
static void static void
...@@ -374,7 +374,7 @@ serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) { ...@@ -374,7 +374,7 @@ serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
serialize_nonleaf_childinfo(BNC(node, i), &wb); serialize_nonleaf_childinfo(BNC(node, i), &wb);
} }
else { else {
unsigned char ch = FTNODE_PARTITION_OMT_LEAVES; unsigned char ch = FTNODE_PARTITION_DMT_LEAVES;
bn_data* bd = BLB_DATA(node, i); bn_data* bd = BLB_DATA(node, i);
wbuf_nocrc_char(&wb, ch); wbuf_nocrc_char(&wb, ch);
...@@ -1553,7 +1553,7 @@ deserialize_ftnode_partition( ...@@ -1553,7 +1553,7 @@ deserialize_ftnode_partition(
BP_WORKDONE(node, childnum) = 0; BP_WORKDONE(node, childnum) = 0;
} }
else { else {
assert(ch == FTNODE_PARTITION_OMT_LEAVES); assert(ch == FTNODE_PARTITION_DMT_LEAVES);
BLB_SEQINSERT(node, childnum) = 0; BLB_SEQINSERT(node, childnum) = 0;
uint32_t num_entries = rbuf_int(&rb); uint32_t num_entries = rbuf_int(&rb);
// we are now at the first byte of first leafentry // we are now at the first byte of first leafentry
......
...@@ -236,14 +236,14 @@ static inline size_t uxr_unpack_length_and_bit(UXR uxr, uint8_t *p); ...@@ -236,14 +236,14 @@ static inline size_t uxr_unpack_length_and_bit(UXR uxr, uint8_t *p);
static inline size_t uxr_unpack_data(UXR uxr, uint8_t *p); static inline size_t uxr_unpack_data(UXR uxr, uint8_t *p);
static void get_space_for_le( static void get_space_for_le(
bn_data* data_buffer, bn_data* data_buffer,
uint32_t idx, uint32_t idx,
void* keyp, void* keyp,
uint32_t keylen, uint32_t keylen,
uint32_t old_le_size, uint32_t old_le_size,
size_t size, size_t size,
LEAFENTRY* new_le_space LEAFENTRY* new_le_space
) )
{ {
if (data_buffer == NULL) { if (data_buffer == NULL) {
CAST_FROM_VOIDP(*new_le_space, toku_xmalloc(size)); CAST_FROM_VOIDP(*new_le_space, toku_xmalloc(size));
...@@ -889,7 +889,7 @@ update_le_status(ULE ule, size_t memsize) { ...@@ -889,7 +889,7 @@ update_le_status(ULE ule, size_t memsize) {
} }
// Purpose is to return a newly allocated leaf entry in packed format, or // Purpose is to return a newly allocated leaf entry in packed format, or
// return null if leaf entry should be destroyed (if no transaction records // return null if leaf entry should be destroyed (if no transaction records
// are for inserts). // are for inserts).
// Transaction records in packed le are stored inner to outer (first xr is innermost), // Transaction records in packed le are stored inner to outer (first xr is innermost),
// with some information extracted out of the transaction records into the header. // with some information extracted out of the transaction records into the header.
...@@ -927,7 +927,7 @@ le_pack(ULE ule, // data to be packed into new leafentry ...@@ -927,7 +927,7 @@ le_pack(ULE ule, // data to be packed into new leafentry
rval = 0; rval = 0;
goto cleanup; goto cleanup;
} }
found_insert:; found_insert:
memsize = le_memsize_from_ule(ule); memsize = le_memsize_from_ule(ule);
LEAFENTRY new_leafentry; LEAFENTRY new_leafentry;
get_space_for_le(data_buffer, idx, keyp, keylen, old_le_size, memsize, &new_leafentry); get_space_for_le(data_buffer, idx, keyp, keylen, old_le_size, memsize, &new_leafentry);
...@@ -982,7 +982,7 @@ found_insert:; ...@@ -982,7 +982,7 @@ found_insert:;
for (i = 0; i < ule->num_cuxrs; i++) { for (i = 0; i < ule->num_cuxrs; i++) {
p += uxr_pack_length_and_bit(ule->uxrs + ule->num_cuxrs - 1 - i, p); p += uxr_pack_length_and_bit(ule->uxrs + ule->num_cuxrs - 1 - i, p);
} }
//pack interesting values inner to outer //pack interesting values inner to outer
if (ule->num_puxrs!=0) { if (ule->num_puxrs!=0) {
UXR innermost = ule->uxrs + ule->num_cuxrs + ule->num_puxrs - 1; UXR innermost = ule->uxrs + ule->num_cuxrs + ule->num_puxrs - 1;
...@@ -1020,7 +1020,7 @@ found_insert:; ...@@ -1020,7 +1020,7 @@ found_insert:;
size_t bytes_written; size_t bytes_written;
bytes_written = (size_t)p - (size_t)new_leafentry; bytes_written = (size_t)p - (size_t)new_leafentry;
invariant(bytes_written == memsize); invariant(bytes_written == memsize);
#if ULE_DEBUG #if ULE_DEBUG
if (omt) { //Disable recursive debugging. if (omt) { //Disable recursive debugging.
size_t memsize_verify = leafentry_memsize(new_leafentry); size_t memsize_verify = leafentry_memsize(new_leafentry);
......
...@@ -183,6 +183,7 @@ size_t toku_mempool_get_size(const struct mempool *mp) { ...@@ -183,6 +183,7 @@ size_t toku_mempool_get_size(const struct mempool *mp) {
return mp->size; return mp->size;
} }
// TODO(yoni): unify the toku_mempool_get*_size and toku_mempool_get*_space functions (use either size or space but not both)
size_t toku_mempool_get_frag_size(const struct mempool *mp) { size_t toku_mempool_get_frag_size(const struct mempool *mp) {
return mp->frag_size; return mp->frag_size;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment