Commit e58fe6dc authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4260], [t:4239], merge to main

git-svn-id: file:///svn/toku/tokudb@37751 c7de825b-a66e-492c-adef-691d508d4ae1
parent ffa2cbac
......@@ -214,6 +214,25 @@ typedef struct __toku_engine_status {
uint64_t msg_bytes_max; /* how many bytes of messages currently in trees (estimate)*/
uint64_t msg_num; /* how many messages injected at root*/
uint64_t msg_num_broadcast; /* how many broadcast messages injected at root*/
uint64_t num_basements_decompressed_normal; /* how many basement nodes were decompressed because they were the target of a query */
uint64_t num_basements_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_decompressed_prefetch;
uint64_t num_basements_decompressed_write;
uint64_t num_msg_buffer_decompressed_normal; /* how many msg buffers were decompressed because they were the target of a query */
uint64_t num_msg_buffer_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_decompressed_prefetch;
uint64_t num_msg_buffer_decompressed_write;
uint64_t num_pivots_fetched_query; /* how many pivots were fetched were fetched for a query */
uint64_t num_pivots_fetched_prefetch; /* ... for a prefetch */
uint64_t num_pivots_fetched_write; /* ... for a write */
uint64_t num_basements_fetched_normal; /* how many basement nodes were fetched because they were the target of a query */
uint64_t num_basements_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_fetched_prefetch;
uint64_t num_basements_fetched_write;
uint64_t num_msg_buffer_fetched_normal; /* how many msg buffers were fetched because they were the target of a query */
uint64_t num_msg_buffer_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_fetched_prefetch;
uint64_t num_msg_buffer_fetched_write;
u_int64_t le_max_committed_xr; /* max committed transaction records in any packed le */
u_int64_t le_max_provisional_xr; /* max provisional transaction records in any packed le */
u_int64_t le_max_memsize; /* max memsize of any packed le */
......@@ -331,6 +350,7 @@ typedef enum {
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 64
#define DB_DBT_MALLOC 4
......
......@@ -214,6 +214,25 @@ typedef struct __toku_engine_status {
uint64_t msg_bytes_max; /* how many bytes of messages currently in trees (estimate)*/
uint64_t msg_num; /* how many messages injected at root*/
uint64_t msg_num_broadcast; /* how many broadcast messages injected at root*/
uint64_t num_basements_decompressed_normal; /* how many basement nodes were decompressed because they were the target of a query */
uint64_t num_basements_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_decompressed_prefetch;
uint64_t num_basements_decompressed_write;
uint64_t num_msg_buffer_decompressed_normal; /* how many msg buffers were decompressed because they were the target of a query */
uint64_t num_msg_buffer_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_decompressed_prefetch;
uint64_t num_msg_buffer_decompressed_write;
uint64_t num_pivots_fetched_query; /* how many pivots were fetched were fetched for a query */
uint64_t num_pivots_fetched_prefetch; /* ... for a prefetch */
uint64_t num_pivots_fetched_write; /* ... for a write */
uint64_t num_basements_fetched_normal; /* how many basement nodes were fetched because they were the target of a query */
uint64_t num_basements_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_fetched_prefetch;
uint64_t num_basements_fetched_write;
uint64_t num_msg_buffer_fetched_normal; /* how many msg buffers were fetched because they were the target of a query */
uint64_t num_msg_buffer_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_fetched_prefetch;
uint64_t num_msg_buffer_fetched_write;
u_int64_t le_max_committed_xr; /* max committed transaction records in any packed le */
u_int64_t le_max_provisional_xr; /* max provisional transaction records in any packed le */
u_int64_t le_max_memsize; /* max memsize of any packed le */
......@@ -332,6 +351,7 @@ typedef enum {
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 64
#define DB_DBT_MALLOC 4
......
......@@ -214,6 +214,25 @@ typedef struct __toku_engine_status {
uint64_t msg_bytes_max; /* how many bytes of messages currently in trees (estimate)*/
uint64_t msg_num; /* how many messages injected at root*/
uint64_t msg_num_broadcast; /* how many broadcast messages injected at root*/
uint64_t num_basements_decompressed_normal; /* how many basement nodes were decompressed because they were the target of a query */
uint64_t num_basements_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_decompressed_prefetch;
uint64_t num_basements_decompressed_write;
uint64_t num_msg_buffer_decompressed_normal; /* how many msg buffers were decompressed because they were the target of a query */
uint64_t num_msg_buffer_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_decompressed_prefetch;
uint64_t num_msg_buffer_decompressed_write;
uint64_t num_pivots_fetched_query; /* how many pivots were fetched were fetched for a query */
uint64_t num_pivots_fetched_prefetch; /* ... for a prefetch */
uint64_t num_pivots_fetched_write; /* ... for a write */
uint64_t num_basements_fetched_normal; /* how many basement nodes were fetched because they were the target of a query */
uint64_t num_basements_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_fetched_prefetch;
uint64_t num_basements_fetched_write;
uint64_t num_msg_buffer_fetched_normal; /* how many msg buffers were fetched because they were the target of a query */
uint64_t num_msg_buffer_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_fetched_prefetch;
uint64_t num_msg_buffer_fetched_write;
u_int64_t le_max_committed_xr; /* max committed transaction records in any packed le */
u_int64_t le_max_provisional_xr; /* max provisional transaction records in any packed le */
u_int64_t le_max_memsize; /* max memsize of any packed le */
......@@ -332,6 +351,7 @@ typedef enum {
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 64
#define DB_DBT_MALLOC 4
......
......@@ -214,6 +214,25 @@ typedef struct __toku_engine_status {
uint64_t msg_bytes_max; /* how many bytes of messages currently in trees (estimate)*/
uint64_t msg_num; /* how many messages injected at root*/
uint64_t msg_num_broadcast; /* how many broadcast messages injected at root*/
uint64_t num_basements_decompressed_normal; /* how many basement nodes were decompressed because they were the target of a query */
uint64_t num_basements_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_decompressed_prefetch;
uint64_t num_basements_decompressed_write;
uint64_t num_msg_buffer_decompressed_normal; /* how many msg buffers were decompressed because they were the target of a query */
uint64_t num_msg_buffer_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_decompressed_prefetch;
uint64_t num_msg_buffer_decompressed_write;
uint64_t num_pivots_fetched_query; /* how many pivots were fetched were fetched for a query */
uint64_t num_pivots_fetched_prefetch; /* ... for a prefetch */
uint64_t num_pivots_fetched_write; /* ... for a write */
uint64_t num_basements_fetched_normal; /* how many basement nodes were fetched because they were the target of a query */
uint64_t num_basements_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_fetched_prefetch;
uint64_t num_basements_fetched_write;
uint64_t num_msg_buffer_fetched_normal; /* how many msg buffers were fetched because they were the target of a query */
uint64_t num_msg_buffer_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_fetched_prefetch;
uint64_t num_msg_buffer_fetched_write;
u_int64_t le_max_committed_xr; /* max committed transaction records in any packed le */
u_int64_t le_max_provisional_xr; /* max provisional transaction records in any packed le */
u_int64_t le_max_memsize; /* max memsize of any packed le */
......@@ -332,6 +351,7 @@ typedef enum {
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 128
#define DB_DBT_MALLOC 4
......
......@@ -214,6 +214,25 @@ typedef struct __toku_engine_status {
uint64_t msg_bytes_max; /* how many bytes of messages currently in trees (estimate)*/
uint64_t msg_num; /* how many messages injected at root*/
uint64_t msg_num_broadcast; /* how many broadcast messages injected at root*/
uint64_t num_basements_decompressed_normal; /* how many basement nodes were decompressed because they were the target of a query */
uint64_t num_basements_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_decompressed_prefetch;
uint64_t num_basements_decompressed_write;
uint64_t num_msg_buffer_decompressed_normal; /* how many msg buffers were decompressed because they were the target of a query */
uint64_t num_msg_buffer_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_decompressed_prefetch;
uint64_t num_msg_buffer_decompressed_write;
uint64_t num_pivots_fetched_query; /* how many pivots were fetched were fetched for a query */
uint64_t num_pivots_fetched_prefetch; /* ... for a prefetch */
uint64_t num_pivots_fetched_write; /* ... for a write */
uint64_t num_basements_fetched_normal; /* how many basement nodes were fetched because they were the target of a query */
uint64_t num_basements_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_fetched_prefetch;
uint64_t num_basements_fetched_write;
uint64_t num_msg_buffer_fetched_normal; /* how many msg buffers were fetched because they were the target of a query */
uint64_t num_msg_buffer_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_fetched_prefetch;
uint64_t num_msg_buffer_fetched_write;
u_int64_t le_max_committed_xr; /* max committed transaction records in any packed le */
u_int64_t le_max_provisional_xr; /* max provisional transaction records in any packed le */
u_int64_t le_max_memsize; /* max memsize of any packed le */
......@@ -332,6 +351,7 @@ typedef enum {
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 2
#define DB_DBT_MALLOC 8
......
......@@ -160,6 +160,7 @@ static void print_defines (void) {
printf("#define DB_PRELOCKED_WRITE 0x00400000\n"); // private tokudb
printf("#define DB_PRELOCKED_FILE_READ 0x00200000\n"); // private tokudb
printf("#define DB_IS_HOT_INDEX 0x00100000\n"); // private tokudb
printf("#define DBC_DISABLE_PREFETCHING 0x20000000\n"); // private tokudb
{
//dbt flags
......@@ -609,6 +610,25 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
printf(" uint64_t msg_bytes_max; /* how many bytes of messages currently in trees (estimate)*/\n");
printf(" uint64_t msg_num; /* how many messages injected at root*/\n");
printf(" uint64_t msg_num_broadcast; /* how many broadcast messages injected at root*/\n");
printf(" uint64_t num_basements_decompressed_normal; /* how many basement nodes were decompressed because they were the target of a query */\n");
printf(" uint64_t num_basements_decompressed_aggressive; /* ... because they were between lc and rc */\n");
printf(" uint64_t num_basements_decompressed_prefetch;\n");
printf(" uint64_t num_basements_decompressed_write;\n");
printf(" uint64_t num_msg_buffer_decompressed_normal; /* how many msg buffers were decompressed because they were the target of a query */\n");
printf(" uint64_t num_msg_buffer_decompressed_aggressive; /* ... because they were between lc and rc */\n");
printf(" uint64_t num_msg_buffer_decompressed_prefetch;\n");
printf(" uint64_t num_msg_buffer_decompressed_write;\n");
printf(" uint64_t num_pivots_fetched_query; /* how many pivots were fetched were fetched for a query */\n");
printf(" uint64_t num_pivots_fetched_prefetch; /* ... for a prefetch */\n");
printf(" uint64_t num_pivots_fetched_write; /* ... for a write */\n");
printf(" uint64_t num_basements_fetched_normal; /* how many basement nodes were fetched because they were the target of a query */\n");
printf(" uint64_t num_basements_fetched_aggressive; /* ... because they were between lc and rc */\n");
printf(" uint64_t num_basements_fetched_prefetch;\n");
printf(" uint64_t num_basements_fetched_write;\n");
printf(" uint64_t num_msg_buffer_fetched_normal; /* how many msg buffers were fetched because they were the target of a query */\n");
printf(" uint64_t num_msg_buffer_fetched_aggressive; /* ... because they were between lc and rc */\n");
printf(" uint64_t num_msg_buffer_fetched_prefetch;\n");
printf(" uint64_t num_msg_buffer_fetched_write;\n");
printf(" u_int64_t le_max_committed_xr; /* max committed transaction records in any packed le */ \n");
printf(" u_int64_t le_max_provisional_xr; /* max provisional transaction records in any packed le */ \n");
printf(" u_int64_t le_max_memsize; /* max memsize of any packed le */ \n");
......
......@@ -214,6 +214,25 @@ typedef struct __toku_engine_status {
uint64_t msg_bytes_max; /* how many bytes of messages currently in trees (estimate)*/
uint64_t msg_num; /* how many messages injected at root*/
uint64_t msg_num_broadcast; /* how many broadcast messages injected at root*/
uint64_t num_basements_decompressed_normal; /* how many basement nodes were decompressed because they were the target of a query */
uint64_t num_basements_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_decompressed_prefetch;
uint64_t num_basements_decompressed_write;
uint64_t num_msg_buffer_decompressed_normal; /* how many msg buffers were decompressed because they were the target of a query */
uint64_t num_msg_buffer_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_decompressed_prefetch;
uint64_t num_msg_buffer_decompressed_write;
uint64_t num_pivots_fetched_query; /* how many pivots were fetched were fetched for a query */
uint64_t num_pivots_fetched_prefetch; /* ... for a prefetch */
uint64_t num_pivots_fetched_write; /* ... for a write */
uint64_t num_basements_fetched_normal; /* how many basement nodes were fetched because they were the target of a query */
uint64_t num_basements_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_fetched_prefetch;
uint64_t num_basements_fetched_write;
uint64_t num_msg_buffer_fetched_normal; /* how many msg buffers were fetched because they were the target of a query */
uint64_t num_msg_buffer_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_fetched_prefetch;
uint64_t num_msg_buffer_fetched_write;
u_int64_t le_max_committed_xr; /* max committed transaction records in any packed le */
u_int64_t le_max_provisional_xr; /* max provisional transaction records in any packed le */
u_int64_t le_max_memsize; /* max memsize of any packed le */
......@@ -332,6 +351,7 @@ typedef enum {
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 2
#define DB_DBT_MALLOC 8
......
......@@ -214,6 +214,25 @@ typedef struct __toku_engine_status {
uint64_t msg_bytes_max; /* how many bytes of messages currently in trees (estimate)*/
uint64_t msg_num; /* how many messages injected at root*/
uint64_t msg_num_broadcast; /* how many broadcast messages injected at root*/
uint64_t num_basements_decompressed_normal; /* how many basement nodes were decompressed because they were the target of a query */
uint64_t num_basements_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_decompressed_prefetch;
uint64_t num_basements_decompressed_write;
uint64_t num_msg_buffer_decompressed_normal; /* how many msg buffers were decompressed because they were the target of a query */
uint64_t num_msg_buffer_decompressed_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_decompressed_prefetch;
uint64_t num_msg_buffer_decompressed_write;
uint64_t num_pivots_fetched_query; /* how many pivots were fetched were fetched for a query */
uint64_t num_pivots_fetched_prefetch; /* ... for a prefetch */
uint64_t num_pivots_fetched_write; /* ... for a write */
uint64_t num_basements_fetched_normal; /* how many basement nodes were fetched because they were the target of a query */
uint64_t num_basements_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_basements_fetched_prefetch;
uint64_t num_basements_fetched_write;
uint64_t num_msg_buffer_fetched_normal; /* how many msg buffers were fetched because they were the target of a query */
uint64_t num_msg_buffer_fetched_aggressive; /* ... because they were between lc and rc */
uint64_t num_msg_buffer_fetched_prefetch;
uint64_t num_msg_buffer_fetched_write;
u_int64_t le_max_committed_xr; /* max committed transaction records in any packed le */
u_int64_t le_max_provisional_xr; /* max provisional transaction records in any packed le */
u_int64_t le_max_memsize; /* max memsize of any packed le */
......@@ -332,6 +351,7 @@ typedef enum {
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 2
#define DB_DBT_MALLOC 8
......
......@@ -79,6 +79,14 @@ struct brtnode_fetch_extra {
brt_search_t* search;
DBT *range_lock_left_key, *range_lock_right_key;
BOOL left_is_neg_infty, right_is_pos_infty;
// states if we should try to aggressively fetch basement nodes
// that are not specifically needed for current query,
// but may be needed for other cursor operations user is doing
// For example, if we have not disabled prefetching,
// and the user is doing a dictionary wide scan, then
// even though a query may only want one basement node,
// we fetch all basement nodes in a leaf node.
BOOL disable_prefetching;
// this value will be set during the fetch_callback call by toku_brtnode_fetch_callback or toku_brtnode_pf_req_callback
// thi callbacks need to evaluate this anyway, so we cache it here so the search code does not reevaluate it
int child_to_read;
......@@ -485,6 +493,7 @@ struct brtenv {
long long checksum_number;
};
void toku_brt_status_update_pivot_fetch_reason(struct brtnode_fetch_extra *bfe);
extern void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, BOOL write_me, BOOL keep_me, BOOL for_checkpoint);
extern int toku_brtnode_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, PAIR_ATTR *sizep, int*dirty, void*extraargs);
extern void toku_brtnode_pe_est_callback(void* brtnode_pv, long* bytes_freed_estimate, enum partial_eviction_cost *cost, void* write_extraargs);
......@@ -522,6 +531,7 @@ struct brt_cursor {
BOOL left_is_neg_infty, right_is_pos_infty;
BOOL is_snapshot_read; // true if query is read_committed, false otherwise
BOOL is_leaf_mode;
BOOL disable_prefetching;
TOKUTXN ttxn;
struct brt_cursor_leaf_info leaf_info;
};
......@@ -541,6 +551,7 @@ static inline void fill_bfe_for_full_read(struct brtnode_fetch_extra *bfe, struc
bfe->left_is_neg_infty = FALSE;
bfe->right_is_pos_infty = FALSE;
bfe->child_to_read = -1;
bfe->disable_prefetching = FALSE;
}
//
......@@ -556,7 +567,8 @@ static inline void fill_bfe_for_subset_read(
DBT *left,
DBT *right,
BOOL left_is_neg_infty,
BOOL right_is_pos_infty
BOOL right_is_pos_infty,
BOOL disable_prefetching
)
{
bfe->type = brtnode_fetch_subset;
......@@ -567,6 +579,7 @@ static inline void fill_bfe_for_subset_read(
bfe->left_is_neg_infty = left_is_neg_infty;
bfe->right_is_pos_infty = right_is_pos_infty;
bfe->child_to_read = -1;
bfe->disable_prefetching = disable_prefetching;
}
//
......@@ -584,6 +597,7 @@ static inline void fill_bfe_for_min_read(struct brtnode_fetch_extra *bfe, struct
bfe->left_is_neg_infty = FALSE;
bfe->right_is_pos_infty = FALSE;
bfe->child_to_read = -1;
bfe->disable_prefetching = FALSE;
}
static inline void destroy_bfe_for_prefetch(struct brtnode_fetch_extra *bfe) {
......@@ -628,6 +642,7 @@ static inline void fill_bfe_for_prefetch(struct brtnode_fetch_extra *bfe,
bfe->left_is_neg_infty = c->left_is_neg_infty;
bfe->right_is_pos_infty = c->right_is_pos_infty;
bfe->child_to_read = -1;
bfe->disable_prefetching = c->disable_prefetching;
}
struct ancestors {
......@@ -787,6 +802,25 @@ struct brt_status {
uint64_t msg_bytes_max; // how many bytes of messages currently in trees (estimate)
uint64_t msg_num; // how many messages injected at root
uint64_t msg_num_broadcast; // how many broadcast messages injected at root
uint64_t num_basements_decompressed_normal; // how many basement nodes were decompressed because they were the target of a query
uint64_t num_basements_decompressed_aggressive; // ... because they were between lc and rc
uint64_t num_basements_decompressed_prefetch;
uint64_t num_basements_decompressed_write;
uint64_t num_msg_buffer_decompressed_normal; // how many msg buffers were decompressed because they were the target of a query
uint64_t num_msg_buffer_decompressed_aggressive; // ... because they were between lc and rc
uint64_t num_msg_buffer_decompressed_prefetch;
uint64_t num_msg_buffer_decompressed_write;
uint64_t num_pivots_fetched_query; // how many pivots were fetched for a query
uint64_t num_pivots_fetched_prefetch; // ... for a prefetch
uint64_t num_pivots_fetched_write; // ... for a write
uint64_t num_basements_fetched_normal; // how many basement nodes were fetched because they were the target of a query
uint64_t num_basements_fetched_aggressive; // ... because they were between lc and rc
uint64_t num_basements_fetched_prefetch;
uint64_t num_basements_fetched_write;
uint64_t num_msg_buffer_fetched_normal; // how many msg buffers were fetched because they were the target of a query
uint64_t num_msg_buffer_fetched_aggressive; // ... because they were between lc and rc
uint64_t num_msg_buffer_fetched_prefetch;
uint64_t num_msg_buffer_fetched_write;
};
void toku_brt_get_status(BRT_STATUS);
......
......@@ -1273,6 +1273,7 @@ static void setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* b
BP_STATE(node, i) = PT_ON_DISK;
}
BP_WORKDONE(node,i) = 0;
switch (BP_STATE(node,i)) {
case PT_AVAIL:
setup_available_brtnode_partition(node, i);
......@@ -1413,8 +1414,9 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
node->layout_version_original = rbuf_int(rb);
node->build_id = rbuf_int(rb);
node->n_children = rbuf_int(rb);
// Guaranteed to be have been able to read up to here. If n_children is too big, we may have a problem, so check that we won't overflow while
// reading the partition locations.
// Guaranteed to be have been able to read up to here. If n_children
// is too big, we may have a problem, so check that we won't overflow
// while reading the partition locations.
unsigned int nhsize = serialize_node_header_size(node); // we can do this because n_children is filled in.
unsigned int needed_size = nhsize + 12; // we need 12 more so that we can read the compressed block size information that follows for the nodeinfo.
if (needed_size > rb->size) {
......@@ -1446,6 +1448,7 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
goto cleanup;
}
// We got the entire header and node info!
toku_brt_status_update_pivot_fetch_reason(bfe);
// Finish reading compressed the sub_block
bytevec* cp = (bytevec*)&sb_node_info.compressed_ptr;
......@@ -1471,7 +1474,9 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
toku_free(sb_node_info.uncompressed_ptr);
sb_node_info.uncompressed_ptr = NULL;
// Now we have the brtnode_info. We have a bunch more stuff in the rbuf, so we might be able to store the compressed data for some objects.
// Now we have the brtnode_info. We have a bunch more stuff in the
// rbuf, so we might be able to store the compressed data for some
// objects.
// We can proceed to deserialize the individual subblocks.
assert(bfe->type == brtnode_fetch_none || bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_all || bfe->type == brtnode_fetch_prefetch);
......@@ -1480,29 +1485,17 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
// for partitions staying compressed, create sub_block
setup_brtnode_partitions(node, bfe, false);
// determine the range to preetch
int lc, rc;
if (bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_prefetch) {
lc = toku_bfe_leftmost_child_wanted(bfe, node);
rc = toku_bfe_rightmost_child_wanted(bfe, node);
} else {
lc = -1;
rc = -1;
if (bfe->type != brtnode_fetch_none) {
PAIR_ATTR attr;
toku_brtnode_pf_callback(node, bfe, fd, &attr);
}
cilk_for (int i = 0; i < node->n_children; i++) {
assert(BP_STATE(node, i) == PT_ON_DISK);
// We only touch the clock for basement nodes that the bfe wants,
// and not basement nodes that the are being prefetched
// handle clock
for (int i = 0; i < node->n_children; i++) {
if (toku_bfe_wants_child_available(bfe, i)) {
assert(BP_STATE(node,i) == PT_AVAIL);
BP_TOUCH_CLOCK(node,i);
}
if ((lc <= i && i <= rc) || toku_bfe_wants_child_available(bfe, i)) {
assert(BP_STATE(node,i) == PT_ON_DISK);
toku_deserialize_bp_from_disk(node, i, fd, bfe);
}
}
*brtnode = node;
r = 0;
......
......@@ -538,6 +538,17 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
}
void
toku_brt_status_update_pivot_fetch_reason(struct brtnode_fetch_extra *bfe)
{
if (bfe->type == brtnode_fetch_prefetch) {
brt_status.num_pivots_fetched_prefetch++;
} else if (bfe->type == brtnode_fetch_all) {
brt_status.num_pivots_fetched_write++;
} else if (bfe->type == brtnode_fetch_subset) {
brt_status.num_pivots_fetched_query++;
}
}
//fd is protected (must be holding fdlock)
int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM nodename, u_int32_t fullhash,
......@@ -546,8 +557,9 @@ int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM noden
assert(*brtnode_pv == NULL);
struct brtnode_fetch_extra *bfe = (struct brtnode_fetch_extra *)extraargs;
BRTNODE *result=(BRTNODE*)brtnode_pv;
// deserialize the node, must pass the bfe in because we cannot evaluate what piece of the
// the node is necessary until we get it at least partially into memory
// deserialize the node, must pass the bfe in because we cannot
// evaluate what piece of the the node is necessary until we get it at
// least partially into memory
int r = toku_deserialize_brtnode_from(fd, nodename, fullhash, result, bfe);
if (r == 0) {
*sizep = make_brtnode_pair_attr(*result);
......@@ -781,6 +793,9 @@ BOOL toku_brtnode_pf_req_callback(void* brtnode_pv, void* read_extraargs) {
brt_status_update_partial_fetch(BP_STATE(node, bfe->child_to_read));
}
else if (bfe->type == brtnode_fetch_prefetch) {
// makes no sense to have prefetching disabled
// and still call this function
assert(!bfe->disable_prefetching);
int lc = toku_bfe_leftmost_child_wanted(bfe, node);
int rc = toku_bfe_rightmost_child_wanted(bfe, node);
for (int i = lc; i <= rc; ++i) {
......@@ -797,6 +812,71 @@ BOOL toku_brtnode_pf_req_callback(void* brtnode_pv, void* read_extraargs) {
return retval;
}
static void
brt_status_update_partial_fetch_reason(
struct brtnode_fetch_extra *bfe,
int i,
int state,
BOOL is_leaf
)
{
invariant(state == PT_COMPRESSED || state == PT_ON_DISK);
if (is_leaf) {
if (bfe->type == brtnode_fetch_prefetch) {
if (state == PT_COMPRESSED) {
brt_status.num_basements_decompressed_prefetch++;
} else {
brt_status.num_basements_fetched_prefetch++;
}
} else if (bfe->type == brtnode_fetch_all) {
if (state == PT_COMPRESSED) {
brt_status.num_basements_decompressed_write++;
} else {
brt_status.num_basements_fetched_write++;
}
} else if (i == bfe->child_to_read) {
if (state == PT_COMPRESSED) {
brt_status.num_basements_decompressed_normal++;
} else {
brt_status.num_basements_fetched_normal++;
}
} else {
if (state == PT_COMPRESSED) {
brt_status.num_basements_decompressed_aggressive++;
} else {
brt_status.num_basements_fetched_aggressive++;
}
}
}
else {
if (bfe->type == brtnode_fetch_prefetch) {
if (state == PT_COMPRESSED) {
brt_status.num_msg_buffer_decompressed_prefetch++;
} else {
brt_status.num_msg_buffer_fetched_prefetch++;
}
} else if (bfe->type == brtnode_fetch_all) {
if (state == PT_COMPRESSED) {
brt_status.num_msg_buffer_decompressed_write++;
} else {
brt_status.num_msg_buffer_fetched_write++;
}
} else if (i == bfe->child_to_read) {
if (state == PT_COMPRESSED) {
brt_status.num_msg_buffer_decompressed_normal++;
} else {
brt_status.num_msg_buffer_fetched_normal++;
}
} else {
if (state == PT_COMPRESSED) {
brt_status.num_msg_buffer_decompressed_aggressive++;
} else {
brt_status.num_msg_buffer_fetched_aggressive++;
}
}
}
}
// callback for partially reading a node
// could have just used toku_brtnode_fetch_callback, but wanted to separate the two cases to separate functions
int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, PAIR_ATTR* sizep) {
......@@ -807,7 +887,10 @@ int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, PAI
assert((bfe->type == brtnode_fetch_subset) || (bfe->type == brtnode_fetch_all) || (bfe->type == brtnode_fetch_prefetch));
// determine the range to prefetch
int lc, rc;
if (bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_prefetch) {
if (!bfe->disable_prefetching &&
(bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_prefetch)
)
{
lc = toku_bfe_leftmost_child_wanted(bfe, node);
rc = toku_bfe_rightmost_child_wanted(bfe, node);
} else {
......@@ -821,6 +904,7 @@ int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, PAI
continue;
}
if ((lc <= i && i <= rc) || toku_bfe_wants_child_available(bfe, i)) {
brt_status_update_partial_fetch_reason(bfe, i, BP_STATE(node, i), (node->height == 0));
if (BP_STATE(node,i) == PT_COMPRESSED) {
cilk_spawn toku_deserialize_bp_from_compressed(node, i, &bfe->h->descriptor, bfe->h->compare_fun);
}
......@@ -3843,7 +3927,8 @@ int toku_brt_cursor (
BRT brt,
BRT_CURSOR *cursorptr,
TOKUTXN ttxn,
BOOL is_snapshot_read
BOOL is_snapshot_read,
BOOL disable_prefetching
)
{
if (is_snapshot_read) {
......@@ -3868,6 +3953,7 @@ int toku_brt_cursor (
cursor->is_snapshot_read = is_snapshot_read;
cursor->is_leaf_mode = FALSE;
cursor->ttxn = ttxn;
cursor->disable_prefetching = disable_prefetching;
toku_list_push(&brt->cursors, &cursor->cursors_link);
*cursorptr = cursor;
return 0;
......@@ -4543,7 +4629,7 @@ brt_node_maybe_prefetch(BRT brt, BRTNODE node, int childnum, BRT_CURSOR brtcurso
// if we want to prefetch in the tree
// then prefetch the next children if there are any
if (*doprefetch && brt_cursor_prefetching(brtcursor)) {
if (*doprefetch && brt_cursor_prefetching(brtcursor) && !brtcursor->disable_prefetching) {
int rc = brt_cursor_rightmost_child_wanted(brtcursor, brt, node);
for (int i = childnum + 1; (i <= childnum + TOKU_DO_PREFETCH) && (i <= rc); i++) {
BLOCKNUM nextchildblocknum = BP_BLOCKNUM(node, i);
......@@ -4612,7 +4698,8 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
&brtcursor->range_lock_left_key,
&brtcursor->range_lock_right_key,
brtcursor->left_is_neg_infty,
brtcursor->right_is_pos_infty
brtcursor->right_is_pos_infty,
brtcursor->disable_prefetching
);
{
int rr = toku_pin_brtnode(brt, childblocknum, fullhash,
......@@ -4858,7 +4945,8 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
&brtcursor->range_lock_left_key,
&brtcursor->range_lock_right_key,
brtcursor->left_is_neg_infty,
brtcursor->right_is_pos_infty
brtcursor->right_is_pos_infty,
brtcursor->disable_prefetching
);
r = toku_pin_brtnode(brt, *rootp, fullhash,(UNLOCKERS)NULL,(ANCESTORS)NULL, &infinite_bounds, &bfe, TRUE, &node);
assert(r==0 || r== TOKUDB_TRY_AGAIN);
......@@ -5015,7 +5103,7 @@ int
toku_brt_flatten(BRT brt, TOKUTXN ttxn)
{
BRT_CURSOR tmp_cursor;
int r = toku_brt_cursor(brt, &tmp_cursor, ttxn, FALSE);
int r = toku_brt_cursor(brt, &tmp_cursor, ttxn, FALSE, FALSE);
if (r!=0) return r;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, tmp_cursor->brt);
r = brt_cursor_search(tmp_cursor, &search, brt_flatten_getf, NULL, FALSE);
......@@ -5281,7 +5369,7 @@ toku_brt_lookup (BRT brt, DBT *k, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v)
int r, rr;
BRT_CURSOR cursor;
rr = toku_brt_cursor(brt, &cursor, NULL, FALSE);
rr = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
if (rr != 0) return rr;
int op = DB_SET;
......
......@@ -186,7 +186,7 @@ int toku_verify_brt_with_progress (BRT brt, int (*progress_callback)(void *extra
//int show_brt_blocknumbers(BRT);
typedef struct brt_cursor *BRT_CURSOR;
int toku_brt_cursor (BRT, BRT_CURSOR*, TOKUTXN, BOOL) __attribute__ ((warn_unused_result));
int toku_brt_cursor (BRT, BRT_CURSOR*, TOKUTXN, BOOL, BOOL) __attribute__ ((warn_unused_result));
void toku_brt_cursor_set_leaf_mode(BRT_CURSOR);
int toku_brt_cursor_is_leaf_mode(BRT_CURSOR);
void toku_brt_cursor_set_range_lock(BRT_CURSOR, const DBT *, const DBT *, BOOL, BOOL);
......
......@@ -23,7 +23,7 @@ le_cursor_create(LE_CURSOR *le_cursor_result, BRT brt, TOKUTXN txn) {
if (le_cursor == NULL)
result = errno;
else {
result = toku_brt_cursor(brt, &le_cursor->brt_cursor, txn, FALSE);
result = toku_brt_cursor(brt, &le_cursor->brt_cursor, txn, FALSE, FALSE);
if (result == 0) {
// TODO move the leaf mode to the brt cursor constructor
toku_brt_cursor_set_leaf_mode(le_cursor->brt_cursor);
......
......@@ -19,7 +19,6 @@ int64_key_cmp (DB *db UU(), const DBT *a, const DBT *b) {
return 0;
}
#if 0
static void
test_prefetch_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
int r;
......@@ -33,9 +32,30 @@ test_prefetch_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
memset(&cursor->range_lock_right_key, 0 , sizeof(DBT));
cursor->left_is_neg_infty = TRUE;
cursor->right_is_pos_infty = TRUE;
cursor->disable_prefetching = FALSE;
struct brtnode_fetch_extra bfe;
// quick test to see that we have the right behavior when we set
// disable_prefetching to TRUE
cursor->disable_prefetching = TRUE;
fill_bfe_for_prefetch(&bfe, brt_h, cursor);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &bfe);
assert(r==0);
assert(dn->n_children == 3);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_ON_DISK);
r = toku_brtnode_pf_callback(dn, &bfe, fd, &attr);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_ON_DISK);
destroy_bfe_for_prefetch(&bfe);
toku_brtnode_free(&dn);
// now enable prefetching again
cursor->disable_prefetching = FALSE;
fill_bfe_for_prefetch(&bfe, brt_h, cursor);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &bfe);
assert(r==0);
......@@ -138,7 +158,6 @@ test_prefetch_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
toku_free(cursor);
}
#endif
static void
test_subset_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
......@@ -168,11 +187,38 @@ test_subset_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
&left,
&right,
FALSE,
FALSE,
FALSE
);
// fake the childnum to read
// set disable_prefetching ON
bfe.child_to_read = 2;
bfe.disable_prefetching = TRUE;
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &bfe);
assert(r==0);
assert(dn->n_children == 3);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_AVAIL);
// need to call this twice because we had a subset read before, that touched the clock
toku_brtnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, NULL);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_AVAIL);
toku_brtnode_pe_callback(dn, make_pair_attr(0xffffffff), &attr, NULL);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_COMPRESSED);
r = toku_brtnode_pf_callback(dn, &bfe, fd, &attr);
assert(BP_STATE(dn,0) == PT_ON_DISK);
assert(BP_STATE(dn,1) == PT_ON_DISK);
assert(BP_STATE(dn,2) == PT_AVAIL);
toku_brtnode_free(&dn);
// fake the childnum to read
bfe.child_to_read = 2;
bfe.disable_prefetching = FALSE;
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &bfe);
assert(r==0);
assert(dn->n_children == 3);
......@@ -217,7 +263,6 @@ test_subset_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
assert(BP_STATE(dn,2) == PT_ON_DISK);
toku_brtnode_free(&dn);
toku_free(cursor);
}
......@@ -307,7 +352,7 @@ test_prefetching(void) {
r = toku_serialize_brtnode_to(fd, make_blocknum(20), &sn, brt->h, 1, 1, FALSE);
assert(r==0);
//test_prefetch_read(fd, brt, brt_h);
test_prefetch_read(fd, brt, brt_h);
test_subset_read(fd, brt, brt_h);
kv_pair_free(sn.childkeys[0]);
......
......@@ -158,7 +158,8 @@ test2(int fd, struct brt_header *brt_h, BRTNODE *dn) {
&left,
&right,
TRUE,
TRUE
TRUE,
FALSE
);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, dn, &bfe_subset);
......
......@@ -52,7 +52,7 @@ static void test_sub_block(int n) {
assert(error == 0);
BRT_CURSOR cursor;
error = toku_brt_cursor(brt, &cursor, NULL, FALSE);
error = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(error == 0);
for (i=0; ; i++) {
......
......@@ -52,7 +52,7 @@ static void test_multiple_brt_cursor_dbts(int n, DB *db) {
}
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i], NULL, FALSE);
r = toku_brt_cursor(brt, &cursors[i], NULL, FALSE, FALSE);
assert(r == 0);
}
......
......@@ -20,7 +20,7 @@ static void assert_cursor_notfound(BRT brt, int position) {
BRT_CURSOR cursor=0;
int r;
r = toku_brt_cursor(brt, &cursor, NULL, FALSE);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(r==0);
struct check_pair pair = {0,0,0,0,0};
......@@ -36,7 +36,7 @@ static void assert_cursor_value(BRT brt, int position, long long value) {
BRT_CURSOR cursor=0;
int r;
r = toku_brt_cursor(brt, &cursor, NULL, FALSE);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -53,7 +53,7 @@ static void assert_cursor_first_last(BRT brt, long long firstv, long long lastv)
BRT_CURSOR cursor=0;
int r;
r = toku_brt_cursor(brt, &cursor, NULL, FALSE);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("first key: ");
......@@ -251,7 +251,7 @@ static void assert_cursor_walk(BRT brt, int n) {
int i;
int r;
r = toku_brt_cursor(brt, &cursor, NULL, FALSE);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -317,7 +317,7 @@ static void assert_cursor_rwalk(BRT brt, int n) {
int i;
int r;
r = toku_brt_cursor(brt, &cursor, NULL, FALSE);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -403,7 +403,7 @@ static void assert_cursor_walk_inorder(BRT brt, int n) {
int r;
char *prevkey = 0;
r = toku_brt_cursor(brt, &cursor, NULL, FALSE);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -505,7 +505,7 @@ static void test_brt_cursor_split(int n, DB *db) {
assert(r==0);
}
r = toku_brt_cursor(brt, &cursor, NULL, FALSE);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -570,7 +570,7 @@ static void test_multiple_brt_cursors(int n, DB *db) {
int i;
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i], NULL, FALSE);
r = toku_brt_cursor(brt, &cursors[i], NULL, FALSE, FALSE);
assert(r == 0);
}
......@@ -620,7 +620,7 @@ static void test_multiple_brt_cursor_walk(int n, DB *db) {
int c;
/* create the cursors */
for (c=0; c<ncursors; c++) {
r = toku_brt_cursor(brt, &cursors[c], NULL, FALSE);
r = toku_brt_cursor(brt, &cursors[c], NULL, FALSE, FALSE);
assert(r == 0);
}
......@@ -707,7 +707,7 @@ static void test_brt_cursor_set(int n, int cursor_op, DB *db) {
assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor, NULL, FALSE);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(r==0);
/* set cursor to random keys in set { 0, 10, 20, .. 10*(n-1) } */
......@@ -780,7 +780,7 @@ static void test_brt_cursor_set_range(int n, DB *db) {
assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor, NULL, FALSE);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(r==0);
/* pick random keys v in 0 <= v < 10*n, the cursor should point
......@@ -830,7 +830,7 @@ static void test_brt_cursor_delete(int n, DB *db) {
error = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, null_txn, test_brt_cursor_keycompare, db);
assert(error == 0);
error = toku_brt_cursor(brt, &cursor, NULL, FALSE);
error = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE);
assert(error == 0);
DBT key, val;
......
......@@ -261,7 +261,7 @@ static void test_cursor_last_empty(void) {
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
r = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, null_txn, toku_builtin_compare_fun, null_db); assert(r==0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
r = toku_brt_cursor(brt, &cursor, NULL, FALSE); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE); assert(r==0);
{
struct check_pair pair = {0,0,0,0,0};
r = toku_brt_cursor_get(cursor, NULL, lookup_checkf, &pair, DB_LAST);
......@@ -297,7 +297,7 @@ static void test_cursor_next (void) {
r = toku_brt_insert(brt, toku_fill_dbt(&kbt, "hello", 6), toku_fill_dbt(&vbt, "there", 6), null_txn);
r = toku_brt_insert(brt, toku_fill_dbt(&kbt, "byebye", 7), toku_fill_dbt(&vbt, "byenow", 7), null_txn);
if (verbose) printf("%s:%d calling toku_brt_cursor(...)\n", __FILE__, __LINE__);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE); assert(r==0);
toku_init_dbt(&kbt);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
toku_init_dbt(&vbt);
......@@ -389,7 +389,7 @@ static void test_wrongendian_compare (int wrong_p, unsigned int N) {
}
{
BRT_CURSOR cursor=0;
r = toku_brt_cursor(brt, &cursor, NULL, FALSE); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE); assert(r==0);
for (i=0; i<2; i++) {
unsigned char a[4],b[4];
......@@ -429,7 +429,7 @@ static void test_wrongendian_compare (int wrong_p, unsigned int N) {
toku_cachetable_verify(ct);
}
BRT_CURSOR cursor=0;
r = toku_brt_cursor(brt, &cursor, NULL, FALSE); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE); assert(r==0);
for (i=0; i<N; i++) {
unsigned char a[4],b[4];
......@@ -573,7 +573,7 @@ static void test_brt_delete_present(int n) {
/* cursor should not find anything */
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL, FALSE);
r = toku_brt_cursor(t, &cursor, NULL, FALSE, FALSE);
assert(r == 0);
{
......@@ -704,7 +704,7 @@ static void test_brt_delete_cursor_first(int n) {
/* cursor should find the last key: n-1 */
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL, FALSE);
r = toku_brt_cursor(t, &cursor, NULL, FALSE, FALSE);
assert(r == 0);
{
......@@ -805,7 +805,7 @@ static void test_new_brt_cursor_create_close (void) {
int i;
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i], NULL, FALSE); assert(r == 0);
r = toku_brt_cursor(brt, &cursors[i], NULL, FALSE, FALSE); assert(r == 0);
}
for (i=0; i<n; i++) {
......@@ -839,7 +839,7 @@ static void test_new_brt_cursor_first(int n) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL, FALSE); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, FALSE, FALSE); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
......@@ -891,7 +891,7 @@ static void test_new_brt_cursor_last(int n) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL, FALSE); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, FALSE, FALSE); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
......@@ -943,7 +943,7 @@ static void test_new_brt_cursor_next(int n) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL, FALSE); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, FALSE, FALSE); assert(r == 0);
for (i=0; ; i++) {
int kk = toku_htonl(i);
......@@ -986,7 +986,7 @@ static void test_new_brt_cursor_prev(int n) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL, FALSE); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, FALSE, FALSE); assert(r == 0);
for (i=n-1; ; i--) {
int kk = toku_htonl(i);
......@@ -1029,7 +1029,7 @@ static void test_new_brt_cursor_current(int n) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL, FALSE); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, FALSE, FALSE); assert(r == 0);
for (i=0; ; i++) {
{
......@@ -1113,7 +1113,7 @@ static void test_new_brt_cursor_set_range(int n) {
r = toku_brt_insert(brt, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor, NULL, FALSE); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE); assert(r==0);
/* pick random keys v in 0 <= v < 10*n, the cursor should point
to the smallest key in the tree that is >= v */
......@@ -1170,7 +1170,7 @@ static void test_new_brt_cursor_set(int n, int cursor_op, DB *db) {
r = toku_brt_insert(brt, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor, NULL, FALSE); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE); assert(r==0);
/* set cursor to random keys in set { 0, 10, 20, .. 10*(n-1) } */
for (i=0; i<n; i++) {
......
......@@ -58,7 +58,7 @@ static void verify_dbfile(int n, const char *name) {
if (verbose) traceit("Verified brt internals");
BRT_CURSOR cursor = NULL;
r = toku_brt_cursor(t, &cursor, NULL, FALSE); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, FALSE, FALSE); assert(r == 0);
int i;
for (i=0; ; i++) {
......
......@@ -251,7 +251,7 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
r = toku_brt_open(t, name, 0, 0, ct, null_txn, 0); assert(r==0);
BRT_CURSOR cursor = NULL;
r = toku_brt_cursor(t, &cursor, NULL, FALSE); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, FALSE, FALSE); assert(r == 0);
int i;
for (i=0; i<n; i++) {
......
......@@ -23,7 +23,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
r = toku_open_brt(fname, 1, &brt, 1<<12, 1<<9, ct, null_txn, test_brt_cursor_keycompare, db); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, FALSE, FALSE); assert(r==0);
int i;
for (i=0; i<1000; i++) {
......
......@@ -59,7 +59,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute_
BRT_CURSOR c;
char lkey[100],rkey[100];
DBT lk, rk;
r = toku_brt_cursor(t, &c, null_txn, FALSE); assert(r == 0);
r = toku_brt_cursor(t, &c, null_txn, FALSE, FALSE); assert(r == 0);
snprintf(lkey, 100, "hello%d", i);
snprintf(rkey, 100, "hello%d", i + 100);
toku_brt_cursor_set_range_lock(c, toku_fill_dbt(&lk, lkey, 1+strlen(lkey)),
......
......@@ -13,17 +13,17 @@
static void
verify_val(DBT const *a, DBT const *b, void *c) {
assert(a->size == sizeof(int));
assert(b->size == sizeof(int));
int* expected = (int *)c;
assert(*expected == *(int *)a->data);
assert(*expected == *(int *)b->data);
assert(a->size == sizeof(u_int64_t));
assert(b->size == sizeof(u_int64_t));
u_int64_t* expected = (u_int64_t *)c;
assert(*expected == *(u_int64_t *)a->data);
assert(*expected == *(u_int64_t *)b->data);
}
static int
verify_fwd_fast(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
u_int64_t* expected = (u_int64_t *)c;
*expected = *expected + 1;
return TOKUDB_CURSOR_CONTINUE;
}
......@@ -31,7 +31,7 @@ verify_fwd_fast(DBT const *a, DBT const *b, void *c) {
static int
verify_fwd_slow(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
u_int64_t* expected = (u_int64_t *)c;
*expected = *expected + 1;
return 0;
}
......@@ -39,7 +39,7 @@ verify_fwd_slow(DBT const *a, DBT const *b, void *c) {
static int
verify_bwd_fast(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
u_int64_t* expected = (u_int64_t *)c;
*expected = *expected - 1;
return TOKUDB_CURSOR_CONTINUE;
}
......@@ -47,21 +47,62 @@ verify_bwd_fast(DBT const *a, DBT const *b, void *c) {
static int
verify_bwd_slow(DBT const *a, DBT const *b, void *c) {
verify_val(a,b,c);
int* expected = (int *)c;
u_int64_t* expected = (u_int64_t *)c;
*expected = *expected - 1;
return 0;
}
u_int64_t num_pivots_fetched_prefetch;
u_int64_t num_basements_decompressed_aggressive;
u_int64_t num_basements_decompressed_prefetch;
u_int64_t num_basements_fetched_aggressive;
u_int64_t num_basements_fetched_prefetch;
static void
init_eng_stat_vars(DB_ENV* env) {
ENGINE_STATUS engstat;
int r = env->get_engine_status(env, &engstat, NULL, 0);
CKERR(r);
num_pivots_fetched_prefetch = engstat.num_pivots_fetched_prefetch;
num_basements_decompressed_aggressive = engstat.num_basements_decompressed_aggressive;
num_basements_decompressed_prefetch = engstat.num_basements_decompressed_prefetch;
num_basements_fetched_aggressive = engstat.num_basements_fetched_aggressive;
num_basements_fetched_prefetch = engstat.num_basements_fetched_prefetch;
}
static void
test_bulk_fetch (int n, BOOL prelock) {
if (verbose) printf("test_rand_insert:%d \n", n);
check_eng_stat_vars_unchanged(DB_ENV* env) {
ENGINE_STATUS engstat;
int r = env->get_engine_status(env, &engstat, NULL, 0);
CKERR(r);
assert(num_pivots_fetched_prefetch == engstat.num_pivots_fetched_prefetch);
assert(num_basements_decompressed_aggressive == engstat.num_basements_decompressed_aggressive);
assert(num_basements_decompressed_prefetch == engstat.num_basements_decompressed_prefetch);
assert(num_basements_fetched_aggressive == engstat.num_basements_fetched_aggressive);
assert(num_basements_fetched_prefetch == engstat.num_basements_fetched_prefetch);
}
static void
print_relevant_eng_stat_vars(DB_ENV* env) {
ENGINE_STATUS engstat;
int r = env->get_engine_status(env, &engstat, NULL, 0);
CKERR(r);
printf("num_pivots_fetched_prefetch %"PRId64" \n", engstat.num_pivots_fetched_prefetch);
printf("num_basements_decompressed_aggressive %"PRId64" \n", engstat.num_basements_decompressed_aggressive);
printf("num_basements_decompressed_prefetch %"PRId64" \n", engstat.num_basements_decompressed_prefetch);
printf("num_basements_fetched_aggressive %"PRId64" \n", engstat.num_basements_fetched_aggressive);
printf("num_basements_fetched_prefetch %"PRId64" \n", engstat.num_basements_fetched_prefetch);
}
static void
test_bulk_fetch (u_int64_t n, BOOL prelock, BOOL disable_prefetching) {
if (verbose) printf("test_rand_insert:%"PRId64" \n", n);
DB_TXN * const null_txn = 0;
const char * const fname = "test.bulk_fetch.brt";
int r;
r = system("rm -rf " ENVDIR);
CKERR(r);
r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
......@@ -69,7 +110,8 @@ test_bulk_fetch (int n, BOOL prelock) {
/* create the dup database file */
DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0);
r=env->set_default_bt_compare(env, int_dbt_cmp); CKERR(r);
r=env->set_default_bt_compare(env, int64_dbt_cmp); CKERR(r);
r = env->set_cachesize(env, 0, (u_int32_t)n, 1); assert(r == 0);
r = env->open(env, ENVDIR, DB_CREATE+DB_PRIVATE+DB_INIT_MPOOL, 0); assert(r == 0);
DB *db;
......@@ -79,11 +121,13 @@ test_bulk_fetch (int n, BOOL prelock) {
assert(r == 0);
r = db->set_pagesize(db, 4096);
assert(r == 0);
r = db->set_readpagesize(db, 1024);
assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
int keys[n];
int i;
u_int64_t keys[n];
u_int64_t i;
for (i=0; i<n; i++) {
keys[i] = i;
}
......@@ -100,7 +144,11 @@ test_bulk_fetch (int n, BOOL prelock) {
DBC* cursor;
// verify fast
r = db->cursor(db, NULL, &cursor, 0);
u_int32_t flags = disable_prefetching ? DBC_DISABLE_PREFETCHING : 0;
if (disable_prefetching) {
init_eng_stat_vars(env);
}
r = db->cursor(db, NULL, &cursor, flags);
CKERR(r);
if (prelock) {
r = cursor->c_pre_acquire_range_lock(
......@@ -110,15 +158,24 @@ test_bulk_fetch (int n, BOOL prelock) {
);
CKERR(r);
}
int expected = 0;
u_int64_t expected = 0;
while (r != DB_NOTFOUND) {
r = cursor->c_getf_next(cursor, 0, verify_fwd_fast, &expected);
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
if (disable_prefetching) {
check_eng_stat_vars_unchanged(env);
}
if (verbose) {
print_relevant_eng_stat_vars(env);
}
// verify slow
r = db->cursor(db, NULL, &cursor, 0);
if (disable_prefetching) {
init_eng_stat_vars(env);
}
r = db->cursor(db, NULL, &cursor, flags);
CKERR(r);
if (prelock) {
r = cursor->c_pre_acquire_range_lock(
......@@ -134,9 +191,18 @@ test_bulk_fetch (int n, BOOL prelock) {
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
if (disable_prefetching) {
check_eng_stat_vars_unchanged(env);
}
if (verbose) {
print_relevant_eng_stat_vars(env);
}
// now do backwards
r = db->cursor(db, NULL, &cursor, 0);
if (disable_prefetching) {
init_eng_stat_vars(env);
}
r = db->cursor(db, NULL, &cursor, flags);
CKERR(r);
if (prelock) {
r = cursor->c_pre_acquire_range_lock(
......@@ -152,9 +218,18 @@ test_bulk_fetch (int n, BOOL prelock) {
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
if (disable_prefetching) {
check_eng_stat_vars_unchanged(env);
}
if (verbose) {
print_relevant_eng_stat_vars(env);
}
// verify slow
r = db->cursor(db, NULL, &cursor, 0);
if (disable_prefetching) {
init_eng_stat_vars(env);
}
r = db->cursor(db, NULL, &cursor, flags);
CKERR(r);
if (prelock) {
r = cursor->c_pre_acquire_range_lock(
......@@ -170,6 +245,12 @@ test_bulk_fetch (int n, BOOL prelock) {
assert(r==0 || r==DB_NOTFOUND);
}
r = cursor->c_close(cursor); CKERR(r);
if (disable_prefetching) {
check_eng_stat_vars_unchanged(env);
}
if (verbose) {
print_relevant_eng_stat_vars(env);
}
r = db->close(db, 0); CKERR(r);
......@@ -179,7 +260,9 @@ test_bulk_fetch (int n, BOOL prelock) {
int
test_main(int argc, char *const argv[]) {
parse_args(argc, argv);
test_bulk_fetch(10000, FALSE);
test_bulk_fetch(10000, TRUE);
test_bulk_fetch(10000, FALSE, TRUE);
test_bulk_fetch(10000, TRUE, TRUE);
test_bulk_fetch(10000, FALSE, FALSE);
test_bulk_fetch(10000, TRUE, FALSE);
return 0;
}
......@@ -2091,6 +2091,25 @@ env_get_engine_status(DB_ENV * env, ENGINE_STATUS * engstat, char * env_panic_st
engstat->msg_bytes_max = brt_stat.msg_bytes_max;
engstat->msg_num = brt_stat.msg_num;
engstat->msg_num_broadcast = brt_stat.msg_num_broadcast;
engstat->num_basements_decompressed_normal = brt_stat.num_basements_decompressed_normal;
engstat->num_basements_decompressed_aggressive = brt_stat.num_basements_decompressed_aggressive;
engstat->num_basements_decompressed_prefetch = brt_stat.num_basements_decompressed_prefetch;
engstat->num_basements_decompressed_write = brt_stat.num_basements_decompressed_write;
engstat->num_msg_buffer_decompressed_normal = brt_stat.num_msg_buffer_decompressed_normal;
engstat->num_msg_buffer_decompressed_aggressive = brt_stat.num_msg_buffer_decompressed_aggressive;
engstat->num_msg_buffer_decompressed_prefetch = brt_stat.num_msg_buffer_decompressed_prefetch;
engstat->num_msg_buffer_decompressed_write = brt_stat.num_msg_buffer_decompressed_write;
engstat->num_pivots_fetched_query = brt_stat.num_pivots_fetched_query;
engstat->num_pivots_fetched_prefetch = brt_stat.num_pivots_fetched_prefetch;
engstat->num_pivots_fetched_write = brt_stat.num_pivots_fetched_write;
engstat->num_basements_fetched_normal = brt_stat.num_basements_fetched_normal;
engstat->num_basements_fetched_aggressive = brt_stat.num_basements_fetched_aggressive;
engstat->num_basements_fetched_prefetch = brt_stat.num_basements_fetched_prefetch;
engstat->num_basements_fetched_write = brt_stat.num_basements_fetched_write;
engstat->num_msg_buffer_fetched_normal = brt_stat.num_msg_buffer_fetched_normal;
engstat->num_msg_buffer_fetched_aggressive = brt_stat.num_msg_buffer_fetched_aggressive;
engstat->num_msg_buffer_fetched_prefetch = brt_stat.num_msg_buffer_fetched_prefetch;
engstat->num_msg_buffer_fetched_write = brt_stat.num_msg_buffer_fetched_write;
}
{
u_int64_t fsync_count, fsync_time;
......@@ -2353,6 +2372,25 @@ env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) {
n += snprintf(buff + n, bufsiz - n, "msg_bytes_max %"PRIu64"\n", engstat.msg_bytes_max);
n += snprintf(buff + n, bufsiz - n, "msg_num %"PRIu64"\n", engstat.msg_num);
n += snprintf(buff + n, bufsiz - n, "msg_num_broadcast %"PRIu64"\n", engstat.msg_num_broadcast);
n += snprintf(buff + n, bufsiz - n, "num_basements_decompressed_normal %"PRIu64"\n", engstat.num_basements_decompressed_normal);
n += snprintf(buff + n, bufsiz - n, "num_basements_decompressed_aggressive %"PRIu64"\n", engstat.num_basements_decompressed_aggressive);
n += snprintf(buff + n, bufsiz - n, "num_basements_decompressed_prefetch %"PRIu64"\n", engstat.num_basements_decompressed_prefetch);
n += snprintf(buff + n, bufsiz - n, "num_basements_decompressed_write %"PRIu64"\n", engstat.num_basements_decompressed_write);
n += snprintf(buff + n, bufsiz - n, "num_msg_buffer_decompressed_normal %"PRIu64"\n", engstat.num_msg_buffer_decompressed_normal);
n += snprintf(buff + n, bufsiz - n, "num_msg_buffer_decompressed_aggressive %"PRIu64"\n", engstat.num_msg_buffer_decompressed_aggressive);
n += snprintf(buff + n, bufsiz - n, "num_msg_buffer_decompressed_prefetch %"PRIu64"\n", engstat.num_msg_buffer_decompressed_prefetch);
n += snprintf(buff + n, bufsiz - n, "num_msg_buffer_decompressed_write %"PRIu64"\n", engstat.num_msg_buffer_decompressed_write);
n += snprintf(buff + n, bufsiz - n, "num_pivots_fetched_query %"PRIu64"\n", engstat.num_pivots_fetched_query);
n += snprintf(buff + n, bufsiz - n, "num_pivots_fetched_prefetch %"PRIu64"\n", engstat.num_pivots_fetched_prefetch);
n += snprintf(buff + n, bufsiz - n, "num_pivots_fetched_write %"PRIu64"\n", engstat.num_pivots_fetched_write);
n += snprintf(buff + n, bufsiz - n, "num_basements_fetched_normal %"PRIu64"\n", engstat.num_basements_fetched_normal);
n += snprintf(buff + n, bufsiz - n, "num_basements_fetched_aggressive %"PRIu64"\n", engstat.num_basements_fetched_aggressive);
n += snprintf(buff + n, bufsiz - n, "num_basements_fetched_prefetch %"PRIu64"\n", engstat.num_basements_fetched_prefetch);
n += snprintf(buff + n, bufsiz - n, "num_basements_fetched_write %"PRIu64"\n", engstat.num_basements_fetched_write);
n += snprintf(buff + n, bufsiz - n, "num_msg_buffer_fetched_normal %"PRIu64"\n", engstat.num_msg_buffer_fetched_normal);
n += snprintf(buff + n, bufsiz - n, "num_msg_buffer_fetched_aggressive %"PRIu64"\n", engstat.num_msg_buffer_fetched_aggressive);
n += snprintf(buff + n, bufsiz - n, "num_msg_buffer_fetched_prefetch %"PRIu64"\n", engstat.num_msg_buffer_fetched_prefetch);
n += snprintf(buff + n, bufsiz - n, "num_msg_buffer_fetched_write %"PRIu64"\n", engstat.num_msg_buffer_fetched_write);
n += snprintf(buff + n, bufsiz - n, "multi_inserts %"PRIu64"\n", engstat.multi_inserts);
n += snprintf(buff + n, bufsiz - n, "multi_inserts_fail %"PRIu64"\n", engstat.multi_inserts_fail);
n += snprintf(buff + n, bufsiz - n, "multi_deletes %"PRIu64"\n", engstat.multi_deletes);
......@@ -4189,7 +4227,7 @@ toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
// lock_flags |= DB_PRELOCKED
//}
r = toku_db_cursor(cursor->dbp, dbc_struct_i(cursor)->txn, &count_cursor, 0, 0);
r = toku_db_cursor(cursor->dbp, dbc_struct_i(cursor)->txn, &count_cursor, DBC_DISABLE_PREFETCHING, 0);
if (r != 0) goto finish;
r = toku_c_getf_set(count_cursor, lock_flags, &currentkey, ydb_getf_do_nothing, NULL);
......@@ -4218,7 +4256,7 @@ db_getf_set(DB *db, DB_TXN *txn, u_int32_t flags, DBT *key, YDB_CALLBACK_FUNCTIO
DBC *c;
uint32_t create_flags = flags & (DB_ISOLATION_FLAGS | DB_RMW);
flags &= ~DB_ISOLATION_FLAGS;
int r = toku_db_cursor(db, txn, &c, create_flags, 1);
int r = toku_db_cursor(db, txn, &c, create_flags | DBC_DISABLE_PREFETCHING, 1);
if (r==0) {
r = toku_c_getf_set(c, flags, key, f, extra);
int r2 = toku_c_close(c);
......@@ -4464,7 +4502,7 @@ toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int is_temporar
DB_ENV* env = db->dbenv;
int r;
size_t result_size = sizeof(DBC)+sizeof(struct __toku_dbc_internal); // internal stuff stuck on the end
if (flags & ~(DB_SERIALIZABLE | DB_INHERIT_ISOLATION | DB_RMW)) {
if (flags & ~(DB_SERIALIZABLE | DB_INHERIT_ISOLATION | DB_RMW | DBC_DISABLE_PREFETCHING)) {
return toku_ydb_do_error(
env,
EINVAL,
......@@ -4527,7 +4565,8 @@ toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int is_temporar
db->i->brt,
&dbc_struct_i(result)->c,
txn ? db_txn_struct_i(txn)->tokutxn : NULL,
is_snapshot_read
is_snapshot_read,
((flags & DBC_DISABLE_PREFETCHING) != 0)
);
assert(r == 0 || r == TOKUDB_MVCC_DICTIONARY_TOO_NEW);
if (r == 0) {
......@@ -4565,7 +4604,7 @@ toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
DBC *dbc;
r = toku_db_cursor(db, txn, &dbc, iso_flags, 1);
r = toku_db_cursor(db, txn, &dbc, iso_flags | DBC_DISABLE_PREFETCHING, 1);
if (r!=0) return r;
u_int32_t c_get_flags = DB_SET;
r = toku_c_get(dbc, key, data, c_get_flags | lock_flags);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment