Commit 49425d16 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

closes #5864 add compression, decompression, serialization, deserialization...

closes #5864 add compression, decompression, serialization, deserialization statistics to engine status. all interesting code paths should be covered.


git-svn-id: file:///svn/toku/tokudb@51729 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2c3fd6c1
......@@ -83,10 +83,13 @@ struct ftnode_fetch_extra {
int child_to_read;
// when we read internal nodes, we want to read all the data off disk in one I/O
// then we'll treat it as normal and only decompress the needed partitions etc.
bool read_all_partitions;
// Accounting: How many bytes were fetched, and how much time did it take?
tokutime_t bytes_read;
uint64_t read_time;
// Accounting: How many bytes were read, and how much time did we spend doing I/O?
uint64_t bytes_read;
tokutime_t io_time;
tokutime_t decompress_time;
tokutime_t deserialize_time;
};
struct toku_fifo_entry_key_msn_heaviside_extra {
......@@ -533,7 +536,7 @@ int toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_RO
void toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized);
int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash, ROLLBACK_LOG_NODE *logp, FT h);
int toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, struct ftnode_fetch_extra* bfe);
int toku_deserialize_bp_from_compressed(FTNODE node, int childnum, DESCRIPTOR desc, ft_compare_func cmp);
int toku_deserialize_bp_from_compressed(FTNODE node, int childnum, struct ftnode_fetch_extra *bfe);
int toku_deserialize_ftnode_from (int fd, BLOCKNUM off, uint32_t /*fullhash*/, FTNODE *ftnode, FTNODE_DISK_DATA* ndd, struct ftnode_fetch_extra* bfe);
// <CER> For verifying old, non-upgraded nodes (versions 13 and 14).
......@@ -631,18 +634,21 @@ STAT64INFO_S toku_get_and_clear_basement_stats(FTNODE leafnode);
#define WHEN_FTTRACE(x) ((void)0)
#endif
void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h);
void toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe);
void toku_ft_status_update_flush_reason(FTNODE node, uint64_t uncompressed_bytes_flushed, uint64_t bytes_written, tokutime_t write_time, bool for_checkpoint);
extern void toku_ftnode_clone_callback(void* value_data, void** cloned_value_data, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
extern void toku_ftnode_checkpoint_complete_callback(void *value_data);
extern void toku_ftnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *ftnode_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool is_clone);
extern int toku_ftnode_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM nodename, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int*dirty, void*extraargs);
extern void toku_ftnode_pe_est_callback(void* ftnode_pv, void* disk_data, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs);
extern int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR old_attr, PAIR_ATTR* new_attr, void *extraargs);
extern bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs);
void toku_ft_status_update_serialize_times(tokutime_t serialize_time, tokutime_t compress_time);
void toku_ft_status_update_deserialize_times(tokutime_t deserialize_time, tokutime_t decompress_time);
void toku_ftnode_clone_callback(void* value_data, void** cloned_value_data, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
void toku_ftnode_checkpoint_complete_callback(void *value_data);
void toku_ftnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *ftnode_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool is_clone);
int toku_ftnode_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM nodename, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int*dirty, void*extraargs);
void toku_ftnode_pe_est_callback(void* ftnode_pv, void* disk_data, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs);
int toku_ftnode_pe_callback (void *ftnode_pv, PAIR_ATTR old_attr, PAIR_ATTR* new_attr, void *extraargs);
bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs);
int toku_ftnode_pf_callback(void* ftnode_pv, void* UU(disk_data), void* read_extraargs, int fd, PAIR_ATTR* sizep);
extern int toku_ftnode_cleaner_callback( void *ftnode_pv, BLOCKNUM blocknum, uint32_t fullhash, void *extraargs);
int toku_ftnode_cleaner_callback( void *ftnode_pv, BLOCKNUM blocknum, uint32_t fullhash, void *extraargs);
void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h);
// Given pinned node and pinned child, split child into two
// and update node with information about its new child.
......@@ -719,7 +725,8 @@ static inline void fill_bfe_for_full_read(struct ftnode_fetch_extra *bfe, FT h)
bfe->disable_prefetching = false;
bfe->read_all_partitions = false;
bfe->bytes_read = 0;
bfe->read_time = 0;
bfe->io_time = 0;
bfe->deserialize_time = 0;
}
//
......@@ -758,7 +765,7 @@ static inline void fill_bfe_for_subset_read(
bfe->disable_prefetching = disable_prefetching;
bfe->read_all_partitions = read_all_partitions;
bfe->bytes_read = 0;
bfe->read_time = 0;
bfe->io_time = 0;
}
//
......@@ -780,7 +787,7 @@ static inline void fill_bfe_for_min_read(struct ftnode_fetch_extra *bfe, FT h) {
bfe->disable_prefetching = false;
bfe->read_all_partitions = false;
bfe->bytes_read = 0;
bfe->read_time = 0;
bfe->io_time = 0;
}
static inline void destroy_bfe_for_prefetch(struct ftnode_fetch_extra *bfe) {
......@@ -813,7 +820,7 @@ static inline void fill_bfe_for_prefetch(struct ftnode_fetch_extra *bfe,
bfe->disable_prefetching = c->disable_prefetching;
bfe->read_all_partitions = false;
bfe->bytes_read = 0;
bfe->read_time = 0;
bfe->io_time = 0;
}
struct ancestors {
......@@ -1037,6 +1044,10 @@ typedef enum {
FT_NUM_MSG_BUFFER_FETCHED_WRITE,
FT_BYTES_MSG_BUFFER_FETCHED_WRITE,
FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE,
FT_NODE_COMPRESS_TOKUTIME, // seconds spent compressing nodes to memory
FT_NODE_SERIALIZE_TOKUTIME, // seconds spent serializing nodes to memory
FT_NODE_DECOMPRESS_TOKUTIME, // seconds spent decompressing nodes to memory
FT_NODE_DESERIALIZE_TOKUTIME, // seconds spent deserializing nodes to memory
FT_PRO_NUM_ROOT_SPLIT,
FT_PRO_NUM_ROOT_H0_INJECT,
FT_PRO_NUM_ROOT_H1_INJECT,
......
......@@ -235,7 +235,7 @@ status_init(void)
STATUS_INIT(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, TOKUTIME, "buffers fetched for prefetch (seconds)");
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write");
STATUS_INIT(FT_BYTES_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write (bytes)");
STATUS_INIT(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write (seconds)");
STATUS_INIT(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, TOKUTIME, "buffers fetched for write (seconds)");
// Disk write statistics.
//
......@@ -258,6 +258,12 @@ status_init(void)
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, PARCOUNT, "nonleaf nodes flushed to disk (for checkpoint) (uncompressed bytes)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_TOKUTIME_FOR_CHECKPOINT, TOKUTIME, "nonleaf nodes flushed to disk (for checkpoint) (seconds)");
// CPU time statistics for [de]serialization and [de]compression.
STATUS_INIT(FT_NODE_COMPRESS_TOKUTIME, TOKUTIME, "node compression to memory (seconds)");
STATUS_INIT(FT_NODE_SERIALIZE_TOKUTIME, TOKUTIME, "node serialization to memory (seconds)");
STATUS_INIT(FT_NODE_DECOMPRESS_TOKUTIME, TOKUTIME, "node decompression to memory (seconds)");
STATUS_INIT(FT_NODE_DESERIALIZE_TOKUTIME, TOKUTIME, "node deserialization to memory (seconds)");
// Promotion statistics.
STATUS_INIT(FT_PRO_NUM_ROOT_SPLIT, PARCOUNT, "promotion: roots split");
STATUS_INIT(FT_PRO_NUM_ROOT_H0_INJECT, PARCOUNT, "promotion: leaf roots injected into");
......@@ -858,15 +864,15 @@ toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe)
if (bfe->type == ftnode_fetch_prefetch) {
STATUS_INC(FT_NUM_PIVOTS_FETCHED_PREFETCH, 1);
STATUS_INC(FT_BYTES_PIVOTS_FETCHED_PREFETCH, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_PREFETCH, bfe->read_time);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_PREFETCH, bfe->io_time);
} else if (bfe->type == ftnode_fetch_all) {
STATUS_INC(FT_NUM_PIVOTS_FETCHED_WRITE, 1);
STATUS_INC(FT_BYTES_PIVOTS_FETCHED_WRITE, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_WRITE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_WRITE, bfe->io_time);
} else if (bfe->type == ftnode_fetch_subset) {
STATUS_INC(FT_NUM_PIVOTS_FETCHED_QUERY, 1);
STATUS_INC(FT_BYTES_PIVOTS_FETCHED_QUERY, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_QUERY, bfe->read_time);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_QUERY, bfe->io_time);
}
}
......@@ -1158,7 +1164,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_PREFETCH, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_PREFETCH, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_PREFETCH, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_PREFETCH, bfe->io_time);
}
} else if (bfe->type == ftnode_fetch_all) {
if (state == PT_COMPRESSED) {
......@@ -1166,7 +1172,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_WRITE, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_WRITE, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_WRITE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_WRITE, bfe->io_time);
}
} else if (childnum == bfe->child_to_read) {
if (state == PT_COMPRESSED) {
......@@ -1174,7 +1180,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_NORMAL, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_NORMAL, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_NORMAL, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_NORMAL, bfe->io_time);
}
} else {
if (state == PT_COMPRESSED) {
......@@ -1182,7 +1188,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_AGGRESSIVE, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_AGGRESSIVE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_AGGRESSIVE, bfe->io_time);
}
}
}
......@@ -1193,7 +1199,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_PREFETCH, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, bfe->io_time);
}
} else if (bfe->type == ftnode_fetch_all) {
if (state == PT_COMPRESSED) {
......@@ -1201,7 +1207,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_WRITE, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_WRITE, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, bfe->io_time);
}
} else if (childnum == bfe->child_to_read) {
if (state == PT_COMPRESSED) {
......@@ -1209,7 +1215,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_NORMAL, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_NORMAL, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_NORMAL, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_NORMAL, bfe->io_time);
}
} else {
if (state == PT_COMPRESSED) {
......@@ -1217,12 +1223,22 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->bytes_read);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->io_time);
}
}
}
}
void toku_ft_status_update_serialize_times(tokutime_t serialize_time, tokutime_t compress_time) {
STATUS_INC(FT_NODE_SERIALIZE_TOKUTIME, serialize_time);
STATUS_INC(FT_NODE_COMPRESS_TOKUTIME, compress_time);
}
void toku_ft_status_update_deserialize_times(tokutime_t deserialize_time, tokutime_t decompress_time) {
STATUS_INC(FT_NODE_DESERIALIZE_TOKUTIME, deserialize_time);
STATUS_INC(FT_NODE_DECOMPRESS_TOKUTIME, decompress_time);
}
// callback for partially reading a node
// could have just used toku_ftnode_fetch_callback, but wanted to separate the two cases to separate functions
int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraargs, int fd, PAIR_ATTR* sizep) {
......@@ -1252,14 +1268,10 @@ int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraar
if ((lc <= i && i <= rc) || toku_bfe_wants_child_available(bfe, i)) {
enum pt_state state = BP_STATE(node, i);
if (state == PT_COMPRESSED) {
r = toku_deserialize_bp_from_compressed(node, i, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
r = toku_deserialize_bp_from_compressed(node, i, bfe);
} else {
invariant(state == PT_ON_DISK);
tokutime_t io_t0 = toku_time_now();
r = toku_deserialize_bp_from_disk(node, ndd, i, fd, bfe);
tokutime_t io_t1 = toku_time_now();
bfe->bytes_read = BP_SIZE(ndd, i);
bfe->read_time = io_t1 - io_t0;
}
ft_status_update_partial_fetch_reason(bfe, i, state, (node->height == 0));
}
......
......@@ -41,20 +41,6 @@ toku_ft_upgrade_get_status(FT_UPGRADE_STATUS s) {
*s = ft_upgrade_status;
}
// performance tracing
#define DO_TOKU_TRACE 0
#if DO_TOKU_TRACE
static inline void do_toku_trace(const char *cp, int len) {
const int toku_trace_fd = -1;
write(toku_trace_fd, cp, len);
}
#define toku_trace(a) do_toku_trace(a, strlen(a))
#else
#define toku_trace(a)
#endif
static int num_cores = 0; // cache the number of cores for the parallelization
static struct toku_thread_pool *ft_pool = NULL;
......@@ -339,7 +325,6 @@ compress_ftnode_sub_block(struct sub_block *sb, enum toku_compression_method met
sb->compressed_size += 8; // now add the eight bytes that we saved for the sizes
sb->xsum = x1764_memory(sb->compressed_ptr,sb->compressed_size);
//
// This is the end result for Dr. No and forward. For ftnodes, sb->compressed_ptr contains
// two integers at the beginning, the size and uncompressed size, and then the compressed
......@@ -408,7 +393,6 @@ static void serialize_ftnode_info(FTNODE node,
invariant(sb->uncompressed_size==wb.ndone);
}
// This is the size of the uncompressed data, not including the compression headers
unsigned int
toku_serialize_ftnode_size (FTNODE node) {
......@@ -639,14 +623,27 @@ rebalance_ftnode_leaf(FTNODE node, unsigned int basementnodesize)
toku_free(num_les_this_bn);
} // end of rebalance_ftnode_leaf()
struct serialize_times {
tokutime_t serialize_time;
tokutime_t compress_time;
};
static void
serialize_and_compress_partition(FTNODE node,
int childnum,
enum toku_compression_method compression_method,
SUB_BLOCK sb)
SUB_BLOCK sb,
struct serialize_times *st)
{
// serialize, compress, update status
tokutime_t t0 = toku_time_now();
serialize_ftnode_partition(node, childnum, sb);
tokutime_t t1 = toku_time_now();
compress_ftnode_sub_block(sb, compression_method);
tokutime_t t2 = toku_time_now();
st->serialize_time += t1 - t0;
st->compress_time += t2 - t1;
}
void
......@@ -657,7 +654,12 @@ toku_create_compressed_partition_from_available(
SUB_BLOCK sb
)
{
serialize_and_compress_partition(node, childnum, compression_method, sb);
struct serialize_times st;
memset(&st, 0, sizeof(st));
serialize_and_compress_partition(node, childnum, compression_method, sb, &st);
toku_ft_status_update_serialize_times(st.serialize_time, st.compress_time);
//
// now we have an sb that would be ready for being written out,
// but we are not writing it out, we are storing it in cache for a potentially
......@@ -678,18 +680,16 @@ toku_create_compressed_partition_from_available(
toku_free(sb->uncompressed_ptr);
sb->uncompressed_ptr = NULL;
}
}
static void
serialize_and_compress_serially(FTNODE node,
int npartitions,
enum toku_compression_method compression_method,
struct sub_block sb[]) {
struct sub_block sb[],
struct serialize_times *st) {
for (int i = 0; i < npartitions; i++) {
serialize_and_compress_partition(node, i, compression_method, &sb[i]);
serialize_and_compress_partition(node, i, compression_method, &sb[i], st);
}
}
......@@ -699,6 +699,7 @@ struct serialize_compress_work {
int i;
enum toku_compression_method compression_method;
struct sub_block *sb;
struct serialize_times st;
};
static void *
......@@ -709,7 +710,7 @@ serialize_and_compress_worker(void *arg) {
if (w == NULL)
break;
int i = w->i;
serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i]);
serialize_and_compress_partition(w->node, i, w->compression_method, &w->sb[i], &w->st);
}
workset_release_ref(ws);
return arg;
......@@ -719,9 +720,10 @@ static void
serialize_and_compress_in_parallel(FTNODE node,
int npartitions,
enum toku_compression_method compression_method,
struct sub_block sb[]) {
struct sub_block sb[],
struct serialize_times *st) {
if (npartitions == 1) {
serialize_and_compress_partition(node, 0, compression_method, &sb[0]);
serialize_and_compress_partition(node, 0, compression_method, &sb[0], st);
} else {
int T = num_cores;
if (T > npartitions)
......@@ -738,7 +740,8 @@ serialize_and_compress_in_parallel(FTNODE node,
.node = node,
.i = i,
.compression_method = compression_method,
.sb = sb };
.sb = sb,
.st = { .serialize_time = 0, .compress_time = 0} };
workset_put_locked(&ws, &work[i].base);
}
workset_unlock(&ws);
......@@ -747,9 +750,28 @@ serialize_and_compress_in_parallel(FTNODE node,
serialize_and_compress_worker(&ws);
workset_join(&ws);
workset_destroy(&ws);
// gather up the statistics from each thread's work item
for (int i = 0; i < npartitions; i++) {
st->serialize_time += work[i].st.serialize_time;
st->compress_time += work[i].st.compress_time;
}
}
}
static void
serialize_and_compress_sb_node_info(FTNODE node, struct sub_block *sb,
enum toku_compression_method compression_method, struct serialize_times *st) {
// serialize, compress, update serialize times.
tokutime_t t0 = toku_time_now();
serialize_ftnode_info(node, sb);
tokutime_t t1 = toku_time_now();
compress_ftnode_sub_block(sb, compression_method);
tokutime_t t2 = toku_time_now();
st->serialize_time += t1 - t0;
st->compress_time += t2 - t1;
}
// Writes out each child to a separate malloc'd buffer, then compresses
// all of them, and writes the uncompressed header, to bytes_to_write,
......@@ -787,18 +809,24 @@ toku_serialize_ftnode_to_memory (FTNODE node,
//
// First, let's serialize and compress the individual sub blocks
//
struct serialize_times st;
memset(&st, 0, sizeof(st));
if (in_parallel) {
serialize_and_compress_in_parallel(node, npartitions, compression_method, sb);
serialize_and_compress_in_parallel(node, npartitions, compression_method, sb, &st);
}
else {
serialize_and_compress_serially(node, npartitions, compression_method, sb);
serialize_and_compress_serially(node, npartitions, compression_method, sb, &st);
}
//
// Now lets create a sub-block that has the common node information,
// This does NOT include the header
//
serialize_ftnode_info(node, &sb_node_info);
compress_ftnode_sub_block(&sb_node_info, compression_method);
serialize_and_compress_sb_node_info(node, &sb_node_info, compression_method, &st);
// update the serialize times, ignore the header for simplicity. we captured all
// of the partitions' serialize times so that's probably good enough.
toku_ft_status_update_serialize_times(st.serialize_time, st.compress_time);
// now we have compressed each of our pieces into individual sub_blocks,
// we can put the header and all the subblocks into a single buffer
......@@ -871,48 +899,49 @@ toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DA
size_t n_to_write;
size_t n_uncompressed_bytes;
char *compressed_buf = NULL;
{
// because toku_serialize_ftnode_to is only called for
// in toku_ftnode_flush_callback, we pass false
// for in_parallel. The reasoning is that when we write
// nodes to disk via toku_ftnode_flush_callback, we
// assume that it is being done on a non-critical
// background thread (probably for checkpointing), and therefore
// should not hog CPU,
//
// Should the above facts change, we may want to revisit
// passing false for in_parallel here
//
// alternatively, we could have made in_parallel a parameter
// for toku_serialize_ftnode_to, but instead we did this.
int r = toku_serialize_ftnode_to_memory(
node,
ndd,
h->h->basementnodesize,
h->h->compression_method,
do_rebalancing,
false, // in_parallel
&n_to_write,
&n_uncompressed_bytes,
&compressed_buf
);
if (r!=0) return r;
char *compressed_buf = nullptr;
// because toku_serialize_ftnode_to is only called for
// in toku_ftnode_flush_callback, we pass false
// for in_parallel. The reasoning is that when we write
// nodes to disk via toku_ftnode_flush_callback, we
// assume that it is being done on a non-critical
// background thread (probably for checkpointing), and therefore
// should not hog CPU,
//
// Should the above facts change, we may want to revisit
// passing false for in_parallel here
//
// alternatively, we could have made in_parallel a parameter
// for toku_serialize_ftnode_to, but instead we did this.
int r = toku_serialize_ftnode_to_memory(
node,
ndd,
h->h->basementnodesize,
h->h->compression_method,
do_rebalancing,
false, // in_parallel
&n_to_write,
&n_uncompressed_bytes,
&compressed_buf
);
if (r != 0) {
return r;
}
{
// If the node has never been written, then write the whole buffer, including the zeros
invariant(blocknum.b>=0);
DISKOFF offset;
// If the node has never been written, then write the whole buffer, including the zeros
invariant(blocknum.b>=0);
DISKOFF offset;
toku_blocknum_realloc_on_disk(h->blocktable, blocknum, n_to_write, &offset,
h, fd, for_checkpoint); //dirties h
toku_blocknum_realloc_on_disk(h->blocktable, blocknum, n_to_write, &offset,
h, fd, for_checkpoint); //dirties h
tokutime_t io_t0 = toku_time_now();
toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
tokutime_t io_t1 = toku_time_now();
toku_ft_status_update_flush_reason(node, n_uncompressed_bytes, n_to_write, io_t1 - io_t0, for_checkpoint);
}
tokutime_t t0 = toku_time_now();
toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
tokutime_t t1 = toku_time_now();
tokutime_t io_time = t1 - t0;
toku_ft_status_update_flush_reason(node, n_uncompressed_bytes, n_to_write, io_time, for_checkpoint);
toku_free(compressed_buf);
node->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
......@@ -945,7 +974,6 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
xids_create_from_buffer(rbuf, &xids);
rbuf_bytes(rbuf, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(rbuf, &val, &vallen);
//printf("Found %s,%s\n", (char*)key, (char*)val);
int32_t *dest;
if (cmp) {
if (ft_msg_type_applies_once(type)) {
......@@ -967,7 +995,6 @@ deserialize_child_buffer(NONLEAF_CHILDINFO bnc, struct rbuf *rbuf,
}
r = toku_fifo_enq(bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh, dest); /* Copies the data into the fifo */
lazy_assert_zero(r);
//printf("Inserted\n");
xids_destroy(&xids);
}
invariant(rbuf->ndone == rbuf->size);
......@@ -1013,7 +1040,6 @@ dump_bad_block(unsigned char *vp, uint64_t size) {
fprintf(stderr, "\n");
}
////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////
......@@ -1139,25 +1165,26 @@ static const int read_header_heuristic_max = 32*1024;
#define MIN(a,b) (((a)>(b)) ? (b) : (a))
#endif
static void read_ftnode_header_from_fd_into_rbuf_if_small_enough (int fd, BLOCKNUM blocknum, FT h, struct rbuf *rb, struct ftnode_fetch_extra *bfe)
static void read_ftnode_header_from_fd_into_rbuf_if_small_enough (int fd, BLOCKNUM blocknum, FT ft, struct rbuf *rb, struct ftnode_fetch_extra *bfe)
// Effect: If the header part of the node is small enough, then read it into the rbuf. The rbuf will be allocated to be big enough in any case.
{
DISKOFF offset, size;
toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size);
toku_translate_blocknum_to_offset_size(ft->blocktable, blocknum, &offset, &size);
DISKOFF read_size = MIN(read_header_heuristic_max, size);
uint8_t *XMALLOC_N(size, raw_block);
rbuf_init(rb, raw_block, read_size);
{
// read the block
tokutime_t io_t0 = toku_time_now();
ssize_t rlen = toku_os_pread(fd, raw_block, read_size, offset);
tokutime_t io_t1 = toku_time_now();
assert(rlen>=0);
rbuf_init(rb, raw_block, rlen);
bfe->bytes_read = rlen;
bfe->read_time = io_t1 - io_t0;
toku_ft_status_update_pivot_fetch_reason(bfe);
}
// read the block
tokutime_t t0 = toku_time_now();
ssize_t rlen = toku_os_pread(fd, raw_block, read_size, offset);
tokutime_t t1 = toku_time_now();
assert(rlen >= 0);
rbuf_init(rb, raw_block, rlen);
bfe->bytes_read = rlen;
bfe->io_time = t1 - t0;
toku_ft_status_update_pivot_fetch_reason(bfe);
}
//
......@@ -1314,7 +1341,6 @@ setup_available_ftnode_partition(FTNODE node, int i) {
}
}
// Assign the child_to_read member of the bfe from the given brt node
// that has been brought into memory.
static void
......@@ -1335,7 +1361,6 @@ update_bfe_using_ftnode(FTNODE node, struct ftnode_fetch_extra *bfe)
}
}
// Using the search parameters in the bfe, this function will
// initialize all of the given brt node's partitions.
static void
......@@ -1384,7 +1409,6 @@ setup_partitions_using_bfe(FTNODE node,
}
}
static void setup_ftnode_partitions(FTNODE node, struct ftnode_fetch_extra* bfe, bool data_in_memory)
// Effect: Used when reading a ftnode into main memory, this sets up the partitions.
// We set bfe->child_to_read as well as the BP_STATE and the data pointers (e.g., with set_BSB or set_BNULL or other set_ operations).
......@@ -1400,7 +1424,6 @@ static void setup_ftnode_partitions(FTNODE node, struct ftnode_fetch_extra* bfe,
setup_partitions_using_bfe(node, bfe, data_in_memory);
}
/* deserialize the partition from the sub-block's uncompressed buffer
* and destroy the uncompressed buffer
*/
......@@ -1470,16 +1493,18 @@ deserialize_ftnode_partition(
}
static int
decompress_and_deserialize_worker(struct rbuf curr_rbuf, struct sub_block curr_sb, FTNODE node, int child, DESCRIPTOR desc, ft_compare_func cmp)
decompress_and_deserialize_worker(struct rbuf curr_rbuf, struct sub_block curr_sb, FTNODE node, int child,
DESCRIPTOR desc, ft_compare_func cmp, tokutime_t *decompress_time)
{
int r = 0;
tokutime_t t0 = toku_time_now();
r = read_and_decompress_sub_block(&curr_rbuf, &curr_sb);
if (r != 0) {
goto exit;
tokutime_t t1 = toku_time_now();
if (r == 0) {
// at this point, sb->uncompressed_ptr stores the serialized node partition
r = deserialize_ftnode_partition(&curr_sb, node, child, desc, cmp);
}
// at this point, sb->uncompressed_ptr stores the serialized node partition
r = deserialize_ftnode_partition(&curr_sb, node, child, desc, cmp);
exit:
*decompress_time = t1 - t0;
return r;
}
......@@ -1526,6 +1551,13 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
// Return 0 if it worked. If something goes wrong (including that we are looking at some old data format that doesn't have partitions) then return nonzero.
{
int r = 0;
tokutime_t t0, t1;
tokutime_t decompress_time = 0;
tokutime_t deserialize_time = 0;
t0 = toku_time_now();
FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
if (rb->size < 24) {
......@@ -1617,13 +1649,17 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
// Now decompress the subblock
sb_node_info.uncompressed_ptr = toku_xmalloc(sb_node_info.uncompressed_size);
toku_decompress(
(Bytef *) sb_node_info.uncompressed_ptr,
sb_node_info.uncompressed_size,
(Bytef *) sb_node_info.compressed_ptr,
sb_node_info.compressed_size
);
{
tokutime_t decompress_t0 = toku_time_now();
toku_decompress(
(Bytef *) sb_node_info.uncompressed_ptr,
sb_node_info.uncompressed_size,
(Bytef *) sb_node_info.compressed_ptr,
sb_node_info.compressed_size
);
tokutime_t decompress_t1 = toku_time_now();
decompress_time = decompress_t1 - decompress_t0;
}
// at this point sb->uncompressed_ptr stores the serialized node info.
r = deserialize_ftnode_info(&sb_node_info, node);
......@@ -1645,14 +1681,20 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
// for partitions staying compressed, create sub_block
setup_ftnode_partitions(node, bfe, false);
// We must capture deserialize and decompression time before
// the pf_callback, otherwise we would double-count.
t1 = toku_time_now();
deserialize_time = (t1 - t0) - decompress_time;
// do partial fetch if necessary
if (bfe->type != ftnode_fetch_none) {
PAIR_ATTR attr;
r = toku_ftnode_pf_callback(node, *ndd, bfe, fd, &attr);
if (r != 0) {
goto cleanup;
}
}
// handle clock
for (int i = 0; i < node->n_children; i++) {
if (toku_bfe_wants_child_available(bfe, i)) {
......@@ -1664,6 +1706,7 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
r = 0;
cleanup:
toku_ft_status_update_deserialize_times(deserialize_time, decompress_time);
if (r != 0) {
if (node) {
toku_free(*ndd);
......@@ -2174,6 +2217,13 @@ deserialize_ftnode_from_rbuf(
{
int r = 0;
struct sub_block sb_node_info;
tokutime_t t0, t1;
tokutime_t decompress_time = 0;
tokutime_t deserialize_time = 0;
t0 = toku_time_now();
FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
// now start reading from rbuf
......@@ -2234,9 +2284,14 @@ deserialize_ftnode_from_rbuf(
invariant(stored_checksum == checksum);
}
//now we read and decompress the pivot and child information
// now we read and decompress the pivot and child information
sub_block_init(&sb_node_info);
r = read_and_decompress_sub_block(rb, &sb_node_info);
{
tokutime_t sb_decompress_t0 = toku_time_now();
r = read_and_decompress_sub_block(rb, &sb_node_info);
tokutime_t sb_decompress_t1 = toku_time_now();
decompress_time += sb_decompress_t1 - sb_decompress_t0;
}
if (r != 0) {
goto cleanup;
}
......@@ -2290,13 +2345,17 @@ deserialize_ftnode_from_rbuf(
// should be and sets up the memory so that we are ready to use it
switch (BP_STATE(node,i)) {
case PT_AVAIL:
// case where we read and decompress the partition
r = decompress_and_deserialize_worker(curr_rbuf, curr_sb, node, i, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
if (r != 0) {
goto cleanup;
case PT_AVAIL: {
// case where we read and decompress the partition
tokutime_t partition_decompress_time;
r = decompress_and_deserialize_worker(curr_rbuf, curr_sb, node, i,
&bfe->h->cmp_descriptor, bfe->h->compare_fun, &partition_decompress_time);
decompress_time += partition_decompress_time;
if (r != 0) {
goto cleanup;
}
break;
}
break;
case PT_COMPRESSED:
// case where we leave the partition in the compressed state
r = check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i);
......@@ -2311,13 +2370,21 @@ deserialize_ftnode_from_rbuf(
}
*ftnode = node;
r = 0;
cleanup:
t1 = toku_time_now();
deserialize_time = (t1 - t0) - decompress_time;
toku_ft_status_update_deserialize_times(deserialize_time, decompress_time);
if (r != 0) {
// NOTE: Right now, callers higher in the stack will assert on
// failure, so this is OK for production. However, if we
// create tools that use this function to search for errors in
// the BRT, then we will leak memory.
if (node) toku_free(node);
if (node) {
toku_free(node);
}
}
return r;
......@@ -2353,33 +2420,45 @@ toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, i
uint8_t *XMALLOC_N(curr_size, raw_block);
rbuf_init(&rb, raw_block, curr_size);
{
// read the block
ssize_t rlen = toku_os_pread(fd, raw_block, curr_size, node_offset+curr_offset);
// <CER> TODO: Should we return an error for this mismatched size?
lazy_assert((DISKOFF)rlen == curr_size);
}
tokutime_t t0 = toku_time_now();
// read
ssize_t rlen = toku_os_pread(fd, raw_block, curr_size, node_offset+curr_offset);
lazy_assert((DISKOFF)rlen == curr_size);
tokutime_t t1 = toku_time_now();
// decompress
struct sub_block curr_sb;
sub_block_init(&curr_sb);
r = read_and_decompress_sub_block(&rb, &curr_sb);
if (r != 0) {
goto exit;
// deserialize
tokutime_t t2 = toku_time_now();
if (r == 0) {
// at this point, sb->uncompressed_ptr stores the serialized node partition
r = deserialize_ftnode_partition(&curr_sb, node, childnum, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
}
// at this point, sb->uncompressed_ptr stores the serialized node partition
r = deserialize_ftnode_partition(&curr_sb, node, childnum, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
tokutime_t t3 = toku_time_now();
// capture stats
tokutime_t io_time = t1 - t0;
tokutime_t decompress_time = t2 - t1;
tokutime_t deserialize_time = t3 - t2;
toku_ft_status_update_deserialize_times(deserialize_time, decompress_time);
bfe->bytes_read = rlen;
bfe->io_time = io_time;
exit:
toku_free(raw_block);
return r;
}
// Take a ftnode partition that is in the compressed state, and make it avail
int
toku_deserialize_bp_from_compressed(FTNODE node, int childnum,
DESCRIPTOR desc, ft_compare_func cmp) {
toku_deserialize_bp_from_compressed(FTNODE node, int childnum, struct ftnode_fetch_extra *bfe) {
int r = 0;
assert(BP_STATE(node, childnum) == PT_COMPRESSED);
SUB_BLOCK curr_sb = BSB(node, childnum);
......@@ -2389,14 +2468,27 @@ toku_deserialize_bp_from_compressed(FTNODE node, int childnum,
setup_available_ftnode_partition(node, childnum);
BP_STATE(node,childnum) = PT_AVAIL;
// decompress the sub_block
tokutime_t t0 = toku_time_now();
toku_decompress(
(Bytef *) curr_sb->uncompressed_ptr,
curr_sb->uncompressed_size,
(Bytef *) curr_sb->compressed_ptr,
curr_sb->compressed_size
);
r = deserialize_ftnode_partition(curr_sb, node, childnum, desc, cmp);
tokutime_t t1 = toku_time_now();
r = deserialize_ftnode_partition(curr_sb, node, childnum, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
tokutime_t t2 = toku_time_now();
tokutime_t decompress_time = t1 - t0;
tokutime_t deserialize_time = t2 - t1;
toku_ft_status_update_deserialize_times(deserialize_time, decompress_time);
toku_free(curr_sb->compressed_ptr);
toku_free(curr_sb);
return r;
......@@ -2412,11 +2504,20 @@ deserialize_ftnode_from_fd(int fd,
STAT64INFO info)
{
struct rbuf rb = RBUF_INITIALIZER;
tokutime_t t0 = toku_time_now();
read_block_from_fd_into_rbuf(fd, blocknum, bfe->h, &rb);
tokutime_t t1 = toku_time_now();
// Decompress and deserialize the ftnode. Time statistics
// are taken inside this function.
int r = deserialize_ftnode_from_rbuf(ftnode, ndd, blocknum, fullhash, bfe, info, &rb, fd);
if (r != 0) {
dump_bad_block(rb.buf,rb.size);
}
bfe->bytes_read = rb.size;
bfe->io_time = t1 - t0;
toku_free(rb.buf);
return r;
}
......@@ -2432,13 +2533,13 @@ toku_deserialize_ftnode_from (int fd,
)
// Effect: Read a node in. If possible, read just the header.
{
toku_trace("deserial start");
int r = 0;
struct rbuf rb = RBUF_INITIALIZER;
// each function below takes the appropriate io/decompression/deserialize statistics
if (!bfe->read_all_partitions) {
read_ftnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->h, &rb, bfe);
r = deserialize_ftnode_header_from_rbuf_if_small_enough(ftnode, ndd, blocknum, fullhash, bfe, &rb, fd);
} else {
// force us to do it the old way
......@@ -2448,7 +2549,6 @@ toku_deserialize_ftnode_from (int fd,
// Something went wrong, go back to doing it the old way.
r = deserialize_ftnode_from_fd(fd, blocknum, fullhash, ftnode, ndd, bfe, NULL);
}
toku_trace("deserial done");
toku_free(rb.buf);
return r;
......@@ -2521,7 +2621,6 @@ serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calc
lazy_assert(calculated_size==wb.ndone);
}
// TODO: can't fail. assert on ENOMEM for compressed_buf...
static void
serialize_uncompressed_block_to_memory(char * uncompressed_buf,
int n_sub_blocks,
......@@ -2567,7 +2666,6 @@ serialize_uncompressed_block_to_memory(char * uncompressed_buf,
*bytes_to_write = compressed_buf;
}
void
toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized) {
// get the size of the serialized node
......@@ -2723,7 +2821,6 @@ deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknu
int
decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
toku_trace("decompress");
int r = 0;
// get the number of compressed sub blocks
int n_sub_blocks;
......@@ -2806,8 +2903,6 @@ decompress_from_raw_block_into_rbuf(uint8_t *raw_block, size_t raw_block_size, s
goto exit;
}
toku_trace("decompress done");
rb->ndone=0;
exit:
return r;
......@@ -2893,7 +2988,6 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum,
int
toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash,
ROLLBACK_LOG_NODE *logp, FT h) {
toku_trace("deserial start");
int layout_version = 0;
int r;
struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0};
......@@ -2926,8 +3020,6 @@ toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash
r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, fullhash, logp, h, &rb);
toku_trace("deserial done");
cleanup:
if (rb.buf) toku_free(rb.buf);
return r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment