Commit ebdf618b authored by John Esmet, committed by Yoni Fogel

refs #5710 add accounting for flush count, number of bytes, IO time (stored as tokutime, shown in seconds), uncompressed size, and compressed size. Also add these stats for when the logger writes to disk (os_pwrite, not fsync).


git-svn-id: file:///svn/toku/tokudb@50513 c7de825b-a66e-492c-adef-691d508d4ae1
parent 860c7994
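
At a glance, the pattern this commit adds is the same everywhere: time each disk write, then bump a count, a byte total, and an elapsed-time total for the relevant category. The standalone sketch below (not TokuDB code) illustrates that pattern; clock_gettime() and pwrite() stand in for toku_time_now() and toku_os_full_pwrite(), and struct write_stats is a hypothetical name.

    // Standalone sketch (not TokuDB code) of the accounting pattern this commit
    // adds around each disk write: count the write, the bytes written, and the
    // elapsed time.  clock_gettime() stands in for toku_time_now(), pwrite() for
    // toku_os_full_pwrite(); the real code stores the elapsed time as tokutime.
    #include <fcntl.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>
    #include <time.h>
    #include <unistd.h>

    struct write_stats {              // hypothetical, for illustration only
        uint64_t num_writes;          // how many writes happened
        uint64_t bytes_written;       // bytes actually written
        uint64_t nanos_writing;       // time spent inside the write call
    };

    static uint64_t now_nanos(void) {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
    }

    static void accounted_pwrite(int fd, const void *buf, size_t len, off_t off,
                                 struct write_stats *stats) {
        uint64_t t0 = now_nanos();
        ssize_t r = pwrite(fd, buf, len, off);   // the I/O being measured
        uint64_t t1 = now_nanos();
        if (r >= 0) {
            stats->num_writes    += 1;
            stats->bytes_written += (uint64_t)r;
            stats->nanos_writing += t1 - t0;
        }
    }

    int main(void) {
        struct write_stats stats;
        memset(&stats, 0, sizeof stats);
        int fd = open("/tmp/accounting-demo", O_CREAT | O_WRONLY | O_TRUNC, 0644);
        const char msg[] = "hello, accounting";
        accounted_pwrite(fd, msg, sizeof msg, 0, &stats);
        close(fd);
        // "shown in seconds", as the commit message says
        printf("writes=%llu bytes=%llu seconds=%.9f\n",
               (unsigned long long)stats.num_writes,
               (unsigned long long)stats.bytes_written,
               stats.nanos_writing / 1e9);
        return 0;
    }
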
......@@ -529,6 +529,7 @@ int toku_serialize_ftnode_to_memory (FTNODE node,
bool do_rebalancing,
bool in_parallel,
/*out*/ size_t *n_bytes_to_write,
/*out*/ size_t *n_uncompressed_bytes,
/*out*/ char **bytes_to_write);
int toku_serialize_ftnode_to(int fd, BLOCKNUM, FTNODE node, FTNODE_DISK_DATA* ndd, bool do_rebalancing, FT h, bool for_checkpoint);
int toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized_log, bool is_serialized,
......@@ -638,6 +639,7 @@ STAT64INFO_S toku_get_and_clear_basement_stats(FTNODE leafnode);
void toku_evict_bn_from_memory(FTNODE node, int childnum, FT h);
void toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe);
void toku_ft_status_update_flush_reason(FTNODE node, uint64_t uncompressed_bytes_flushed, uint64_t bytes_written, tokutime_t write_time, bool for_checkpoint);
extern void toku_ftnode_clone_callback(void* value_data, void** cloned_value_data, PAIR_ATTR* new_attr, bool for_checkpoint, void* write_extraargs);
extern void toku_ftnode_checkpoint_complete_callback(void *value_data);
extern void toku_ftnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *ftnode_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, bool write_me, bool keep_me, bool for_checkpoint, bool is_clone);
......@@ -977,9 +979,21 @@ typedef enum {
FT_SEARCH_TRIES_GT_HEIGHT, // number of searches that required more tries than the height of the tree
FT_SEARCH_TRIES_GT_HEIGHTPLUS3, // number of searches that required more tries than the height of the tree plus three
FT_DISK_FLUSH_LEAF, // number of leaf nodes flushed to disk, not for checkpoint
FT_DISK_FLUSH_LEAF_BYTES, // number of bytes of leaf nodes flushed to disk, not for checkpoint
FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES, // number of uncompressed bytes of leaf nodes flushed to disk, not for checkpoint
FT_DISK_FLUSH_LEAF_TOKUTIME, // tokutime spent flushing leaf nodes to disk, not for checkpoint
FT_DISK_FLUSH_NONLEAF, // number of nonleaf nodes flushed to disk, not for checkpoint
FT_DISK_FLUSH_NONLEAF_BYTES, // number of bytes of nonleaf nodes flushed to disk, not for checkpoint
FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES, // number of uncompressed bytes of nonleaf nodes flushed to disk, not for checkpoint
FT_DISK_FLUSH_NONLEAF_TOKUTIME, // tokutime spent flushing nonleaf nodes to disk, not for checkpoint
FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT, // number of leaf nodes flushed to disk for checkpoint
FT_DISK_FLUSH_LEAF_BYTES_FOR_CHECKPOINT, // number of bytes of leaf nodes flushed to disk for checkpoint
FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, // number of uncompressed bytes of leaf nodes flushed to disk for checkpoint
FT_DISK_FLUSH_LEAF_TOKUTIME_FOR_CHECKPOINT, // tokutime spent flushing leaf nodes to disk for checkpoint
FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT, // number of nonleaf nodes flushed to disk for checkpoint
FT_DISK_FLUSH_NONLEAF_BYTES_FOR_CHECKPOINT, // number of bytes of nonleaf nodes flushed to disk for checkpoint
FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, // number of uncompressed bytes of nonleaf nodes flushed to disk for checkpoint
FT_DISK_FLUSH_NONLEAF_TOKUTIME_FOR_CHECKPOINT, // tokutime spent flushing nonleaf nodes to disk for checkpoint
FT_CREATE_LEAF, // number of leaf nodes created
FT_CREATE_NONLEAF, // number of nonleaf nodes created
FT_DESTROY_LEAF, // number of leaf nodes destroyed
......@@ -999,37 +1013,37 @@ typedef enum {
FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE,
FT_NUM_PIVOTS_FETCHED_QUERY, // how many pivots were fetched for a query
FT_BYTES_PIVOTS_FETCHED_QUERY, // how many pivots were fetched for a query
FT_NANOTIME_PIVOTS_FETCHED_QUERY, // how many pivots were fetched for a query
FT_TOKUTIME_PIVOTS_FETCHED_QUERY, // how many pivots were fetched for a query
FT_NUM_PIVOTS_FETCHED_PREFETCH, // ... for a prefetch
FT_BYTES_PIVOTS_FETCHED_PREFETCH, // ... for a prefetch
FT_NANOTIME_PIVOTS_FETCHED_PREFETCH, // ... for a prefetch
FT_TOKUTIME_PIVOTS_FETCHED_PREFETCH, // ... for a prefetch
FT_NUM_PIVOTS_FETCHED_WRITE, // ... for a write
FT_BYTES_PIVOTS_FETCHED_WRITE, // ... for a write
FT_NANOTIME_PIVOTS_FETCHED_WRITE, // ... for a write
FT_TOKUTIME_PIVOTS_FETCHED_WRITE, // ... for a write
FT_NUM_BASEMENTS_FETCHED_NORMAL, // how many basement nodes were fetched because they were the target of a query
FT_BYTES_BASEMENTS_FETCHED_NORMAL, // how many basement nodes were fetched because they were the target of a query
FT_NANOTIME_BASEMENTS_FETCHED_NORMAL, // how many basement nodes were fetched because they were the target of a query
FT_TOKUTIME_BASEMENTS_FETCHED_NORMAL, // how many basement nodes were fetched because they were the target of a query
FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, // ... because they were between lc and rc
FT_BYTES_BASEMENTS_FETCHED_AGGRESSIVE, // ... because they were between lc and rc
FT_NANOTIME_BASEMENTS_FETCHED_AGGRESSIVE, // ... because they were between lc and rc
FT_TOKUTIME_BASEMENTS_FETCHED_AGGRESSIVE, // ... because they were between lc and rc
FT_NUM_BASEMENTS_FETCHED_PREFETCH,
FT_BYTES_BASEMENTS_FETCHED_PREFETCH,
FT_NANOTIME_BASEMENTS_FETCHED_PREFETCH,
FT_TOKUTIME_BASEMENTS_FETCHED_PREFETCH,
FT_NUM_BASEMENTS_FETCHED_WRITE,
FT_BYTES_BASEMENTS_FETCHED_WRITE,
FT_NANOTIME_BASEMENTS_FETCHED_WRITE,
FT_TOKUTIME_BASEMENTS_FETCHED_WRITE,
FT_NUM_MSG_BUFFER_FETCHED_NORMAL, // how many msg buffers were fetched because they were the target of a query
FT_BYTES_MSG_BUFFER_FETCHED_NORMAL, // how many msg buffers were fetched because they were the target of a query
FT_NANOTIME_MSG_BUFFER_FETCHED_NORMAL, // how many msg buffers were fetched because they were the target of a query
FT_TOKUTIME_MSG_BUFFER_FETCHED_NORMAL, // how many msg buffers were fetched because they were the target of a query
FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, // ... because they were between lc and rc
FT_BYTES_MSG_BUFFER_FETCHED_AGGRESSIVE, // ... because they were between lc and rc
FT_NANOTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, // ... because they were between lc and rc
FT_TOKUTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, // ... because they were between lc and rc
FT_NUM_MSG_BUFFER_FETCHED_PREFETCH,
FT_BYTES_MSG_BUFFER_FETCHED_PREFETCH,
FT_NANOTIME_MSG_BUFFER_FETCHED_PREFETCH,
FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH,
FT_NUM_MSG_BUFFER_FETCHED_WRITE,
FT_BYTES_MSG_BUFFER_FETCHED_WRITE,
FT_NANOTIME_MSG_BUFFER_FETCHED_WRITE,
FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE,
FT_PRO_NUM_ROOT_SPLIT,
FT_PRO_NUM_ROOT_H0_INJECT,
FT_PRO_NUM_ROOT_H1_INJECT,
......
......@@ -198,46 +198,65 @@ status_init(void)
STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH, PARCOUNT, "buffers decompressed for prefetch");
STATUS_INIT(FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE, PARCOUNT, "buffers decompressed for write");
// Disk read statistics.
// Disk read statistics:
//
// Pivots: For queries, prefetching, or writing.
STATUS_INIT(FT_NUM_PIVOTS_FETCHED_QUERY, PARCOUNT, "pivots fetched for query");
STATUS_INIT(FT_BYTES_PIVOTS_FETCHED_QUERY, PARCOUNT, "pivots fetched for query (bytes)");
STATUS_INIT(FT_NANOTIME_PIVOTS_FETCHED_QUERY, PARCOUNT, "pivots fetched for query (seconds)");
STATUS_INIT(FT_TOKUTIME_PIVOTS_FETCHED_QUERY, TOKUTIME, "pivots fetched for query (seconds)");
STATUS_INIT(FT_NUM_PIVOTS_FETCHED_PREFETCH, PARCOUNT, "pivots fetched for prefetch");
STATUS_INIT(FT_BYTES_PIVOTS_FETCHED_PREFETCH, PARCOUNT, "pivots fetched for prefetch (bytes)");
STATUS_INIT(FT_NANOTIME_PIVOTS_FETCHED_PREFETCH, PARCOUNT, "pivots fetched for prefetch (seconds)");
STATUS_INIT(FT_TOKUTIME_PIVOTS_FETCHED_PREFETCH, TOKUTIME, "pivots fetched for prefetch (seconds)");
STATUS_INIT(FT_NUM_PIVOTS_FETCHED_WRITE, PARCOUNT, "pivots fetched for write");
STATUS_INIT(FT_BYTES_PIVOTS_FETCHED_WRITE, PARCOUNT, "pivots fetched for write (bytes)");
STATUS_INIT(FT_NANOTIME_PIVOTS_FETCHED_WRITE, PARCOUNT, "pivots fetched for write (seconds)");
STATUS_INIT(FT_TOKUTIME_PIVOTS_FETCHED_WRITE, TOKUTIME, "pivots fetched for write (seconds)");
// Basements: For queries, aggressive fetching in prelocked range, prefetching, or writing.
STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_NORMAL, PARCOUNT, "basements fetched as a target of a query");
STATUS_INIT(FT_BYTES_BASEMENTS_FETCHED_NORMAL, PARCOUNT, "basements fetched as a target of a query (bytes)");
STATUS_INIT(FT_NANOTIME_BASEMENTS_FETCHED_NORMAL, PARCOUNT, "basements fetched as a target of a query (seconds)");
STATUS_INIT(FT_TOKUTIME_BASEMENTS_FETCHED_NORMAL, TOKUTIME, "basements fetched as a target of a query (seconds)");
STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, PARCOUNT, "basements fetched for prelocked range");
STATUS_INIT(FT_BYTES_BASEMENTS_FETCHED_AGGRESSIVE, PARCOUNT, "basements fetched for prelocked range (bytes)");
STATUS_INIT(FT_NANOTIME_BASEMENTS_FETCHED_AGGRESSIVE, PARCOUNT, "basements fetched for prelocked range (seconds)");
STATUS_INIT(FT_TOKUTIME_BASEMENTS_FETCHED_AGGRESSIVE, TOKUTIME, "basements fetched for prelocked range (seconds)");
STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_PREFETCH, PARCOUNT, "basements fetched for prefetch");
STATUS_INIT(FT_BYTES_BASEMENTS_FETCHED_PREFETCH, PARCOUNT, "basements fetched for prefetch (bytes)");
STATUS_INIT(FT_NANOTIME_BASEMENTS_FETCHED_PREFETCH, PARCOUNT, "basements fetched for prefetch (seconds)");
STATUS_INIT(FT_TOKUTIME_BASEMENTS_FETCHED_PREFETCH, TOKUTIME, "basements fetched for prefetch (seconds)");
STATUS_INIT(FT_NUM_BASEMENTS_FETCHED_WRITE, PARCOUNT, "basements fetched for write");
STATUS_INIT(FT_BYTES_BASEMENTS_FETCHED_WRITE, PARCOUNT, "basements fetched for write (bytes)");
STATUS_INIT(FT_NANOTIME_BASEMENTS_FETCHED_WRITE, PARCOUNT, "basements fetched for write (seconds)");
STATUS_INIT(FT_TOKUTIME_BASEMENTS_FETCHED_WRITE, TOKUTIME, "basements fetched for write (seconds)");
// Buffers: For queries, aggressive fetching in prelocked range, prefetching, or writing.
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_NORMAL, PARCOUNT, "buffers fetched as a target of a query");
STATUS_INIT(FT_BYTES_MSG_BUFFER_FETCHED_NORMAL, PARCOUNT, "buffers fetched as a target of a query (bytes)");
STATUS_INIT(FT_NANOTIME_MSG_BUFFER_FETCHED_NORMAL, PARCOUNT, "buffers fetched as a target of a query (seconds)");
STATUS_INIT(FT_TOKUTIME_MSG_BUFFER_FETCHED_NORMAL, TOKUTIME, "buffers fetched as a target of a query (seconds)");
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, PARCOUNT, "buffers fetched for prelocked range");
STATUS_INIT(FT_BYTES_MSG_BUFFER_FETCHED_AGGRESSIVE, PARCOUNT, "buffers fetched for prelocked range (bytes)");
STATUS_INIT(FT_NANOTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, PARCOUNT, "buffers fetched for prelocked range (seconds)");
STATUS_INIT(FT_TOKUTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, TOKUTIME, "buffers fetched for prelocked range (seconds)");
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, PARCOUNT, "buffers fetched for prefetch");
STATUS_INIT(FT_BYTES_MSG_BUFFER_FETCHED_PREFETCH, PARCOUNT, "buffers fetched for prefetch (bytes)");
STATUS_INIT(FT_NANOTIME_MSG_BUFFER_FETCHED_PREFETCH, PARCOUNT, "buffers fetched for prefetch (seconds)");
STATUS_INIT(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, TOKUTIME, "buffers fetched for prefetch (seconds)");
STATUS_INIT(FT_NUM_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write");
STATUS_INIT(FT_BYTES_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write (bytes)");
STATUS_INIT(FT_NANOTIME_MSG_BUFFER_FETCHED_WRITE, PARCOUNT, "buffers fetched for write (seconds)");
STATUS_INIT(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, TOKUTIME, "buffers fetched for write (seconds)");
// Disk write statistics.
//
// Leaf/Nonleaf: Not for checkpoint
STATUS_INIT(FT_DISK_FLUSH_LEAF, PARCOUNT, "leaf nodes flushed to disk (not for checkpoint)");
STATUS_INIT(FT_DISK_FLUSH_LEAF_BYTES, PARCOUNT, "leaf nodes flushed to disk (not for checkpoint) (bytes)");
STATUS_INIT(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES, PARCOUNT, "leaf nodes flushed to disk (not for checkpoint) (uncompressed bytes)");
STATUS_INIT(FT_DISK_FLUSH_LEAF_TOKUTIME, TOKUTIME, "leaf nodes flushed to disk (not for checkpoint) (seconds)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF, PARCOUNT, "nonleaf nodes flushed to disk (not for checkpoint)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_BYTES, PARCOUNT, "nonleaf nodes flushed to disk (not for checkpoint) (bytes)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES, PARCOUNT, "nonleaf nodes flushed to disk (not for checkpoint) (uncompressed bytes)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_TOKUTIME, TOKUTIME, "nonleaf nodes flushed to disk (not for checkpoint) (seconds)");
// Leaf/Nonleaf: For checkpoint
STATUS_INIT(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT, PARCOUNT, "leaf nodes flushed to disk (for checkpoint)");
STATUS_INIT(FT_DISK_FLUSH_LEAF_BYTES_FOR_CHECKPOINT, PARCOUNT, "leaf nodes flushed to disk (for checkpoint) (bytes)");
STATUS_INIT(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, PARCOUNT, "leaf nodes flushed to disk (for checkpoint) (uncompressed bytes)");
STATUS_INIT(FT_DISK_FLUSH_LEAF_TOKUTIME_FOR_CHECKPOINT, TOKUTIME, "leaf nodes flushed to disk (for checkpoint) (seconds)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT, PARCOUNT, "nonleaf nodes flushed to disk (for checkpoint)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_BYTES_FOR_CHECKPOINT, PARCOUNT, "nonleaf nodes flushed to disk (for checkpoint) (bytes)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, PARCOUNT, "nonleaf nodes flushed to disk (for checkpoint) (uncompressed bytes)");
STATUS_INIT(FT_DISK_FLUSH_NONLEAF_TOKUTIME_FOR_CHECKPOINT, TOKUTIME, "nonleaf nodes flushed to disk (for checkpoint) (seconds)");
// Promotion statistics.
STATUS_INIT(FT_PRO_NUM_ROOT_SPLIT, PARCOUNT, "promotion: roots split");
......@@ -270,7 +289,14 @@ toku_ft_get_status(FT_STATUS s) {
*s = ft_status;
}
#define STATUS_INC(x, d) increment_partitioned_counter(ft_status.status[x].value.parcount, d)
#define STATUS_INC(x, d) \
do { \
if (ft_status.status[x].type == PARCOUNT) { \
increment_partitioned_counter(ft_status.status[x].value.parcount, d); \
} else { \
ft_status.status[x].value.num += d; \
} \
} while (0)
bool is_entire_node_in_memory(FTNODE node) {
for (int i = 0; i < node->n_children; i++) {
......@@ -622,21 +648,35 @@ toku_get_and_clear_basement_stats(FTNODE leafnode) {
return deltas;
}
static void ft_status_update_flush_reason(FTNODE node, bool for_checkpoint) {
void toku_ft_status_update_flush_reason(FTNODE node,
uint64_t uncompressed_bytes_flushed, uint64_t bytes_written,
tokutime_t write_time, bool for_checkpoint) {
if (node->height == 0) {
if (for_checkpoint) {
STATUS_INC(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT, 1);
STATUS_INC(FT_DISK_FLUSH_LEAF_BYTES_FOR_CHECKPOINT, bytes_written);
STATUS_INC(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, uncompressed_bytes_flushed);
STATUS_INC(FT_DISK_FLUSH_LEAF_TOKUTIME_FOR_CHECKPOINT, write_time);
}
else {
STATUS_INC(FT_DISK_FLUSH_LEAF, 1);
STATUS_INC(FT_DISK_FLUSH_LEAF_BYTES, bytes_written);
STATUS_INC(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES, uncompressed_bytes_flushed);
STATUS_INC(FT_DISK_FLUSH_LEAF_TOKUTIME, write_time);
}
}
else {
if (for_checkpoint) {
STATUS_INC(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT, 1);
STATUS_INC(FT_DISK_FLUSH_NONLEAF_BYTES_FOR_CHECKPOINT, bytes_written);
STATUS_INC(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, uncompressed_bytes_flushed);
STATUS_INC(FT_DISK_FLUSH_NONLEAF_TOKUTIME_FOR_CHECKPOINT, write_time);
}
else {
STATUS_INC(FT_DISK_FLUSH_NONLEAF, 1);
STATUS_INC(FT_DISK_FLUSH_NONLEAF_BYTES, bytes_written);
STATUS_INC(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES, uncompressed_bytes_flushed);
STATUS_INC(FT_DISK_FLUSH_NONLEAF_TOKUTIME, write_time);
}
}
}
......@@ -777,7 +817,6 @@ void toku_ftnode_flush_callback (
int r = toku_serialize_ftnode_to(fd, ftnode->thisnodename, ftnode, ndd, !is_clone, h, for_checkpoint);
assert_zero(r);
ftnode->layout_version_read_from_disk = FT_LAYOUT_VERSION;
ft_status_update_flush_reason(ftnode, for_checkpoint);
}
if (!keep_me) {
if (!is_clone) {
......@@ -806,15 +845,15 @@ toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe)
if (bfe->type == ftnode_fetch_prefetch) {
STATUS_INC(FT_NUM_PIVOTS_FETCHED_PREFETCH, 1);
STATUS_INC(FT_BYTES_PIVOTS_FETCHED_PREFETCH, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_PIVOTS_FETCHED_PREFETCH, bfe->read_time);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_PREFETCH, bfe->read_time);
} else if (bfe->type == ftnode_fetch_all) {
STATUS_INC(FT_NUM_PIVOTS_FETCHED_WRITE, 1);
STATUS_INC(FT_BYTES_PIVOTS_FETCHED_WRITE, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_PIVOTS_FETCHED_WRITE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_WRITE, bfe->read_time);
} else if (bfe->type == ftnode_fetch_subset) {
STATUS_INC(FT_NUM_PIVOTS_FETCHED_QUERY, 1);
STATUS_INC(FT_BYTES_PIVOTS_FETCHED_QUERY, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_PIVOTS_FETCHED_QUERY, bfe->read_time);
STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_QUERY, bfe->read_time);
}
}
......@@ -1090,7 +1129,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_PREFETCH, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_PREFETCH, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_BASEMENTS_FETCHED_PREFETCH, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_PREFETCH, bfe->read_time);
}
} else if (bfe->type == ftnode_fetch_all) {
if (state == PT_COMPRESSED) {
......@@ -1098,7 +1137,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_WRITE, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_WRITE, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_BASEMENTS_FETCHED_WRITE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_WRITE, bfe->read_time);
}
} else if (childnum == bfe->child_to_read) {
if (state == PT_COMPRESSED) {
......@@ -1106,7 +1145,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_NORMAL, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_NORMAL, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_BASEMENTS_FETCHED_NORMAL, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_NORMAL, bfe->read_time);
}
} else {
if (state == PT_COMPRESSED) {
......@@ -1114,7 +1153,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, 1);
STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_AGGRESSIVE, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_BASEMENTS_FETCHED_AGGRESSIVE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_AGGRESSIVE, bfe->read_time);
}
}
}
......@@ -1125,7 +1164,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_PREFETCH, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_MSG_BUFFER_FETCHED_PREFETCH, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, bfe->read_time);
}
} else if (bfe->type == ftnode_fetch_all) {
if (state == PT_COMPRESSED) {
......@@ -1133,7 +1172,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_WRITE, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_WRITE, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_MSG_BUFFER_FETCHED_WRITE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, bfe->read_time);
}
} else if (childnum == bfe->child_to_read) {
if (state == PT_COMPRESSED) {
......@@ -1141,7 +1180,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_NORMAL, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_NORMAL, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_MSG_BUFFER_FETCHED_NORMAL, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_NORMAL, bfe->read_time);
}
} else {
if (state == PT_COMPRESSED) {
......@@ -1149,7 +1188,7 @@ ft_status_update_partial_fetch_reason(
} else {
STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, 1);
STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->bytes_read);
STATUS_INC(FT_NANOTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->read_time);
STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->read_time);
}
}
}
......@@ -1310,10 +1349,11 @@ toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_c
paranoid_invariant(layout_version != 0);
paranoid_invariant(height >= 0);
if (height == 0)
if (height == 0) {
STATUS_INC(FT_CREATE_LEAF, 1);
else
} else {
STATUS_INC(FT_CREATE_NONLEAF, 1);
}
n->max_msn_applied_to_node_on_disk = ZERO_MSN; // correct value for root node, harmless for others
n->flags = flags;
......
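
One detail from the ft-ops.cc hunks above worth spelling out: STATUS_INC now has to serve two kinds of status rows, PARCOUNT rows backed by a partitioned counter and the new TOKUTIME rows backed by a plain 64-bit accumulator (value.num). The sketch below mirrors that dispatch with made-up types; the per-slot array is only a rough stand-in for the real PARTITIONED_COUNTER, and, as in the macro, the non-PARCOUNT branch is a plain (non-atomic) add.

    // Illustrative stand-ins for the status-row machinery; not the ft-internal
    // types.  A "partitioned" counter here is just an array of per-thread slots
    // summed on read; the real PARTITIONED_COUNTER is more involved.
    #include <stdint.h>
    #include <stdio.h>

    #define NSLOTS 4

    typedef enum { PARCOUNT, TOKUTIME } row_type;

    typedef struct {
        row_type type;
        uint64_t parcount[NSLOTS];    // used when type == PARCOUNT
        uint64_t num;                 // used otherwise (tokutime accumulator)
    } status_row;

    // Mirrors the shape of the updated STATUS_INC: partitioned increment for
    // PARCOUNT rows, a plain add into .num for TOKUTIME rows.
    static void status_inc(status_row *row, int thread_slot, uint64_t d) {
        if (row->type == PARCOUNT) {
            row->parcount[thread_slot % NSLOTS] += d;
        } else {
            row->num += d;
        }
    }

    static uint64_t status_read(const status_row *row) {
        if (row->type != PARCOUNT) return row->num;
        uint64_t sum = 0;
        for (int i = 0; i < NSLOTS; i++) sum += row->parcount[i];
        return sum;
    }

    int main(void) {
        status_row flushes    = { PARCOUNT, {0}, 0 };  // e.g. FT_DISK_FLUSH_LEAF
        status_row flush_time = { TOKUTIME, {0}, 0 };  // e.g. FT_DISK_FLUSH_LEAF_TOKUTIME
        status_inc(&flushes, 0, 1);
        status_inc(&flushes, 1, 1);
        status_inc(&flush_time, 0, 123456);            // raw tokutime units
        printf("flushes=%llu tokutime=%llu\n",
               (unsigned long long)status_read(&flushes),
               (unsigned long long)status_read(&flush_time));
        return 0;
    }
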
......@@ -762,6 +762,7 @@ toku_serialize_ftnode_to_memory (FTNODE node,
bool do_rebalancing,
bool in_parallel, // for loader is true, for toku_ftnode_flush_callback, is false
/*out*/ size_t *n_bytes_to_write,
/*out*/ size_t *n_uncompressed_bytes,
/*out*/ char **bytes_to_write)
{
toku_assert_entire_node_in_memory(node);
......@@ -804,15 +805,19 @@ toku_serialize_ftnode_to_memory (FTNODE node,
// The total size of the node is:
// size of header + disk size of the n+1 sub_block's created above
uint32_t total_node_size = (serialize_node_header_size(node) // uncomrpessed header
uint32_t total_node_size = (serialize_node_header_size(node) // uncompressed header
+ sb_node_info.compressed_size // compressed nodeinfo (without its checksum)
+ 4); // nodeinfo's checksum
uint32_t total_uncompressed_size = (serialize_node_header_size(node) // uncompressed header
+ sb_node_info.uncompressed_size // uncompressed nodeinfo (without its checksum)
+ 4); // nodeinfo's checksum
// store the BP_SIZESs
for (int i = 0; i < node->n_children; i++) {
uint32_t len = sb[i].compressed_size + 4; // data and checksum
BP_SIZE (*ndd,i) = len;
BP_START(*ndd,i) = total_node_size;
total_node_size += sb[i].compressed_size + 4;
total_uncompressed_size += sb[i].uncompressed_size + 4;
}
char *XMALLOC_N(total_node_size, data);
......@@ -843,6 +848,7 @@ toku_serialize_ftnode_to_memory (FTNODE node,
assert(curr_ptr - data == total_node_size);
*bytes_to_write = data;
*n_bytes_to_write = total_node_size;
*n_uncompressed_bytes = total_uncompressed_size;
//
// now that node has been serialized, go through sub_block's and free
......@@ -863,6 +869,7 @@ int
toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DATA* ndd, bool do_rebalancing, FT h, bool for_checkpoint) {
size_t n_to_write;
size_t n_uncompressed_bytes;
char *compressed_buf = NULL;
{
// because toku_serialize_ftnode_to is only called for
......@@ -886,27 +893,26 @@ toku_serialize_ftnode_to (int fd, BLOCKNUM blocknum, FTNODE node, FTNODE_DISK_DA
do_rebalancing,
false, // in_parallel
&n_to_write,
&n_uncompressed_bytes,
&compressed_buf
);
if (r!=0) return r;
}
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
{
// If the node has never been written, then write the whole buffer, including the zeros
invariant(blocknum.b>=0);
//printf("%s:%d h=%p\n", __FILE__, __LINE__, h);
//printf("%s:%d translated_blocknum_limit=%lu blocknum.b=%lu\n", __FILE__, __LINE__, h->translated_blocknum_limit, blocknum.b);
//printf("%s:%d allocator=%p\n", __FILE__, __LINE__, h->block_allocator);
//printf("%s:%d bt=%p\n", __FILE__, __LINE__, h->block_translation);
DISKOFF offset;
toku_blocknum_realloc_on_disk(h->blocktable, blocknum, n_to_write, &offset,
h, fd, for_checkpoint); //dirties h
tokutime_t io_t0 = toku_time_now();
toku_os_full_pwrite(fd, compressed_buf, n_to_write, offset);
tokutime_t io_t1 = toku_time_now();
toku_ft_status_update_flush_reason(node, n_uncompressed_bytes, n_to_write, io_t1 - io_t0, for_checkpoint);
}
//printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size);
toku_free(compressed_buf);
node->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
return 0;
......
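
The new uncompressed-size bookkeeping in toku_serialize_ftnode_to_memory follows the compressed total line for line: the header, plus the nodeinfo block, plus one sub-block per child, with a 4-byte checksum after each block. A standalone sketch of that arithmetic, using illustrative struct and function names rather than the real sub_block type:

    #include <stdint.h>
    #include <stdio.h>

    // Illustrative stand-in for a serialized sub-block's two sizes.
    struct sub_block_sizes {
        uint32_t compressed_size;
        uint32_t uncompressed_size;
    };

    // Total compressed (on-disk) and uncompressed sizes of a serialized node:
    // header, nodeinfo block, and one sub-block per child, each block followed
    // by a 4-byte checksum.
    static void node_totals(uint32_t header_size,
                            struct sub_block_sizes nodeinfo,
                            const struct sub_block_sizes *children, int n_children,
                            uint32_t *total_compressed, uint32_t *total_uncompressed) {
        *total_compressed   = header_size + nodeinfo.compressed_size   + 4;
        *total_uncompressed = header_size + nodeinfo.uncompressed_size + 4;
        for (int i = 0; i < n_children; i++) {
            *total_compressed   += children[i].compressed_size   + 4;
            *total_uncompressed += children[i].uncompressed_size + 4;
        }
    }

    int main(void) {
        struct sub_block_sizes nodeinfo = { 120, 400 };
        struct sub_block_sizes kids[2]  = { { 1000, 4096 }, { 900, 4096 } };
        uint32_t c, u;
        node_totals(64, nodeinfo, kids, 2, &c, &u);
        printf("compressed=%u uncompressed=%u\n", c, u);  // 2096 and 8668
        return 0;
    }
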
......@@ -2721,9 +2721,10 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
// serialize leaf to buffer
size_t serialized_leaf_size = 0;
size_t uncompressed_serialized_leaf_size = 0;
char *serialized_leaf = NULL;
FTNODE_DISK_DATA ndd = NULL;
result = toku_serialize_ftnode_to_memory(lbuf->node, &ndd, target_basementnodesize, target_compression_method, true, true, &serialized_leaf_size, &serialized_leaf);
result = toku_serialize_ftnode_to_memory(lbuf->node, &ndd, target_basementnodesize, target_compression_method, true, true, &serialized_leaf_size, &uncompressed_serialized_leaf_size, &serialized_leaf);
// write it out
if (result == 0) {
......@@ -2925,9 +2926,10 @@ static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum
FTNODE_DISK_DATA ndd = NULL;
if (result == 0) {
size_t n_bytes;
size_t n_uncompressed_bytes;
char *bytes;
int r;
r = toku_serialize_ftnode_to_memory(node, &ndd, target_basementnodesize, target_compression_method, true, true, &n_bytes, &bytes);
r = toku_serialize_ftnode_to_memory(node, &ndd, target_basementnodesize, target_compression_method, true, true, &n_bytes, &n_uncompressed_bytes, &bytes);
if (r) {
result = r;
} else {
......
......@@ -91,6 +91,10 @@ struct tokulogger {
uint64_t input_lock_ctr; // how many times has input_lock been taken and released
uint64_t output_condition_lock_ctr; // how many times has output_condition_lock been taken and released
uint64_t swap_ctr; // how many times have input/output log buffers been swapped
uint64_t num_writes_to_disk; // how many times did we write to disk?
uint64_t bytes_written_to_disk; // how many bytes have been written to disk?
tokutime_t time_spent_writing_to_disk; // how much tokutime did we spend writing to disk?
void (*remove_finalize_callback) (DICTIONARY_ID, void*); // ydb-level callback to be called when a transaction that ...
void * remove_finalize_callback_extra; // ... deletes a file is committed or when one that creates a file is aborted.
CACHEFILE rollback_cachefile;
......
......@@ -418,7 +418,14 @@ write_outbuf_to_logfile (TOKULOGGER logger, LSN *fsynced_lsn)
// Entry and exit: Holds permission to modify output (and doesn't let it go, so it's ok to also hold the inlock).
{
if (logger->outbuf.n_in_buf>0) {
// Write the outbuf to disk, take accounting measurements
tokutime_t io_t0 = toku_time_now();
toku_os_full_write(logger->fd, logger->outbuf.buf, logger->outbuf.n_in_buf);
tokutime_t io_t1 = toku_time_now();
logger->num_writes_to_disk++;
logger->bytes_written_to_disk += logger->outbuf.n_in_buf;
logger->time_spent_writing_to_disk += (io_t1 - io_t0);
assert(logger->outbuf.max_lsn_in_buf.lsn > logger->written_lsn.lsn); // since there is something in the buffer, its LSN must be bigger than what's previously written.
logger->written_lsn = logger->outbuf.max_lsn_in_buf;
logger->n_in_file += logger->outbuf.n_in_buf;
......@@ -1280,6 +1287,10 @@ status_init(void) {
STATUS_INIT(LOGGER_ILOCK_CTR, UINT64, "ilock count");
STATUS_INIT(LOGGER_OLOCK_CTR, UINT64, "olock count");
STATUS_INIT(LOGGER_SWAP_CTR, UINT64, "swap count");
STATUS_INIT(LOGGER_NUM_WRITES, UINT64, "writes");
STATUS_INIT(LOGGER_BYTES_WRITTEN, UINT64, "writes (bytes)");
STATUS_INIT(LOGGER_UNCOMPRESSED_BYTES_WRITTEN, UINT64, "writes (uncompressed bytes)");
STATUS_INIT(LOGGER_TOKUTIME_WRITES, TOKUTIME, "writes (seconds)");
logger_status.initialized = true;
}
#undef STATUS_INIT
......@@ -1295,6 +1306,11 @@ toku_logger_get_status(TOKULOGGER logger, LOGGER_STATUS statp) {
STATUS_VALUE(LOGGER_ILOCK_CTR) = logger->input_lock_ctr;
STATUS_VALUE(LOGGER_OLOCK_CTR) = logger->output_condition_lock_ctr;
STATUS_VALUE(LOGGER_SWAP_CTR) = logger->swap_ctr;
STATUS_VALUE(LOGGER_NUM_WRITES) = logger->num_writes_to_disk;
STATUS_VALUE(LOGGER_BYTES_WRITTEN) = logger->bytes_written_to_disk;
// No compression on logfiles, so the uncompressed size is just the number of bytes written
STATUS_VALUE(LOGGER_UNCOMPRESSED_BYTES_WRITTEN) = logger->bytes_written_to_disk;
STATUS_VALUE(LOGGER_TOKUTIME_WRITES) = logger->time_spent_writing_to_disk;
}
*statp = logger_status;
}
......
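
With the three new logger counters exposed, simple derived metrics fall out directly, for example average write size and effective log-write bandwidth. The sketch below uses made-up values and assumes the tokutime total has already been converted to the seconds figure the status row reports:

    #include <stdint.h>
    #include <stdio.h>

    int main(void) {
        // Hypothetical values as they might appear in LOGGER_NUM_WRITES,
        // LOGGER_BYTES_WRITTEN, and LOGGER_TOKUTIME_WRITES (already in seconds).
        uint64_t num_writes    = 1500;
        uint64_t bytes_written = 96u * 1024u * 1024u;
        double   seconds       = 2.4;

        if (num_writes > 0 && seconds > 0.0) {
            printf("avg write size: %.1f KiB\n",
                   (double)bytes_written / (double)num_writes / 1024.0);
            printf("write bandwidth: %.1f MiB/s\n",
                   (double)bytes_written / seconds / (1024.0 * 1024.0));
        }
        return 0;
    }
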
......@@ -156,6 +156,10 @@ typedef enum {
LOGGER_ILOCK_CTR,
LOGGER_OLOCK_CTR,
LOGGER_SWAP_CTR,
LOGGER_NUM_WRITES,
LOGGER_BYTES_WRITTEN,
LOGGER_UNCOMPRESSED_BYTES_WRITTEN,
LOGGER_TOKUTIME_WRITES,
LOGGER_STATUS_NUM_ROWS
} logger_status_entry;
......