Commit 9b004763 authored by John Esmet's avatar John Esmet

FT-585 Move serialize and compression size calculations around so we can malloc

one large buffer to serialize a node instead of many smaller ones, which
should hopefully put less pressure on jemalloc during checkpoints etc.
parent effa06ec
......@@ -165,11 +165,12 @@ void toku_compress (enum toku_compression_method a,
assert(1 <= *destLen);
*destLen = 1;
} else {
qlz_state_compress *XCALLOC(qsc);
toku::scoped_calloc qsc_buf(sizeof(qlz_state_compress));
qlz_state_compress *qsc = reinterpret_cast<qlz_state_compress *>(qsc_buf.get());
size_t actual_destlen = qlz_compress(source, (char*)(dest+1), sourceLen, qsc);
assert(actual_destlen +1 <= *destLen);
*destLen = actual_destlen+1; // add one for the rfc1950-style header byte.
toku_free(qsc);
assert(actual_destlen + 1 <= *destLen);
// add one for the rfc1950-style header byte.
*destLen = actual_destlen + 1;
}
// Fill in that first byte
dest[0] = TOKU_QUICKLZ_METHOD + (QLZ_COMPRESSION_LEVEL << 4);
......
......@@ -376,13 +376,11 @@ static void serialize_child_buffer(NONLEAF_CHILDINFO bnc, struct wbuf *wb) {
//
static void
serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
if (sb->uncompressed_ptr == NULL) {
assert(sb->uncompressed_size == 0);
sb->uncompressed_size = serialize_ftnode_partition_size(node,i);
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
} else {
assert(sb->uncompressed_size > 0);
}
// Caller should have allocated memory.
invariant_notnull(sb->uncompressed_ptr);
invariant(sb->uncompressed_size > 0);
paranoid_invariant(sb->uncompressed_size == serialize_ftnode_partition_size(node, i));
//
// Now put the data into sb->uncompressed_ptr
//
......@@ -413,10 +411,10 @@ serialize_ftnode_partition(FTNODE node, int i, struct sub_block *sb) {
//
static void
compress_ftnode_sub_block(struct sub_block *sb, enum toku_compression_method method) {
assert(sb->compressed_ptr == NULL);
set_compressed_size_bound(sb, method);
// add 8 extra bytes, 4 for compressed size, 4 for decompressed size
sb->compressed_ptr = toku_xmalloc(sb->compressed_size_bound + 8);
invariant(sb->compressed_ptr != nullptr);
invariant(sb->compressed_size_bound > 0);
paranoid_invariant(sb->compressed_size_bound == toku_compress_bound(method, sb->uncompressed_size));
//
// This probably seems a bit complicated. Here is what is going on.
// In TokuDB 5.0, sub_blocks were compressed and the compressed data
......@@ -482,13 +480,12 @@ serialize_ftnode_info_size(FTNODE node)
return retval;
}
static void serialize_ftnode_info(FTNODE node,
SUB_BLOCK sb // output
) {
assert(sb->uncompressed_size == 0);
assert(sb->uncompressed_ptr == NULL);
sb->uncompressed_size = serialize_ftnode_info_size(node);
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
static void serialize_ftnode_info(FTNODE node, SUB_BLOCK sb) {
// Memory must have been allocated by our caller.
invariant(sb->uncompressed_size > 0);
invariant_notnull(sb->uncompressed_ptr);
paranoid_invariant(sb->uncompressed_size == serialize_ftnode_info_size(node));
struct wbuf wb;
wbuf_init(&wb, sb->uncompressed_ptr, sb->uncompressed_size);
......@@ -703,24 +700,40 @@ int toku_serialize_ftnode_to_memory(FTNODE node,
// Each partition represents a compressed sub block
// For internal nodes, a sub block is a message buffer
// For leaf nodes, a sub block is a basement node
toku::scoped_malloc sb_buf(sizeof(struct sub_block) * npartitions);
toku::scoped_calloc sb_buf(sizeof(struct sub_block) * npartitions);
struct sub_block *sb = reinterpret_cast<struct sub_block *>(sb_buf.get());
XREALLOC_N(npartitions, *ndd);
struct sub_block sb_node_info;
for (int i = 0; i < npartitions; i++) {
sub_block_init(&sb[i]);;
}
sub_block_init(&sb_node_info);
//
// First, let's serialize and compress the individual sub blocks
//
struct serialize_times st;
memset(&st, 0, sizeof(st));
// determine how large our serialization and compression buffers need to be.
size_t serialize_buf_size = 0, compression_buf_size = 0;
for (int i = 0; i < node->n_children; i++) {
sb[i].uncompressed_size = serialize_ftnode_partition_size(node, i);
sb[i].compressed_size_bound = toku_compress_bound(compression_method, sb[i].uncompressed_size);
serialize_buf_size += sb[i].uncompressed_size;
compression_buf_size += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
}
// give each sub block a base pointer to enough buffer space for serialization and compression
toku::scoped_malloc serialize_buf(serialize_buf_size);
toku::scoped_malloc compression_buf(compression_buf_size);
for (size_t i = 0, uncompressed_offset = 0, compressed_offset = 0; i < (size_t) node->n_children; i++) {
sb[i].uncompressed_ptr = reinterpret_cast<char *>(serialize_buf.get()) + uncompressed_offset;
sb[i].compressed_ptr = reinterpret_cast<char *>(compression_buf.get()) + compressed_offset;
uncompressed_offset += sb[i].uncompressed_size;
compressed_offset += sb[i].compressed_size_bound + 8; // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
invariant(uncompressed_offset <= serialize_buf_size);
invariant(compressed_offset <= compression_buf_size);
}
// do the actual serialization now that we have buffer space
struct serialize_times st = { 0, 0 };
if (in_parallel) {
serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
}
else {
} else {
serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
}
......@@ -728,16 +741,31 @@ int toku_serialize_ftnode_to_memory(FTNODE node,
// Now lets create a sub-block that has the common node information,
// This does NOT include the header
//
// determine how large our serialization and copmression buffers need to be
struct sub_block sb_node_info;
sub_block_init(&sb_node_info);
size_t sb_node_info_uncompressed_size = serialize_ftnode_info_size(node);
size_t sb_node_info_compressed_size_bound = toku_compress_bound(compression_method, sb_node_info_uncompressed_size);
toku::scoped_malloc sb_node_info_uncompressed_buf(sb_node_info_uncompressed_size);
toku::scoped_malloc sb_node_info_compressed_buf(sb_node_info_compressed_size_bound + 8); // add 8 extra bytes, 4 for compressed size, 4 for decompressed size
sb_node_info.uncompressed_size = sb_node_info_uncompressed_size;
sb_node_info.uncompressed_ptr = sb_node_info_uncompressed_buf.get();
sb_node_info.compressed_size_bound = sb_node_info_compressed_size_bound;
sb_node_info.compressed_ptr = sb_node_info_compressed_buf.get();
// do the actual serialization now that we have buffer space
serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);
//
// At this point, we have compressed each of our pieces into individual sub_blocks,
// we can put the header and all the subblocks into a single buffer and return it.
//
// update the serialize times, ignore the header for simplicity. we captured all
// of the partitions' serialize times so that's probably good enough.
toku_ft_status_update_serialize_times(node, st.serialize_time, st.compress_time);
// now we have compressed each of our pieces into individual sub_blocks,
// we can put the header and all the subblocks into a single buffer
// and return it.
// The total size of the node is:
// size of header + disk size of the n+1 sub_block's created above
uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
......@@ -755,11 +783,10 @@ int toku_serialize_ftnode_to_memory(FTNODE node,
total_uncompressed_size += sb[i].uncompressed_size + 4;
}
// now create the final serialized node
uint32_t total_buffer_size = roundup_to_multiple(512, total_node_size); // make the buffer be 512 bytes.
char *XMALLOC_N_ALIGNED(512, total_buffer_size, data);
char *curr_ptr = data;
// now create the final serialized node
// write the header
struct wbuf wb;
......@@ -783,28 +810,15 @@ int toku_serialize_ftnode_to_memory(FTNODE node,
curr_ptr += sizeof(sb[i].xsum);
}
// Zero the rest of the buffer
for (uint32_t i=total_node_size; i<total_buffer_size; i++) {
data[i]=0;
}
memset(data + total_node_size, 0, total_buffer_size - total_node_size);
assert(curr_ptr - data == total_node_size);
*bytes_to_write = data;
*n_bytes_to_write = total_buffer_size;
*n_uncompressed_bytes = total_uncompressed_size;
//
// now that node has been serialized, go through sub_block's and free
// memory
//
toku_free(sb_node_info.compressed_ptr);
toku_free(sb_node_info.uncompressed_ptr);
for (int i = 0; i < npartitions; i++) {
toku_free(sb[i].compressed_ptr);
toku_free(sb[i].uncompressed_ptr);
}
assert(0 == (*n_bytes_to_write)%512);
assert(0 == ((unsigned long long)(*bytes_to_write))%512);
invariant(*n_bytes_to_write % 512 == 0);
invariant(reinterpret_cast<unsigned long long>(*bytes_to_write) % 512 == 0);
return 0;
}
......@@ -1578,8 +1592,9 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
}
// Now decompress the subblock
sb_node_info.uncompressed_ptr = toku_xmalloc(sb_node_info.uncompressed_size);
{
toku::scoped_malloc sb_node_info_buf(sb_node_info.uncompressed_size);
sb_node_info.uncompressed_ptr = sb_node_info_buf.get();
tokutime_t decompress_t0 = toku_time_now();
toku_decompress(
(Bytef *) sb_node_info.uncompressed_ptr,
......@@ -1589,16 +1604,13 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
);
tokutime_t decompress_t1 = toku_time_now();
decompress_time = decompress_t1 - decompress_t0;
}
// at this point sb->uncompressed_ptr stores the serialized node info.
r = deserialize_ftnode_info(&sb_node_info, node);
if (r != 0) {
goto cleanup;
}
toku_free(sb_node_info.uncompressed_ptr);
sb_node_info.uncompressed_ptr = NULL;
}
// Now we have the ftnode_info. We have a bunch more stuff in the
// rbuf, so we might be able to store the compressed data for some
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment