Commit bd11c636 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:3635] [t:3764] [t:3757] [t:3749] merging tokudb.3635+prefetch into...

[t:3635] [t:3764] [t:3757] [t:3749] merging tokudb.3635+prefetch into mainline, pending testing, fixes #3635, #3764, #3757, #3749

git-svn-id: file:///svn/toku/tokudb@33537 c7de825b-a66e-492c-adef-691d508d4ae1
parent b2877988
......@@ -138,9 +138,9 @@ static void parse_args (int argc, char *const argv[]) {
}
//Prelocking is meaningless without transactions
if (do_txns==0) {
prelockflag=0;
//prelockflag=0;
lock_flag=0;
prelock=0;
//prelock=0;
}
}
......@@ -294,6 +294,9 @@ static void scanscan_lwc (void) {
double prevtime = gettime();
DBC *dbc;
r = db->cursor(db, tid, &dbc, 0); assert(r==0);
if(prelock) {
r = dbc->c_pre_acquire_range_lock(dbc, db->dbt_neg_infty(), db->dbt_pos_infty()); assert(r==0);
}
u_int32_t f_flags = 0;
if (prelockflag && (counter || prelock)) {
f_flags |= lock_flag;
......
......@@ -89,6 +89,7 @@ add_estimates (struct subtree_estimates *a, struct subtree_estimates *b) {
enum brtnode_fetch_type {
brtnode_fetch_none=1, // no partitions needed.
brtnode_fetch_subset, // some subset of partitions needed
brtnode_fetch_prefetch, // this is part of a prefetch call
brtnode_fetch_all // every partition is needed
};
......@@ -107,6 +108,8 @@ struct brtnode_fetch_extra {
// parameters needed to find out which child needs to be decompressed (so it can be read)
brt_search_t* search;
BRT brt;
DBT *range_lock_left_key, *range_lock_right_key;
BOOL left_is_neg_infty, right_is_pos_infty;
// this value will be set during the fetch_callback call by toku_brtnode_fetch_callback or toku_brtnode_pf_req_callback
// thi callbacks need to evaluate this anyway, so we cache it here so the search code does not reevaluate it
int child_to_read;
......@@ -123,8 +126,14 @@ static inline void fill_bfe_for_full_read(struct brtnode_fetch_extra *bfe, struc
bfe->h = h;
bfe->search = NULL;
bfe->brt = NULL;
bfe->range_lock_left_key = NULL;
bfe->range_lock_right_key = NULL;
bfe->left_is_neg_infty = FALSE;
bfe->right_is_pos_infty = FALSE;
bfe->child_to_read = -1;
};
}
static inline void fill_bfe_for_prefetch(struct brtnode_fetch_extra *bfe, struct brt_header *h, BRT brt, BRT_CURSOR c);
//
// Helper function to fill a brtnode_fetch_extra with data
......@@ -136,15 +145,23 @@ static inline void fill_bfe_for_subset_read(
struct brtnode_fetch_extra *bfe,
struct brt_header *h,
BRT brt,
brt_search_t* search
brt_search_t* search,
DBT *left,
DBT *right,
BOOL left_is_neg_infty,
BOOL right_is_pos_infty
)
{
bfe->type = brtnode_fetch_subset;
bfe->h = h;
bfe->search = search;
bfe->brt = brt;
bfe->range_lock_left_key = left;
bfe->range_lock_right_key = right;
bfe->left_is_neg_infty = left_is_neg_infty;
bfe->right_is_pos_infty = right_is_pos_infty;
bfe->child_to_read = -1;
};
}
//
// Helper function to fill a brtnode_fetch_extra with data
......@@ -157,8 +174,26 @@ static inline void fill_bfe_for_min_read(struct brtnode_fetch_extra *bfe, struct
bfe->h = h;
bfe->search = NULL;
bfe->brt = NULL;
bfe->range_lock_left_key = NULL;
bfe->range_lock_right_key = NULL;
bfe->left_is_neg_infty = FALSE;
bfe->right_is_pos_infty = FALSE;
bfe->child_to_read = -1;
};
}
static inline void destroy_bfe_for_prefetch(struct brtnode_fetch_extra *bfe) {
assert(bfe->type == brtnode_fetch_prefetch);
if (bfe->range_lock_left_key != NULL) {
toku_destroy_dbt(bfe->range_lock_left_key);
toku_free(bfe->range_lock_left_key);
bfe->range_lock_left_key = NULL;
}
if (bfe->range_lock_right_key != NULL) {
toku_destroy_dbt(bfe->range_lock_right_key);
toku_free(bfe->range_lock_right_key);
bfe->range_lock_right_key = NULL;
}
}
// data of an available partition of a nonleaf brtnode
struct brtnode_nonleaf_childinfo {
......@@ -526,6 +561,8 @@ struct brt_cursor {
BOOL current_in_omt;
BOOL prefetching;
DBT key, val; // The key-value pair that the cursor currently points to
DBT range_lock_left_key, range_lock_right_key;
BOOL left_is_neg_infty, right_is_pos_infty;
OMTCURSOR omtcursor;
u_int64_t root_put_counter; // what was the count on the BRT when we validated the cursor?
TXNID oldest_living_xid;// what was the oldest live txnid when we created the cursor?
......@@ -535,6 +572,33 @@ struct brt_cursor {
struct brt_cursor_leaf_info leaf_info;
};
// this is in a strange place because it needs the cursor struct to be defined
static inline void fill_bfe_for_prefetch(struct brtnode_fetch_extra *bfe, struct brt_header *h, BRT brt, BRT_CURSOR c) {
bfe->type = brtnode_fetch_prefetch;
bfe->h = h;
bfe->search = NULL;
bfe->brt = brt;
{
const DBT *left = &c->range_lock_left_key;
const DBT *right = &c->range_lock_right_key;
if (left->data) {
MALLOC(bfe->range_lock_left_key); resource_assert(bfe->range_lock_left_key);
toku_fill_dbt(bfe->range_lock_left_key, toku_xmemdup(left->data, left->size), left->size);
} else {
bfe->range_lock_left_key = NULL;
}
if (right->data) {
MALLOC(bfe->range_lock_right_key); resource_assert(bfe->range_lock_right_key);
toku_fill_dbt(bfe->range_lock_right_key, toku_xmemdup(right->data, right->size), right->size);
} else {
bfe->range_lock_right_key = NULL;
}
}
bfe->left_is_neg_infty = c->left_is_neg_infty;
bfe->right_is_pos_infty = c->right_is_pos_infty;
bfe->child_to_read = -1;
}
typedef struct ancestors *ANCESTORS;
struct ancestors {
BRTNODE node; // This is the root node if next is NULL.
......@@ -556,6 +620,11 @@ toku_brt_search_which_child(
bool
toku_bfe_wants_child_available (struct brtnode_fetch_extra* bfe, int childnum);
int
toku_bfe_leftmost_child_wanted(struct brtnode_fetch_extra *bfe, BRTNODE node);
int
toku_bfe_rightmost_child_wanted(struct brtnode_fetch_extra *bfe, BRTNODE node);
// allocate a block number
// allocate and initialize a brtnode
// put the brtnode into the cache table
......
......@@ -8,6 +8,15 @@
#include "threadpool.h"
#include <compress.h>
#if defined(HAVE_CILK)
#include <cilk/cilk.h>
#define cilk_worker_count (__cilkrts_get_nworkers())
#else
#define cilk_spawn
#define cilk_sync
#define cilk_for for
#define cilk_worker_count 1
#endif
static BRT_UPGRADE_STATUS_S upgrade_status; // accountability, used in backwards_x.c
......@@ -607,6 +616,27 @@ rebalance_brtnode_leaf(BRTNODE node, unsigned int basementnodesize)
toku_free(new_pivots);
}
static void
serialize_and_compress(BRTNODE node, int npartitions, struct sub_block sb[]);
// tests are showing that serial insertions are slightly faster
// using the pthreads than using CILK. Disabling CILK until we have
// some evidence that it is faster
//#ifdef HAVE_CILK
#if 0
static void
serialize_and_compress(BRTNODE node, int npartitions, struct sub_block sb[]) {
#pragma cilk grainsize = 1
cilk_for (int i = 0; i < npartitions; i++) {
serialize_brtnode_partition(node, i, &sb[i]);
compress_brtnode_sub_block(&sb[i]);
}
}
#else
struct serialize_compress_work {
struct work base;
BRTNODE node;
......@@ -657,6 +687,8 @@ serialize_and_compress(BRTNODE node, int npartitions, struct sub_block sb[]) {
}
}
#endif
// Writes out each child to a separate malloc'd buffer, then compresses
// all of them, and writes the uncompressed header, to bytes_to_write,
// which is malloc'd.
......@@ -677,7 +709,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
// Each partition represents a compressed sub block
// For internal nodes, a sub block is a message buffer
// For leaf nodes, a sub block is a basement node
struct sub_block sb[npartitions];
struct sub_block *MALLOC_N(npartitions, sb);
struct sub_block sb_node_info;
for (int i = 0; i < npartitions; i++) {
sub_block_init(&sb[i]);;
......@@ -687,15 +719,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
//
// First, let's serialize and compress the individual sub blocks
//
#if 0
// TODO: (Zardosht) cilkify this
for (int i = 0; i < npartitions; i++) {
serialize_brtnode_partition(node, i, &sb[i]);
compress_brtnode_sub_block(&sb[i]);
}
#else
serialize_and_compress(node, npartitions, sb);
#endif
//
// Now lets create a sub-block that has the common node information,
......@@ -722,7 +746,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
// set the node bp_offset
//
node->bp_offset = serialize_node_header_size(node) + sb_node_info.compressed_size + 4;
char *data = toku_xmalloc(total_node_size);
char *curr_ptr = data;
// now create the final serialized node
......@@ -763,6 +787,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
toku_free(sb[i].uncompressed_ptr);
}
toku_free(sb);
return 0;
}
......@@ -1071,7 +1096,7 @@ setup_available_brtnode_partition(BRTNODE node, int i) {
static void
setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* bfe) {
if (bfe->type == brtnode_fetch_subset) {
if (bfe->type == brtnode_fetch_subset && bfe->search != NULL) {
// we do not take into account prefetching yet
// as of now, if we need a subset, the only thing
// we can possibly require is a single basement node
......@@ -1085,18 +1110,30 @@ setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* bfe) {
bfe->search
);
}
int lc, rc;
if (bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_prefetch) {
lc = toku_bfe_leftmost_child_wanted(bfe, node);
rc = toku_bfe_rightmost_child_wanted(bfe, node);
} else {
lc = -1;
rc = -1;
}
//
// setup memory needed for the node
//
//printf("node height %d, blocknum %"PRId64", type %d lc %d rc %d\n", node->height, node->thisnodename.b, bfe->type, lc, rc);
for (int i = 0; i < node->n_children; i++) {
BP_INIT_UNTOUCHED_CLOCK(node,i);
BP_STATE(node,i) = toku_bfe_wants_child_available(bfe,i) ? PT_AVAIL : PT_COMPRESSED;
BP_STATE(node, i) = ((toku_bfe_wants_child_available(bfe, i) || (lc <= i && i <= rc))
? PT_AVAIL : PT_COMPRESSED);
BP_WORKDONE(node,i) = 0;
if (BP_STATE(node,i) == PT_AVAIL) {
//printf(" %d is available\n", i);
setup_available_brtnode_partition(node, i);
BP_TOUCH_CLOCK(node,i);
}
else if (BP_STATE(node,i) == PT_COMPRESSED) {
//printf(" %d is compressed\n", i);
set_BSB(node, i, sub_block_creat());
}
else {
......@@ -1153,15 +1190,34 @@ deserialize_brtnode_partition(
assert(rb.ndone == rb.size);
}
static void
decompress_and_deserialize_worker(struct rbuf curr_rbuf, struct sub_block curr_sb, BRTNODE node, int child)
{
read_and_decompress_sub_block(&curr_rbuf, &curr_sb);
// at this point, sb->uncompressed_ptr stores the serialized node partition
deserialize_brtnode_partition(&curr_sb, node, child);
toku_free(curr_sb.uncompressed_ptr);
}
static void
check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf, struct sub_block curr_sb, BRTNODE node, int child)
{
read_compressed_sub_block(&curr_rbuf, &curr_sb);
SUB_BLOCK bp_sb = BSB(node, child);
bp_sb->compressed_size = curr_sb.compressed_size;
bp_sb->uncompressed_size = curr_sb.uncompressed_size;
bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size);
memcpy(bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size);
}
//
// deserializes a brtnode that is in rb (with pointer of rb just past the magic) into a BRTNODE
//
static int
deserialize_brtnode_from_rbuf(
BRTNODE *brtnode,
BLOCKNUM blocknum,
u_int32_t fullhash,
BRTNODE *brtnode,
BLOCKNUM blocknum,
u_int32_t fullhash,
struct brtnode_fetch_extra* bfe,
struct rbuf *rb
)
......@@ -1206,22 +1262,21 @@ deserialize_brtnode_from_rbuf(
toku_free(sb_node_info.uncompressed_ptr);
//
// now that we have read and decompressed up until
// now that we have read and decompressed up until
// the start of the bp's, we can set the node->bp_offset
// so future partial fetches know where to get bp's
//
node->bp_offset = rb->ndone;
// now that the node info has been deserialized, we can proceed to deserialize
// now that the node info has been deserialized, we can proceed to deserialize
// the individual sub blocks
assert(bfe->type == brtnode_fetch_none || bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_all);
assert(bfe->type == brtnode_fetch_none || bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_all || bfe->type == brtnode_fetch_prefetch);
// setup the memory of the partitions
// for partitions being decompressed, create either FIFO or basement node
// for partitions staying compressed, create sub_block
setup_brtnode_partitions(node,bfe);
// TODO: (Zardosht) Cilkify this
for (int i = 0; i < node->n_children; i++) {
u_int32_t curr_offset = (i==0) ? 0 : BP_OFFSET(node,i-1);
u_int32_t curr_size = (i==0) ? BP_OFFSET(node,i) : (BP_OFFSET(node,i) - BP_OFFSET(node,i-1));
......@@ -1230,46 +1285,35 @@ deserialize_brtnode_from_rbuf(
// we need to intialize curr_rbuf to point to this place
struct rbuf curr_rbuf = {.buf = NULL, .size = 0, .ndone = 0};
rbuf_init(&curr_rbuf, rb->buf + rb->ndone + curr_offset, curr_size);
struct sub_block curr_sb;
sub_block_init(&curr_sb);
//
// now we are at the point where we have:
// - read the entire compressed node off of disk,
// - decompressed the pivot and offset information,
// - decompressed the pivot and offset information,
// - have arrived at the individual partitions.
//
// Based on the information in bfe, we want to decompress a subset of
// Based on the information in bfe, we want to decompress a subset of
// of the compressed partitions (also possibly none or possibly all)
// The partitions that we want to decompress and make available
// to the node, we do, the rest we simply copy in compressed
// form into the node, and set the state of the partition to PT_COMPRESSED
//
struct sub_block curr_sb;
sub_block_init(&curr_sb);
// case where we read and decompress the partition
// deserialize_brtnode_info figures out what the state
// should be and sets up the memory so that we are ready to use it
if (BP_STATE(node,i) == PT_AVAIL) {
read_and_decompress_sub_block(&curr_rbuf, &curr_sb);
// at this point, sb->uncompressed_ptr stores the serialized node partition
deserialize_brtnode_partition(&curr_sb, node, i);
toku_free(curr_sb.uncompressed_ptr);
cilk_spawn decompress_and_deserialize_worker(curr_rbuf, curr_sb, node, i);
}
// case where we leave the partition in the compressed state
else if (BP_STATE(node,i) == PT_COMPRESSED) {
read_compressed_sub_block(&curr_rbuf, &curr_sb);
SUB_BLOCK bp_sb = BSB(node, i);
bp_sb->compressed_size = curr_sb.compressed_size;
bp_sb->uncompressed_size = curr_sb.uncompressed_size;
bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size);
memcpy(
bp_sb->compressed_ptr,
curr_sb.compressed_ptr,
bp_sb->compressed_size
);
cilk_spawn check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i);
}
}
cilk_sync;
*brtnode = node;
r = 0;
cleanup:
......
......@@ -110,6 +110,15 @@ Split_or_merge (node, childnum) {
#include "toku_atomic.h"
#include "sub_block.h"
#if defined(HAVE_CILK)
#include <cilk/cilk.h>
#define cilk_worker_count (__cilkrts_get_nworkers())
#else
#define cilk_spawn
#define cilk_sync
#define cilk_for for
#define cilk_worker_count 1
#endif
static const uint32_t this_version = BRT_LAYOUT_VERSION;
......@@ -574,7 +583,43 @@ toku_bfe_wants_child_available (struct brtnode_fetch_extra* bfe, int childnum)
}
}
int
toku_bfe_leftmost_child_wanted(struct brtnode_fetch_extra *bfe, BRTNODE node)
{
lazy_assert(bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_prefetch);
if (bfe->left_is_neg_infty) {
return 0;
} else if (bfe->range_lock_left_key == NULL) {
return -1;
} else {
return toku_brtnode_which_child(node, bfe->range_lock_left_key, bfe->brt);
}
}
int
toku_bfe_rightmost_child_wanted(struct brtnode_fetch_extra *bfe, BRTNODE node)
{
lazy_assert(bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_prefetch);
if (bfe->right_is_pos_infty) {
return node->n_children - 1;
} else if (bfe->range_lock_right_key == NULL) {
return -1;
} else {
return toku_brtnode_which_child(node, bfe->range_lock_right_key, bfe->brt);
}
}
static int
brt_cursor_rightmost_child_wanted(BRT_CURSOR cursor, BRT brt, BRTNODE node)
{
if (cursor->right_is_pos_infty) {
return node->n_children - 1;
} else if (cursor->range_lock_right_key.data == NULL) {
return -1;
} else {
return toku_brtnode_which_child(node, &cursor->range_lock_right_key, brt);
}
}
//fd is protected (must be holding fdlock)
void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename, void *brtnode_v, void *extraargs, long size __attribute__((unused)), BOOL write_me, BOOL keep_me, BOOL for_checkpoint) {
......@@ -680,6 +725,23 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_
}
static inline void
brt_status_update_partial_fetch(u_int8_t state)
{
if (state == PT_AVAIL) {
brt_status.partial_fetch_hit++;
}
else if (state == PT_COMPRESSED) {
brt_status.partial_fetch_compressed++;
}
else if (state == PT_ON_DISK){
brt_status.partial_fetch_miss++;
}
else {
assert(FALSE);
}
}
// Callback that states if a partial fetch of the node is necessary
// Currently, this function is responsible for the following things:
// - reporting to the cachetable whether a partial fetch is required (as required by the contract of the callback)
......@@ -714,20 +776,8 @@ BOOL toku_brtnode_pf_req_callback(void* brtnode_pv, void* read_extraargs) {
// the entire node must be made available
if (BP_STATE(node,i) != PT_AVAIL) {
retval = TRUE;
// do some accounting for the case that we have missed
if (BP_STATE(node,i) == PT_COMPRESSED) {
brt_status.partial_fetch_compressed++;
}
else if (BP_STATE(node,i) == PT_ON_DISK){
brt_status.partial_fetch_miss++;
}
else {
assert(FALSE);
}
}
else {
brt_status.partial_fetch_hit++;
}
brt_status_update_partial_fetch(BP_STATE(node, i));
}
}
else if (bfe->type == brtnode_fetch_subset) {
......@@ -745,22 +795,17 @@ BOOL toku_brtnode_pf_req_callback(void* brtnode_pv, void* read_extraargs) {
);
BP_TOUCH_CLOCK(node,bfe->child_to_read);
// child we want to read is not available, must set retval to TRUE
if (BP_STATE(node,bfe->child_to_read) != PT_AVAIL) {
retval = TRUE;
// do some accounting for the case that we have missed
if (BP_STATE(node,bfe->child_to_read) == PT_COMPRESSED) {
brt_status.partial_fetch_compressed++;
}
else if (BP_STATE(node,bfe->child_to_read) == PT_ON_DISK){
brt_status.partial_fetch_miss++;
}
else {
assert(FALSE);
retval = (BP_STATE(node, bfe->child_to_read) != PT_AVAIL);
brt_status_update_partial_fetch(BP_STATE(node, bfe->child_to_read));
}
else if (bfe->type == brtnode_fetch_prefetch) {
int lc = toku_bfe_leftmost_child_wanted(bfe, node);
int rc = toku_bfe_rightmost_child_wanted(bfe, node);
for (int i = lc; i <= rc; ++i) {
if (BP_STATE(node, i) != PT_AVAIL) {
retval = TRUE;
}
}
else {
retval = FALSE;
brt_status.partial_fetch_hit++;
brt_status_update_partial_fetch(BP_STATE(node, i));
}
}
else {
......@@ -777,28 +822,35 @@ int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, lon
struct brtnode_fetch_extra *bfe = read_extraargs;
// there must be a reason this is being called. If we get a garbage type or the type is brtnode_fetch_none,
// then something went wrong
assert((bfe->type == brtnode_fetch_subset) || (bfe->type == brtnode_fetch_all));
assert((bfe->type == brtnode_fetch_subset) || (bfe->type == brtnode_fetch_all) || (bfe->type == brtnode_fetch_prefetch));
// determine the range to prefetch
int lc, rc;
if (bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_prefetch) {
lc = toku_bfe_leftmost_child_wanted(bfe, node);
rc = toku_bfe_rightmost_child_wanted(bfe, node);
} else {
lc = -1;
rc = -1;
}
// TODO: possibly cilkify expensive operations in this loop
// TODO: review this with others to see if it can be made faster
for (int i = 0; i < node->n_children; i++) {
if (BP_STATE(node,i) == PT_AVAIL) {
continue;
}
if (toku_bfe_wants_child_available(bfe, i)) {
if ((lc <= i && i <= rc) || toku_bfe_wants_child_available(bfe, i)) {
if (BP_STATE(node,i) == PT_COMPRESSED) {
//
// decompress the subblock
//
toku_deserialize_bp_from_compressed(node, i);
cilk_spawn toku_deserialize_bp_from_compressed(node, i);
}
else if (BP_STATE(node,i) == PT_ON_DISK) {
toku_deserialize_bp_from_disk(node, i, fd, bfe);
cilk_spawn toku_deserialize_bp_from_disk(node, i, fd, bfe);
}
else {
assert(FALSE);
}
}
}
cilk_sync;
*sizep = brtnode_memory_size(node);
return 0;
}
......@@ -2989,26 +3041,32 @@ static u_int32_t get_roothash (BRT brt) {
static void apply_cmd_to_in_memory_non_root_leaves (
BRT t,
CACHEKEY nodenum,
u_int32_t fullhash,
BRT_MSG cmd,
BRTNODE parent,
BRT t,
CACHEKEY nodenum,
u_int32_t fullhash,
BRT_MSG cmd,
BRTNODE parent,
int parents_childnum,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
uint64_t * workdone
uint64_t * workdone,
bool *made_change_p
);
static void apply_cmd_to_in_memory_non_root_leaves_starting_at_node (BRT t,
static void apply_cmd_to_in_memory_non_root_leaves_starting_at_node (BRT t,
BRTNODE node,
BRT_MSG cmd,
BOOL is_root,
BRTNODE parent,
BRT_MSG cmd,
BOOL is_root,
BRTNODE parent,
int parents_childnum,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
uint64_t * workdone) {
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
uint64_t * workdone,
bool *made_change_p) {
bool made_change = false;
if (made_change_p == NULL) {
made_change_p = &made_change;
}
// internal node
if (node->height>0) {
if (brt_msg_applies_once(cmd)) {
......@@ -3018,7 +3076,7 @@ static void apply_cmd_to_in_memory_non_root_leaves_starting_at_node (BRT t,
u_int32_t child_fullhash = compute_child_fullhash(t->cf, node, childnum);
if (is_root) // record workdone in root only, if not root then this is a recursive call so just pass along pointer
workdone = &(BP_WORKDONE(node,childnum));
apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), child_fullhash, cmd, node, childnum, &next_ancestors, &next_bounds, workdone);
apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), child_fullhash, cmd, node, childnum, &next_ancestors, &next_bounds, workdone, made_change_p);
}
else if (brt_msg_applies_all(cmd)) {
for (int childnum=0; childnum<node->n_children; childnum++) {
......@@ -3027,52 +3085,54 @@ static void apply_cmd_to_in_memory_non_root_leaves_starting_at_node (BRT t,
u_int32_t child_fullhash = compute_child_fullhash(t->cf, node, childnum);
if (is_root)
workdone = &(BP_WORKDONE(node,childnum));
apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), child_fullhash, cmd, node, childnum, &next_ancestors, &next_bounds, workdone);
apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), child_fullhash, cmd, node, childnum, &next_ancestors, &next_bounds, workdone, made_change_p);
}
}
}
// leaf node
else {
invariant(!is_root);
bool made_change;
toku_apply_cmd_to_leaf(t, node, cmd, &made_change, ancestors, workdone);
toku_apply_cmd_to_leaf(t, node, cmd, made_change_p, ancestors, workdone);
}
if (parent) {
fixup_child_estimates(parent, parents_childnum, node, FALSE);
if (*made_change_p) {
if (parent) {
fixup_child_estimates(parent, parents_childnum, node, FALSE);
} else {
invariant(is_root); // only root has no parent
}
}
else
invariant(is_root); // only root has no parent
}
// apply a single message, stored in root's buffer(s), to all relevant leaves that are in memory
static void apply_cmd_to_in_memory_non_root_leaves (
BRT t,
CACHEKEY nodenum,
u_int32_t fullhash,
BRT_MSG cmd,
BRTNODE parent,
BRT t,
CACHEKEY nodenum,
u_int32_t fullhash,
BRT_MSG cmd,
BRTNODE parent,
int parents_childnum,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
uint64_t * workdone
uint64_t * workdone,
bool *made_change_p
)
{
BRTNODE node = NULL;
BRTNODE node = NULL;
void *node_v;
int r = toku_cachetable_get_and_pin_if_in_memory(
t->cf,
nodenum,
fullhash,
t->cf,
nodenum,
fullhash,
&node_v
);
if (r) { goto exit; }
node = node_v;
node = node_v;
apply_cmd_to_in_memory_non_root_leaves_starting_at_node(t, node, cmd, FALSE, parent, parents_childnum, ancestors, bounds, workdone, made_change_p);
apply_cmd_to_in_memory_non_root_leaves_starting_at_node(t, node, cmd, FALSE, parent, parents_childnum, ancestors, bounds, workdone);
toku_unpin_brtnode(t, node);
exit:
return;
......@@ -3119,7 +3179,7 @@ toku_brt_root_put_cmd (BRT brt, BRT_MSG_S * cmd)
// verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock)
invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn);
if (node->height > 0) {
apply_cmd_to_in_memory_non_root_leaves_starting_at_node(brt, node, cmd, TRUE, NULL, -1, (ANCESTORS)NULL, &infinite_bounds, NULL);
apply_cmd_to_in_memory_non_root_leaves_starting_at_node(brt, node, cmd, TRUE, NULL, -1, (ANCESTORS)NULL, &infinite_bounds, NULL, NULL);
if (nonleaf_node_is_gorged(node)) {
// No need for a loop here. We only inserted one message, so flushing a single child suffices.
flush_some_child(brt, node, TRUE, TRUE,
......@@ -4871,6 +4931,10 @@ int toku_brt_cursor (
cursor->brt = brt;
cursor->current_in_omt = FALSE;
cursor->prefetching = FALSE;
toku_init_dbt(&cursor->range_lock_left_key);
toku_init_dbt(&cursor->range_lock_right_key);
cursor->left_is_neg_infty = FALSE;
cursor->right_is_pos_infty = FALSE;
cursor->oldest_living_xid = ttxn ? toku_logger_get_oldest_living_xid(ttxn->logger, NULL) : TXNID_NONE;
cursor->is_snapshot_read = is_snapshot_read;
cursor->is_leaf_mode = FALSE;
......@@ -4895,6 +4959,31 @@ toku_brt_cursor_is_leaf_mode(BRT_CURSOR brtcursor) {
return brtcursor->is_leaf_mode;
}
void
toku_brt_cursor_set_range_lock(BRT_CURSOR cursor, const DBT *left, const DBT *right,
BOOL left_is_neg_infty, BOOL right_is_pos_infty)
{
if (cursor->range_lock_left_key.data) {
toku_destroy_dbt(&cursor->range_lock_left_key);
}
if (cursor->range_lock_right_key.data) {
toku_destroy_dbt(&cursor->range_lock_right_key);
}
if (left_is_neg_infty) {
cursor->left_is_neg_infty = TRUE;
} else {
toku_fill_dbt(&cursor->range_lock_left_key,
toku_xmemdup(left->data, left->size), left->size);
}
if (right_is_pos_infty) {
cursor->right_is_pos_infty = TRUE;
} else {
toku_fill_dbt(&cursor->range_lock_right_key,
toku_xmemdup(right->data, right->size), right->size);
}
}
// Called during cursor destruction
// It is the same as brt_cursor_invalidate, except that
// we make sure the callback function is never called.
......@@ -4909,6 +4998,12 @@ brt_cursor_invalidate_no_callback(BRT_CURSOR brtcursor) {
int toku_brt_cursor_close(BRT_CURSOR cursor) {
brt_cursor_invalidate_no_callback(cursor);
brt_cursor_cleanup_dbts(cursor);
if (cursor->range_lock_left_key.data) {
toku_destroy_dbt(&cursor->range_lock_left_key);
}
if (cursor->range_lock_right_key.data) {
toku_destroy_dbt(&cursor->range_lock_right_key);
}
toku_list_remove(&cursor->cursors_link);
toku_omt_cursor_destroy(&cursor->omtcursor);
toku_free_n(cursor, sizeof *cursor);
......@@ -5248,7 +5343,7 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
if (node->height > 0) { goto exit; }
// know we are a leaf node
// need to apply messages to each basement node
// TODO: (Zardosht) cilkify this, watch out for setting of max_msn_applied_to_node
// TODO: (Zardosht) cilkify this
for (int i = 0; i < node->n_children; i++) {
BOOL requires_msg_application = partition_requires_msg_application(
node,
......@@ -5256,46 +5351,48 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
ancestors
);
if (!requires_msg_application) {
continue;
}
update_stats = TRUE;
int height = 0;
BASEMENTNODE curr_bn = BLB(node, i);
if (!requires_msg_application) {
continue;
}
update_stats = TRUE;
int height = 0;
BASEMENTNODE curr_bn = BLB(node, i);
SUBTREE_EST curr_se = &BP_SUBTREE_EST(node,i);
ANCESTORS curr_ancestors = ancestors;
struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
while (curr_ancestors) {
height++;
apply_buffer_messages_to_basement_node(
t,
curr_bn,
curr_se,
curr_ancestors->node,
curr_ancestors->childnum,
&curr_bounds
);
curr_bn->max_dsn_applied = (curr_ancestors->node->dsn.dsn > curr_bn->max_dsn_applied.dsn)
? curr_ancestors->node->dsn
struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
height++;
if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
apply_buffer_messages_to_basement_node(
t,
curr_bn,
curr_se,
curr_ancestors->node,
curr_ancestors->childnum,
&curr_bounds
);
// we don't want to check this node again if the next time
// we query it, the msn hasn't changed.
curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
}
curr_bn->max_dsn_applied = (curr_ancestors->node->dsn.dsn > curr_bn->max_dsn_applied.dsn)
? curr_ancestors->node->dsn
: curr_bn->max_dsn_applied;
curr_ancestors= curr_ancestors->next;
}
}
}
// Must update the leaf estimates. Might as well use the estimates from the soft copy (even if they make it out to disk), since they are
// the best estimates we have.
if (update_stats) {
toku_brt_leaf_reset_calc_leaf_stats(node);
{
ANCESTORS curr_ancestors = ancestors;
BRTNODE prev_node = node;
while (curr_ancestors) {
BRTNODE next_node = curr_ancestors->node;
fixup_child_estimates(next_node, curr_ancestors->childnum, prev_node, FALSE);
prev_node = next_node;
curr_ancestors = curr_ancestors->next;
}
}
toku_brt_leaf_reset_calc_leaf_stats(node);
{
ANCESTORS curr_ancestors = ancestors;
BRTNODE prev_node = node;
while (curr_ancestors) {
BRTNODE next_node = curr_ancestors->node;
fixup_child_estimates(next_node, curr_ancestors->childnum, prev_node, FALSE);
prev_node = next_node;
curr_ancestors = curr_ancestors->next;
}
}
}
exit:
VERIFY_NODE(t, node);
......@@ -5304,11 +5401,11 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
// This is a bottom layer of the search functions.
static int
brt_search_basement_node(
BASEMENTNODE bn,
brt_search_t *search,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
BOOL *doprefetch,
BASEMENTNODE bn,
brt_search_t *search,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
BOOL *doprefetch,
BLOCKNUM thisnodename,
u_int32_t fullhash,
BRT_CURSOR brtcursor
......@@ -5328,122 +5425,145 @@ brt_search_basement_node(
OMTVALUE datav;
u_int32_t idx = 0;
int r = toku_omt_find(bn->buffer,
heaviside_from_search_t,
search,
direction,
&datav, &idx, NULL);
heaviside_from_search_t,
search,
direction,
&datav, &idx, NULL);
if (r!=0) return r;
LEAFENTRY le = datav;
if (toku_brt_cursor_is_leaf_mode(brtcursor))
goto got_a_good_value; // leaf mode cursors see all leaf entries
goto got_a_good_value; // leaf mode cursors see all leaf entries
if (is_le_val_del(le,brtcursor)) {
// Provisionally deleted stuff is gone.
// So we need to scan in the direction to see if we can find something
while (1) {
switch (search->direction) {
case BRT_SEARCH_LEFT:
idx++;
if (idx>=toku_omt_size(bn->buffer)) return DB_NOTFOUND;
break;
case BRT_SEARCH_RIGHT:
if (idx==0) return DB_NOTFOUND;
idx--;
break;
default:
assert(FALSE);
}
r = toku_omt_fetch(bn->buffer, idx, &datav, NULL);
assert_zero(r); // we just validated the index
le = datav;
if (!is_le_val_del(le,brtcursor)) goto got_a_good_value;
}
// Provisionally deleted stuff is gone.
// So we need to scan in the direction to see if we can find something
while (1) {
switch (search->direction) {
case BRT_SEARCH_LEFT:
idx++;
if (idx>=toku_omt_size(bn->buffer)) return DB_NOTFOUND;
break;
case BRT_SEARCH_RIGHT:
if (idx==0) return DB_NOTFOUND;
idx--;
break;
default:
assert(FALSE);
}
r = toku_omt_fetch(bn->buffer, idx, &datav, NULL);
assert_zero(r); // we just validated the index
le = datav;
if (!is_le_val_del(le,brtcursor)) goto got_a_good_value;
}
}
got_a_good_value:
{
u_int32_t keylen;
void *key;
u_int32_t vallen;
void *val;
r = brt_cursor_extract_key_and_val(le,
brtcursor,
&keylen,
&key,
&vallen,
&val);
assert(brtcursor->current_in_omt == FALSE);
if (r==0) {
r = getf(keylen, key, vallen, val, getf_v);
}
if (r==0) {
// Leave the omtcursor alone above (pass NULL to omt_find/fetch)
// This prevents the omt from calling associate(), which would
// require a lock to keep the list of cursors safe when the omt
// is used by the brt. (We don't want to impose the locking requirement
// on the omt for non-brt uses.)
//
// Instead, all associating of omtcursors with omts (for leaf nodes)
// is done in brt_cursor_update.
brtcursor->leaf_info.to_be.omt = bn->buffer;
brtcursor->leaf_info.to_be.index = idx;
u_int32_t keylen;
void *key;
u_int32_t vallen;
void *val;
r = brt_cursor_extract_key_and_val(le,
brtcursor,
&keylen,
&key,
&vallen,
&val);
assert(brtcursor->current_in_omt == FALSE);
if (r==0) {
r = getf(keylen, key, vallen, val, getf_v);
}
if (r==0) {
// Leave the omtcursor alone above (pass NULL to omt_find/fetch)
// This prevents the omt from calling associate(), which would
// require a lock to keep the list of cursors safe when the omt
// is used by the brt. (We don't want to impose the locking requirement
// on the omt for non-brt uses.)
//
// Instead, all associating of omtcursors with omts (for leaf nodes)
// is done in brt_cursor_update.
brtcursor->leaf_info.to_be.omt = bn->buffer;
brtcursor->leaf_info.to_be.index = idx;
brtcursor->leaf_info.fullhash = fullhash;
brtcursor->leaf_info.blocknumber = thisnodename;
brt_cursor_update(brtcursor);
//The search was successful. Prefetching can continue.
*doprefetch = TRUE;
}
brt_cursor_update(brtcursor);
//The search was successful. Prefetching can continue.
*doprefetch = TRUE;
}
}
return r;
}
static int
brt_search_node (
BRT brt,
BRTNODE node,
brt_search_t *search,
BRT brt,
BRTNODE node,
brt_search_t *search,
int child_to_search,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
BOOL *doprefetch,
BRT_CURSOR brtcursor,
UNLOCKERS unlockers,
ANCESTORS,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
BOOL *doprefetch,
BRT_CURSOR brtcursor,
UNLOCKERS unlockers,
ANCESTORS,
struct pivot_bounds const * const bounds
);
// the number of nodes to prefetch
#define TOKU_DO_PREFETCH 0
#define TOKU_DO_PREFETCH 1
#if TOKU_DO_PREFETCH
static int
brtnode_fetch_callback_and_free_bfe(CACHEFILE cf, int fd, BLOCKNUM nodename, u_int32_t fullhash, void **brtnode_pv, long *sizep, int *dirtyp, void *extraargs)
{
int r = toku_brtnode_fetch_callback(cf, fd, nodename, fullhash, brtnode_pv, sizep, dirtyp, extraargs);
destroy_bfe_for_prefetch(extraargs);
toku_free(extraargs);
return r;
}
static int
brtnode_pf_callback_and_free_bfe(void *brtnode_pv, void *read_extraargs, int fd, long *sizep)
{
int r = toku_brtnode_pf_callback(brtnode_pv, read_extraargs, fd, sizep);
destroy_bfe_for_prefetch(read_extraargs);
toku_free(read_extraargs);
return r;
}
static void
brt_node_maybe_prefetch(BRT brt, BRTNODE node, int childnum, BRT_CURSOR brtcursor, BOOL *doprefetch) {
// if we want to prefetch in the tree
// if we want to prefetch in the tree
// then prefetch the next children if there are any
if (*doprefetch && brt_cursor_prefetching(brtcursor)) {
int i;
for (i=0; i<TOKU_DO_PREFETCH; i++) {
int nextchildnum = childnum+i+1;
if (nextchildnum >= node->n_children)
break;
BLOCKNUM nextchildblocknum = BP_BLOCKNUM(node, nextchildnum);
u_int32_t nextfullhash = compute_child_fullhash(brt->cf, node, nextchildnum);
toku_cachefile_prefetch(
brt->cf,
nextchildblocknum,
nextfullhash,
toku_brtnode_flush_callback,
toku_brtnode_fetch_callback,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
brt->h,
brt->h
);
*doprefetch = FALSE;
}
int rc = brt_cursor_rightmost_child_wanted(brtcursor, brt, node);
for (int i = childnum + 1; (i <= childnum + TOKU_DO_PREFETCH) && (i <= rc); i++) {
BLOCKNUM nextchildblocknum = BP_BLOCKNUM(node, i);
u_int32_t nextfullhash = compute_child_fullhash(brt->cf, node, i);
struct brtnode_fetch_extra *MALLOC(bfe);
fill_bfe_for_prefetch(bfe, brt->h, brt, brtcursor);
BOOL doing_prefetch = FALSE;
toku_cachefile_prefetch(
brt->cf,
nextchildblocknum,
nextfullhash,
toku_brtnode_flush_callback,
brtnode_fetch_callback_and_free_bfe,
toku_brtnode_pe_callback,
toku_brtnode_pf_req_callback,
brtnode_pf_callback_and_free_bfe,
bfe,
brt->h,
&doing_prefetch
);
if (!doing_prefetch) {
destroy_bfe_for_prefetch(bfe);
toku_free(bfe);
}
*doprefetch = FALSE;
}
}
}
......@@ -5467,49 +5587,54 @@ unlock_brtnode_fun (void *v) {
/* search in a node's child */
static int
brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, BOOL *doprefetch, BRT_CURSOR brtcursor, UNLOCKERS unlockers,
ANCESTORS ancestors, struct pivot_bounds const * const bounds)
ANCESTORS ancestors, struct pivot_bounds const * const bounds)
// Effect: Search in a node's child. Searches are read-only now (at least as far as the hardcopy is concerned).
{
struct ancestors next_ancestors = {node, childnum, ancestors};
struct ancestors next_ancestors = {node, childnum, ancestors};
BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum);
u_int32_t fullhash = compute_child_fullhash(brt->cf, node, childnum);
u_int32_t fullhash = compute_child_fullhash(brt->cf, node, childnum);
BRTNODE childnode;
struct brtnode_fetch_extra bfe;
fill_bfe_for_subset_read(
&bfe,
&bfe,
brt->h,
brt,
search
search,
&brtcursor->range_lock_left_key,
&brtcursor->range_lock_right_key,
brtcursor->left_is_neg_infty,
brtcursor->right_is_pos_infty
);
{
int rr = toku_pin_brtnode(brt, childblocknum, fullhash,
unlockers,
&next_ancestors, bounds,
&bfe,
&childnode);
if (rr==TOKUDB_TRY_AGAIN) return rr;
assert(rr==0);
int rr = toku_pin_brtnode(brt, childblocknum, fullhash,
unlockers,
&next_ancestors, bounds,
&bfe,
&childnode);
if (rr==TOKUDB_TRY_AGAIN) return rr;
assert(rr==0);
}
struct unlock_brtnode_extra unlock_extra = {brt,childnode};
struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers};
struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers};
int r = brt_search_node(brt, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, brtcursor, &next_unlockers, &next_ancestors, bounds);
if (r!=TOKUDB_TRY_AGAIN) {
// Even if r is reactive, we want to handle the maybe reactive child.
// Even if r is reactive, we want to handle the maybe reactive child.
#if TOKU_DO_PREFETCH
// maybe prefetch the next child
if (r == 0)
brt_node_maybe_prefetch(brt, node, childnum, brtcursor, doprefetch);
// maybe prefetch the next child
if (r == 0 && node->height == 1) {
brt_node_maybe_prefetch(brt, node, childnum, brtcursor, doprefetch);
}
#endif
assert(next_unlockers.locked);
toku_unpin_brtnode(brt, childnode); // unpin the childnode before handling the reactive child (because that may make the childnode disappear.)
assert(next_unlockers.locked);
toku_unpin_brtnode(brt, childnode); // unpin the childnode before handling the reactive child (because that may make the childnode disappear.)
} else {
// try again.
// try again.
// there are two cases where we get TOKUDB_TRY_AGAIN
// case 1 is when some later call to toku_pin_brtnode returned
......@@ -5517,9 +5642,9 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
// is when brt_search_node had to stop its search because
// some piece of a node that it needed was not in memory. In this case,
// the node was not unpinned, so we unpin it here
if (next_unlockers.locked) {
if (next_unlockers.locked) {
toku_unpin_brtnode(brt, childnode);
}
}
}
return r;
......@@ -5560,7 +5685,7 @@ maybe_search_save_bound(
BRTNODE node,
int child_searched,
brt_search_t *search
)
)
{
DBT pivotkey;
toku_init_dbt(&pivotkey);
......@@ -5575,16 +5700,16 @@ maybe_search_save_bound(
static int
brt_search_node(
BRT brt,
BRTNODE node,
BRT brt,
BRTNODE node,
brt_search_t *search,
int child_to_search,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
BOOL *doprefetch,
BRT_CURSOR brtcursor,
BRT_GET_CALLBACK_FUNCTION getf,
void *getf_v,
BOOL *doprefetch,
BRT_CURSOR brtcursor,
UNLOCKERS unlockers,
ANCESTORS ancestors,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds
)
{ int r = 0;
......@@ -5597,9 +5722,9 @@ brt_search_node(
while (child_to_search >= 0 && child_to_search < node->n_children) {
//
// Normally, the child we want to use is available, as we checked
// before entering this while loop. However, if we pass through
// before entering this while loop. However, if we pass through
// the loop once, getting DB_NOTFOUND for this first value
// of child_to_search, we enter the while loop again with a
// of child_to_search, we enter the while loop again with a
// child_to_search that may not be in memory. If it is not,
// we need to return TOKUDB_TRY_AGAIN so the query can
// read the appropriate partition into memory
......@@ -5610,22 +5735,22 @@ brt_search_node(
const struct pivot_bounds next_bounds = next_pivot_keys(node, child_to_search, bounds);
if (node->height > 0) {
r = brt_search_child(
brt,
node,
child_to_search,
search,
getf,
getf_v,
doprefetch,
brtcursor,
unlockers,
ancestors,
brt,
node,
child_to_search,
search,
getf,
getf_v,
doprefetch,
brtcursor,
unlockers,
ancestors,
&next_bounds
);
}
else {
r = brt_search_basement_node(
BLB(node, child_to_search),
BLB(node, child_to_search),
search,
getf,
getf_v,
......@@ -5636,10 +5761,10 @@ brt_search_node(
);
}
if (r == 0) return r; //Success
if (r != DB_NOTFOUND) {
return r; //Error (or message to quit early, such as TOKUDB_FOUND_BUT_REJECTED or TOKUDB_TRY_AGAIN)
}
}
// we have a new pivotkey
else {
// If we got a DB_NOTFOUND then we have to search the next record. Possibly everything present is not visible.
......@@ -5665,7 +5790,7 @@ brt_search_node(
else {
child_to_search--;
}
}
}
return r;
}
......@@ -5718,7 +5843,11 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
&bfe,
brt->h,
brt,
search
search,
&brtcursor->range_lock_left_key,
&brtcursor->range_lock_right_key,
brtcursor->left_is_neg_infty,
brtcursor->right_is_pos_infty
);
toku_pin_brtnode_holding_lock(brt, *rootp, fullhash, NULL, &infinite_bounds, &bfe, &node);
......@@ -5952,7 +6081,7 @@ brt_cursor_maybe_get_and_pin_leaf(BRT_CURSOR brtcursor, BRTNODE* leafp) {
int r = toku_cachetable_maybe_get_and_pin_clean(brtcursor->brt->cf,
brtcursor->leaf_info.blocknumber,
brtcursor->leaf_info.fullhash,
&leafv);
&leafv);
if (r == 0) {
*leafp = leafv;
}
......
......@@ -192,6 +192,7 @@ typedef struct brt_cursor *BRT_CURSOR;
int toku_brt_cursor (BRT, BRT_CURSOR*, TOKUTXN, BOOL) __attribute__ ((warn_unused_result));
void toku_brt_cursor_set_leaf_mode(BRT_CURSOR);
int toku_brt_cursor_is_leaf_mode(BRT_CURSOR);
void toku_brt_cursor_set_range_lock(BRT_CURSOR, const DBT *, const DBT *, BOOL, BOOL);
// get is deprecated in favor of the individual functions below
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, int get_flags) __attribute__ ((warn_unused_result));
......
......@@ -24,6 +24,7 @@
// use worker threads 0->no 1->yes
static void cachetable_writer(WORKITEM);
static void cachetable_reader(WORKITEM);
static void cachetable_partial_reader(WORKITEM);
#define TRACE_CACHETABLE 0
#if TRACE_CACHETABLE
......@@ -1466,6 +1467,44 @@ write_pair_for_checkpoint (CACHETABLE ct, PAIR p, BOOL write_if_dirty)
}
}
static void
do_partial_fetch(CACHETABLE ct, CACHEFILE cachefile, PAIR p, CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback, void *read_extraargs)
{
long old_size = p->size;
long size = 0;
//
// The reason we have this assert is a sanity check
// to make sure that it is ok to set the
// state of the pair to CTPAIR_READING.
//
// As of this writing, the checkpoint code assumes
// that every pair that is in the CTPAIR_READING state
// is not dirty. Because we require dirty nodes to be
// fully in memory, we should never have a dirty node
// require a partial fetch. So, just to be sure that
// we can set the pair to CTPAIR_READING, we assert
// that the pair is not dirty
//
assert(!p->dirty);
p->state = CTPAIR_READING;
rwlock_prefer_read_lock(&cachefile->fdlock, ct->mutex);
cachetable_unlock(ct);
int r = pf_callback(p->value, read_extraargs, cachefile->fd, &size);
lazy_assert_zero(r);
cachetable_lock(ct);
rwlock_read_unlock(&cachefile->fdlock);
p->size = size;
ct->size_current += size;
ct->size_current -= old_size;
p->state = CTPAIR_IDLE;
if (p->cq) {
workitem_init(&p->asyncwork, NULL, p);
workqueue_enq(p->cq, &p->asyncwork, 1);
}
rwlock_write_unlock(&p->rwlock);
}
// for debugging
// valid only if this function is called only by a single thread
static u_int64_t get_and_pin_footprint = 0;
......@@ -1570,22 +1609,11 @@ int toku_cachetable_get_and_pin (
if (do_wait_time) {
cachetable_waittime += get_tnow() - t0;
}
t0 = get_tnow();
long old_size = p->size;
long size = 0;
rwlock_prefer_read_lock(&cachefile->fdlock, ct->mutex);
cachetable_unlock(ct);
int r = pf_callback(p->value, read_extraargs, cachefile->fd, &size);
cachetable_lock(ct);
rwlock_read_unlock(&cachefile->fdlock);
p->size = size;
// set the state of the pair back
p->state = CTPAIR_IDLE;
ct->size_current += size;
ct->size_current -= old_size;
lazy_assert_zero(r);
t0 = get_tnow();
do_partial_fetch(ct, cachefile, p, pf_callback, read_extraargs);
cachetable_waittime += get_tnow() - t0;
rwlock_write_unlock(&p->rwlock);
rwlock_read_lock(&p->rwlock, ct->mutex);
}
......@@ -1917,10 +1945,11 @@ int toku_cachetable_get_and_pin_nonblocking (
run_unlockers(unlockers); // The contract says the unlockers are run with the ct lock being held.
if (ct->ydb_unlock_callback) ct->ydb_unlock_callback();
// Now wait for the I/O to occur.
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
long old_size = p->size;
long size = 0;
do_partial_fetch(ct, cf, p, pf_callback, read_extraargs);
cachetable_unlock(ct);
<<<<<<< .working
int r = pf_callback(p->value, read_extraargs, cf->fd, &size);
lazy_assert_zero(r);
cachetable_lock(ct);
......@@ -1932,6 +1961,8 @@ int toku_cachetable_get_and_pin_nonblocking (
ct->size_current -= old_size;
rwlock_write_unlock(&p->rwlock);
cachetable_unlock(ct);
=======
>>>>>>> .merge-right.r33536
if (ct->ydb_lock_callback) ct->ydb_lock_callback();
return TOKUDB_TRY_AGAIN;
}
......@@ -1982,17 +2013,21 @@ struct cachefile_prefetch_args {
void* read_extraargs;
};
//
// PREFETCHING DOES NOT WORK IN MAXWELL AS OF NOW!
//
struct cachefile_partial_prefetch_args {
PAIR p;
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback;
void *read_extraargs;
};
int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback __attribute__((unused)),
void *read_extraargs,
void *write_extraargs)
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
void *read_extraargs,
void *write_extraargs,
BOOL *doing_prefetch)
// Effect: See the documentation for this function in cachetable.h
{
// TODO: Fix prefetching, as part of ticket 3635
......@@ -2005,12 +2040,15 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
// It may be another callback. That is way too many callbacks that are being used
// Fixing this in a clean, simple way requires some thought.
if (0) printf("%s:%d %"PRId64"\n", __FUNCTION__, __LINE__, key.b);
if (doing_prefetch) {
*doing_prefetch = FALSE;
}
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
// lookup
PAIR p;
for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
if (p->key.b==key.b && p->cachefile==cf) {
if (p->key.b==key.b && p->cachefile==cf) {
//Maybe check for pending and do write_pair_for_checkpoint()?
pair_touch(p);
break;
......@@ -2020,15 +2058,36 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
// if not found then create a pair in the READING state and fetch it
if (p == 0) {
cachetable_prefetches++;
p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, pe_callback, write_extraargs, CACHETABLE_CLEAN);
p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, pe_callback, write_extraargs, CACHETABLE_CLEAN);
assert(p);
rwlock_write_lock(&p->rwlock, ct->mutex);
struct cachefile_prefetch_args *cpargs = toku_xmalloc(sizeof(struct cachefile_prefetch_args));
struct cachefile_prefetch_args *MALLOC(cpargs);
cpargs->p = p;
cpargs->fetch_callback = fetch_callback;
cpargs->read_extraargs = read_extraargs;
workitem_init(&p->asyncwork, cachetable_reader, cpargs);
workqueue_enq(&ct->wq, &p->asyncwork, 0);
if (doing_prefetch) {
*doing_prefetch = TRUE;
}
} else if (p->state == CTPAIR_IDLE && (rwlock_users(&p->rwlock)==0)) {
// nobody else is using the node, so we should go ahead and prefetch
rwlock_read_lock(&p->rwlock, ct->mutex);
BOOL partial_fetch_required = pf_req_callback(p->value, read_extraargs);
rwlock_read_unlock(&p->rwlock);
if (partial_fetch_required) {
rwlock_write_lock(&p->rwlock, ct->mutex);
struct cachefile_partial_prefetch_args *MALLOC(cpargs);
cpargs->p = p;
cpargs->pf_callback = pf_callback;
cpargs->read_extraargs = read_extraargs;
workitem_init(&p->asyncwork, cachetable_partial_reader, cpargs);
workqueue_enq(&ct->wq, &p->asyncwork, 0);
if (doing_prefetch) {
*doing_prefetch = TRUE;
}
}
}
cachetable_unlock(ct);
return 0;
......@@ -2691,16 +2750,25 @@ static void cachetable_reader(WORKITEM wi) {
// This is only called in toku_cachefile_prefetch, by putting it on a workqueue
// The problem is described in comments in toku_cachefile_prefetch
cachetable_fetch_pair(
ct,
cpargs->p->cachefile,
cpargs->p,
cpargs->fetch_callback,
ct,
cpargs->p->cachefile,
cpargs->p,
cpargs->fetch_callback,
cpargs->read_extraargs
);
cachetable_unlock(ct);
toku_free(cpargs);
}
static void cachetable_partial_reader(WORKITEM wi) {
struct cachefile_partial_prefetch_args *cpargs = workitem_arg(wi);
CACHETABLE ct = cpargs->p->cachefile->cachetable;
cachetable_lock(ct);
do_partial_fetch(ct, cpargs->p->cachefile, cpargs->p, cpargs->pf_callback, cpargs->read_extraargs);
cachetable_unlock(ct);
toku_free(cpargs);
}
// debug functions
......
......@@ -269,10 +269,11 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
CACHETABLE_PARTIAL_EVICTION_CALLBACK pe_callback,
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback __attribute__((unused)),
CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
CACHETABLE_PARTIAL_FETCH_CALLBACK pf_callback,
void *read_extraargs,
void *write_extraargs);
void *write_extraargs,
BOOL *doing_prefetch);
// Effect: Prefetch a memory object for a given key into the cachetable
// Precondition: The cachetable mutex is NOT held.
// Postcondition: The cachetable mutex is NOT held.
......
......@@ -775,6 +775,7 @@ toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
uint32_t hash = log->older_hash;
CACHEFILE cf = txn->logger->rollback_cachefile;
struct brt_header *h = toku_cachefile_get_userdata(cf);
BOOL doing_prefetch = FALSE;
r = toku_cachefile_prefetch(cf, name, hash,
toku_rollback_flush_callback,
toku_rollback_fetch_callback,
......@@ -782,7 +783,8 @@ toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
toku_brtnode_pf_req_callback,
toku_brtnode_pf_callback,
h,
h);
h,
&doing_prefetch);
assert(r==0);
}
return r;
......
......@@ -76,7 +76,7 @@ static void cachetable_prefetch_checkpoint_test(int n, enum cachetable_dirty dir
{
CACHEKEY key = make_blocknum(n+1);
u_int32_t fullhash = toku_cachetable_hash(f1, key);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
}
......
......@@ -76,7 +76,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// close with the prefetch in progress. the close should block until
......
......@@ -77,7 +77,7 @@ static void cachetable_prefetch_close_leak_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// close with the prefetch in progress. the close should block until
......
......@@ -77,7 +77,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// close with the prefetch in progress. the close should block until
......
......@@ -92,7 +92,7 @@ static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) {
for (i=0; i<cachetable_size_limit; i++) {
CACHEKEY key = make_blocknum(i);
u_int32_t fullhash = toku_cachetable_hash(f1, key);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
}
......@@ -103,7 +103,7 @@ static void cachetable_prefetch_flowcontrol_test (int cachetable_size_limit) {
for (i=i; i<2*cachetable_size_limit; i++) {
CACHEKEY key = make_blocknum(i);
u_int32_t fullhash = toku_cachetable_hash(f1, key);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// sleep(1);
}
......
......@@ -81,7 +81,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// verify that get_and_pin waits while the prefetch is in progress
......
......@@ -82,7 +82,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// verify that get_and_pin waits while the prefetch is in progress
......
......@@ -73,7 +73,7 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// verify that maybe_get_and_pin returns an error while the prefetch is in progress
......
......@@ -77,11 +77,11 @@ static void cachetable_prefetch_maybegetandpin_test (void) {
// prefetch block 0. this will take 10 seconds.
CACHEKEY key = make_blocknum(0);
u_int32_t fullhash = toku_cachetable_hash(f1, make_blocknum(0));
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// prefetch again. this should do nothing.
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0);
r = toku_cachefile_prefetch(f1, key, fullhash, flush, fetch, pe_callback, pf_req_callback, pf_callback, 0, 0, NULL);
toku_cachetable_verify(ct);
// verify that maybe_get_and_pin returns an error while the prefetch is in progress
......
......@@ -5497,13 +5497,16 @@ toku_db_key_range64(DB* db, DB_TXN* txn __attribute__((__unused__)), DBT* key, u
return r;
}
static int
static int
toku_c_pre_acquire_range_lock(DBC *dbc, const DBT *key_left, const DBT *key_right) {
DB *db = dbc->dbp;
DB_TXN *txn = dbc_struct_i(dbc)->txn;
HANDLE_PANICKED_DB(db);
if (!db->i->lt || !txn)
return EINVAL;
toku_brt_cursor_set_range_lock(dbc_struct_i(dbc)->c, key_left, key_right,
(key_left == toku_lt_neg_infinity),
(key_right == toku_lt_infinity));
if (!db->i->lt || !txn)
return 0;
//READ_UNCOMMITTED and READ_COMMITTED transactions do not need read locks.
if (!dbc_struct_i(dbc)->rmw && dbc_struct_i(dbc)->iso != TOKU_ISO_SERIALIZABLE)
return 0;
......@@ -5519,7 +5522,7 @@ toku_c_pre_acquire_range_lock(DBC *dbc, const DBT *key_left, const DBT *key_righ
int
toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) {
HANDLE_PANICKED_DB(db);
if (!db->i->lt || !txn) return EINVAL;
if (!db->i->lt || !txn) return 0;
int r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment