Commit 9858bf38 authored by John Esmet

refs #5770 Only check one basement node on pin, remove the assumption that adjacent available nodes are query-able.
parent 08adc6e1
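
The patch threads a new `child_to_read` value (taken from `bfe->child_to_read`) through the leaf helpers so that, when a clean leaf is pinned for a query, only the basement node the query will actually read is checked and brought up to date; the old behavior of touching every available basement node is kept for dirty leaves and for callers that pass -1. A minimal sketch of that dispatch pattern, using simplified stand-in types rather than the real FTNODE API from this tree:

```cpp
// Illustrative sketch only: leaf_node and its fields are simplified stand-ins,
// not the real FTNODE/BP_STATE structures touched by this commit.
#include <functional>
#include <vector>

struct leaf_node {
    bool dirty;
    int n_children;
    std::vector<bool> partition_available; // stands in for BP_STATE(node, i) == PT_AVAIL
};

// Visit exactly one basement node when the leaf is clean and the caller named a
// target partition (child_to_read >= 0); otherwise fall back to visiting every
// available partition, as the pre-#5770 code did.
static void for_each_relevant_basement(const leaf_node &node,
                                       int child_to_read,
                                       const std::function<void(int)> &visit) {
    if (!node.dirty && child_to_read >= 0) {
        // Only this partition is guaranteed to be in memory and up to date;
        // adjacent partitions may not be query-able.
        visit(child_to_read);
    } else {
        for (int i = 0; i < node.n_children; i++) {
            if (!node.partition_available[i]) {
                continue;
            }
            visit(i);
        }
    }
}
```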
@@ -193,6 +193,11 @@ toku_create_new_ftnode (
         NULL);
 }
+//
+// On success, this function assumes that the caller is trying to pin the node
+// with a PL_READ lock. If message application is needed,
+// then a PL_WRITE_CHEAP lock is grabbed
+//
 int
 toku_pin_ftnode_batched(
     FT_HANDLE brt,
@@ -202,15 +207,22 @@ toku_pin_ftnode_batched(
     ANCESTORS ancestors,
     const PIVOT_BOUNDS bounds,
     FTNODE_FETCH_EXTRA bfe,
-    pair_lock_type lock_type,
     bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
     FTNODE *node_p,
     bool* msgs_applied)
 {
     void *node_v;
     *msgs_applied = false;
-    pair_lock_type needed_lock_type = lock_type;
-try_again_for_write_lock:
+    FTNODE node = nullptr;
+    MSN max_msn_in_path = ZERO_MSN;
+    bool needs_ancestors_messages = false;
+    // this function assumes that if you want ancestor messages applied,
+    // you are doing a read for a query. This is so we can make some optimizations
+    // below.
+    if (apply_ancestor_messages) {
+        paranoid_invariant(bfe->type == ftnode_fetch_subset);
+    }
     int r = toku_cachetable_get_and_pin_nonblocking_batched(
             brt->ft->cf,
             blocknum,
@@ -221,63 +233,82 @@ toku_pin_ftnode_batched(
             toku_ftnode_fetch_callback,
             toku_ftnode_pf_req_callback,
             toku_ftnode_pf_callback,
-            needed_lock_type,
+            PL_READ,
             bfe, //read_extraargs
             unlockers);
-    if (r==0) {
-        FTNODE node = static_cast<FTNODE>(node_v);
-        MSN max_msn_in_path;
-        bool needs_ancestors_messages = false;
-        if (apply_ancestor_messages && node->height == 0) {
-            needs_ancestors_messages = toku_ft_leaf_needs_ancestors_messages(brt->ft, node, ancestors, bounds, &max_msn_in_path);
-            if (needs_ancestors_messages && needed_lock_type == PL_READ) {
-                toku_unpin_ftnode_read_only(brt->ft, node);
-                needed_lock_type = PL_WRITE_CHEAP;
-                goto try_again_for_write_lock;
-            }
-        }
-        if (apply_ancestor_messages && node->height == 0) {
-            if (needs_ancestors_messages) {
-                invariant(needed_lock_type != PL_READ);
-                toku_apply_ancestors_messages_to_node(brt, node, ancestors, bounds, msgs_applied);
-            } else {
-                // At this point, we aren't going to run
-                // toku_apply_ancestors_messages_to_node but that doesn't
-                // mean max_msn_applied shouldn't be updated if possible
-                // (this saves the CPU work involved in
-                // toku_ft_leaf_needs_ancestors_messages).
-                //
-                // We still have a read lock, so we have not resolved
-                // checkpointing. If the node is pending and dirty, we
-                // can't modify anything, including max_msn, until we
-                // resolve checkpointing. If we do, the node might get
-                // written out that way as part of a checkpoint with a
-                // root that was already written out with a smaller
-                // max_msn. During recovery, we would then inject a
-                // message based on the root's max_msn, and that message
-                // would get filtered by the leaf because it had too high
-                // a max_msn value. (see #5407)
-                //
-                // So for simplicity we only update the max_msn if the
-                // node is clean. That way, in order for the node to get
-                // written out, it would have to be dirtied. That
-                // requires a write lock, and a write lock requires you to
-                // resolve checkpointing.
-                if (!node->dirty) {
-                    toku_ft_bn_update_max_msn(node, max_msn_in_path);
-                }
-            }
-            invariant(needed_lock_type != PL_READ || !*msgs_applied);
-        }
-        if ((lock_type != PL_READ) && node->height > 0) {
-            toku_move_ftnode_messages_to_stale(brt->ft, node);
-        }
-        *node_p = node;
-        // printf("%*sPin %ld\n", 8-node->height, "", blocknum.b);
-    } else {
-        assert(r==TOKUDB_TRY_AGAIN); // Any other error and we should bomb out ASAP.
-        // printf("%*sPin %ld try again\n", 8, "", blocknum.b);
-    }
+    if (r != 0) {
+        assert(r == TOKUDB_TRY_AGAIN); // Any other error and we should bomb out ASAP.
+        goto exit;
+    }
+    node = static_cast<FTNODE>(node_v);
+    if (apply_ancestor_messages && node->height == 0) {
+        needs_ancestors_messages = toku_ft_leaf_needs_ancestors_messages(
+            brt->ft,
+            node,
+            ancestors,
+            bounds,
+            &max_msn_in_path,
+            bfe->child_to_read
+            );
+        if (needs_ancestors_messages) {
+            toku_unpin_ftnode_read_only(brt->ft, node);
+            int rr = toku_cachetable_get_and_pin_nonblocking_batched(
+                    brt->ft->cf,
+                    blocknum,
+                    fullhash,
+                    &node_v,
+                    NULL,
+                    get_write_callbacks_for_node(brt->ft),
+                    toku_ftnode_fetch_callback,
+                    toku_ftnode_pf_req_callback,
+                    toku_ftnode_pf_callback,
+                    PL_WRITE_CHEAP,
+                    bfe, //read_extraargs
+                    unlockers);
+            if (rr != 0) {
+                assert(rr == TOKUDB_TRY_AGAIN); // Any other error and we should bomb out ASAP.
+                r = TOKUDB_TRY_AGAIN;
+                goto exit;
+            }
+            node = static_cast<FTNODE>(node_v);
+            toku_apply_ancestors_messages_to_node(
+                brt,
+                node,
+                ancestors,
+                bounds,
+                msgs_applied,
+                bfe->child_to_read
+                );
+        } else {
+            // At this point, we aren't going to run
+            // toku_apply_ancestors_messages_to_node but that doesn't
+            // mean max_msn_applied shouldn't be updated if possible
+            // (this saves the CPU work involved in
+            // toku_ft_leaf_needs_ancestors_messages).
+            //
+            // We still have a read lock, so we have not resolved
+            // checkpointing. If the node is pending and dirty, we
+            // can't modify anything, including max_msn, until we
+            // resolve checkpointing. If we do, the node might get
+            // written out that way as part of a checkpoint with a
+            // root that was already written out with a smaller
+            // max_msn. During recovery, we would then inject a
+            // message based on the root's max_msn, and that message
+            // would get filtered by the leaf because it had too high
+            // a max_msn value. (see #5407)
+            //
+            // So for simplicity we only update the max_msn if the
+            // node is clean. That way, in order for the node to get
+            // written out, it would have to be dirtied. That
+            // requires a write lock, and a write lock requires you to
+            // resolve checkpointing.
+            if (!node->dirty) {
+                toku_ft_bn_update_max_msn(node, max_msn_in_path, bfe->child_to_read);
+            }
+        }
+    }
+    *node_p = node;
+exit:
     return r;
 }
...
@@ -150,7 +150,6 @@ toku_pin_ftnode_batched(
     ANCESTORS ancestors,
     const PIVOT_BOUNDS pbounds,
     FTNODE_FETCH_EXTRA bfe,
-    pair_lock_type lock_type,
     bool apply_ancestor_messages, // this bool is probably temporary, for #3972, once we know how range query estimates work, will revisit this
     FTNODE *node_p,
     bool* msgs_applied
...
@@ -727,13 +727,6 @@ STAT64INFO_S toku_get_and_clear_basement_stats(FTNODE leafnode);
 #define VERIFY_NODE(t,n) ((void)0)
 #endif
-//#define FT_TRACE
-#ifdef FT_TRACE
-#define WHEN_FTTRACE(x) x
-#else
-#define WHEN_FTTRACE(x) ((void)0)
-#endif
 void toku_ft_status_update_pivot_fetch_reason(struct ftnode_fetch_extra *bfe);
 void toku_ft_status_update_flush_reason(FTNODE node, uint64_t uncompressed_bytes_flushed, uint64_t bytes_written, tokutime_t write_time, bool for_checkpoint);
 void toku_ft_status_update_serialize_times(FTNODE node, tokutime_t serialize_time, tokutime_t compress_time);
@@ -982,11 +975,11 @@ struct pivot_bounds {
 __attribute__((nonnull))
 void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node);
-void toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool* msgs_applied);
+void toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool* msgs_applied, int child_to_read);
 __attribute__((nonnull))
-bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, MSN *const max_msn_in_path);
+bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, MSN *const max_msn_in_path, int child_to_read);
 __attribute__((nonnull))
-void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied);
+void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read);
 __attribute__((const,nonnull))
 size_t toku_ft_msg_memsize_in_fifo(FT_MSG cmd);
...
@@ -4509,8 +4509,53 @@ bnc_apply_messages_to_basement_node(
     }
 }
+static void
+apply_ancestors_messages_to_bn(
+    FT_HANDLE t,
+    FTNODE node,
+    int childnum,
+    ANCESTORS ancestors,
+    struct pivot_bounds const * const bounds,
+    TXNID oldest_referenced_xid,
+    bool* msgs_applied
+    )
+{
+    BASEMENTNODE curr_bn = BLB(node, childnum);
+    struct pivot_bounds curr_bounds = next_pivot_keys(node, childnum, bounds);
+    for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
+        if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
+            paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
+            bnc_apply_messages_to_basement_node(
+                t,
+                curr_bn,
+                curr_ancestors->node,
+                curr_ancestors->childnum,
+                &curr_bounds,
+                oldest_referenced_xid,
+                msgs_applied
+                );
+            // We don't want to check this ancestor node again if the
+            // next time we query it, the msn hasn't changed.
+            curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
+        }
+    }
+    // At this point, we know all the stale messages above this
+    // basement node have been applied, and any new messages will be
+    // fresh, so we don't need to look at stale messages for this
+    // basement node, unless it gets evicted (and this field becomes
+    // false when it's read in again).
+    curr_bn->stale_ancestor_messages_applied = true;
+}
 void
-toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool* msgs_applied)
+toku_apply_ancestors_messages_to_node (
+    FT_HANDLE t,
+    FTNODE node,
+    ANCESTORS ancestors,
+    struct pivot_bounds const * const bounds,
+    bool* msgs_applied,
+    int child_to_read
+    )
 // Effect:
 // Bring a leaf node up-to-date according to all the messages in the ancestors.
 // If the leaf node is already up-to-date then do nothing.
@@ -4521,7 +4566,7 @@ toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ances
 // The entire root-to-leaf path is pinned and appears in the ancestors list.
 {
     VERIFY_NODE(t, node);
-    invariant(node->height == 0);
+    paranoid_invariant(node->height == 0);
     TXNID oldest_referenced_xid = ancestors->node->oldest_referenced_xid_known;
     for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
@@ -4530,44 +4575,104 @@ toku_apply_ancestors_messages_to_node (FT_HANDLE t, FTNODE node, ANCESTORS ances
         }
     }
-    // know we are a leaf node
-    // An important invariant:
-    // We MUST bring every available basement node up to date.
-    // flushing on the cleaner thread depends on this. This invariant
-    // allows the cleaner thread to just pick an internal node and flush it
-    // as opposed to being forced to start from the root.
-    for (int i = 0; i < node->n_children; i++) {
-        if (BP_STATE(node, i) != PT_AVAIL) { continue; }
-        BASEMENTNODE curr_bn = BLB(node, i);
-        struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
-        for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
-            if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
-                paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
-                bnc_apply_messages_to_basement_node(
-                    t,
-                    curr_bn,
-                    curr_ancestors->node,
-                    curr_ancestors->childnum,
-                    &curr_bounds,
-                    oldest_referenced_xid,
-                    msgs_applied
-                    );
-                // We don't want to check this ancestor node again if the
-                // next time we query it, the msn hasn't changed.
-                curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
-            }
-        }
-        // At this point, we know all the stale messages above this
-        // basement node have been applied, and any new messages will be
-        // fresh, so we don't need to look at stale messages for this
-        // basement node, unless it gets evicted (and this field becomes
-        // false when it's read in again).
-        curr_bn->stale_ancestor_messages_applied = true;
-    }
+    if (!node->dirty && child_to_read >= 0) {
+        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
+        apply_ancestors_messages_to_bn(
+            t,
+            node,
+            child_to_read,
+            ancestors,
+            bounds,
+            oldest_referenced_xid,
+            msgs_applied
+            );
+    }
+    else {
+        // know we are a leaf node
+        // An important invariant:
+        // We MUST bring every available basement node for a dirty node up to date.
+        // flushing on the cleaner thread depends on this. This invariant
+        // allows the cleaner thread to just pick an internal node and flush it
+        // as opposed to being forced to start from the root.
+        for (int i = 0; i < node->n_children; i++) {
+            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
+            apply_ancestors_messages_to_bn(
+                t,
+                node,
+                i,
+                ancestors,
+                bounds,
+                oldest_referenced_xid,
+                msgs_applied
+                );
+        }
+    }
     VERIFY_NODE(t, node);
 }
-bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, MSN *const max_msn_in_path)
+static bool bn_needs_ancestors_messages(
+    FT ft,
+    FTNODE node,
+    int childnum,
+    struct pivot_bounds const * const bounds,
+    ANCESTORS ancestors,
+    MSN* max_msn_applied
+    )
+{
+    BASEMENTNODE bn = BLB(node, childnum);
+    struct pivot_bounds curr_bounds = next_pivot_keys(node, childnum, bounds);
+    bool needs_ancestors_messages = false;
+    for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
+        if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
+            paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
+            NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
+            if (bnc->broadcast_list.size() > 0) {
+                needs_ancestors_messages = true;
+                goto cleanup;
+            }
+            if (!bn->stale_ancestor_messages_applied) {
+                uint32_t stale_lbi, stale_ube;
+                find_bounds_within_message_tree(&ft->cmp_descriptor,
+                                                ft->compare_fun,
+                                                bnc->stale_message_tree,
+                                                bnc->buffer,
+                                                &curr_bounds,
+                                                &stale_lbi,
+                                                &stale_ube);
+                if (stale_lbi < stale_ube) {
+                    needs_ancestors_messages = true;
+                    goto cleanup;
+                }
+            }
+            uint32_t fresh_lbi, fresh_ube;
+            find_bounds_within_message_tree(&ft->cmp_descriptor,
+                                            ft->compare_fun,
+                                            bnc->fresh_message_tree,
+                                            bnc->buffer,
+                                            &curr_bounds,
+                                            &fresh_lbi,
+                                            &fresh_ube);
+            if (fresh_lbi < fresh_ube) {
+                needs_ancestors_messages = true;
+                goto cleanup;
+            }
+            if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied->msn) {
+                max_msn_applied->msn = curr_ancestors->node->max_msn_applied_to_node_on_disk.msn;
+            }
+        }
+    }
+cleanup:
+    return needs_ancestors_messages;
+}
+bool toku_ft_leaf_needs_ancestors_messages(
+    FT ft,
+    FTNODE node,
+    ANCESTORS ancestors,
+    struct pivot_bounds const * const bounds,
+    MSN *const max_msn_in_path,
+    int child_to_read
+    )
 // Effect: Determine whether there are messages in a node's ancestors
 // which must be applied to it. These messages are in the correct
 // keyrange for any available basement nodes, and are in nodes with the
@@ -4586,72 +4691,64 @@ bool toku_ft_leaf_needs_ancestors_messages(FT ft, FTNODE node, ANCESTORS ancesto
 // we should exchange it for a write lock in preparation for applying
 // messages. If there are no messages, we don't need the write lock.
 {
-    invariant(node->height == 0);
-    MSN max_msn_applied = ZERO_MSN;
+    paranoid_invariant(node->height == 0);
     bool needs_ancestors_messages = false;
-    for (int i = 0; i < node->n_children; ++i) {
-        if (BP_STATE(node, i) != PT_AVAIL) { continue; }
-        BASEMENTNODE bn = BLB(node, i);
-        struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
-        for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
-            if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
-                paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
-                NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
-                if (bnc->broadcast_list.size() > 0) {
-                    needs_ancestors_messages = true;
-                    goto cleanup;
-                }
-                if (!bn->stale_ancestor_messages_applied) {
-                    uint32_t stale_lbi, stale_ube;
-                    find_bounds_within_message_tree(&ft->cmp_descriptor,
-                                                    ft->compare_fun,
-                                                    bnc->stale_message_tree,
-                                                    bnc->buffer,
-                                                    &curr_bounds,
-                                                    &stale_lbi,
-                                                    &stale_ube);
-                    if (stale_lbi < stale_ube) {
-                        needs_ancestors_messages = true;
-                        goto cleanup;
-                    }
-                }
-                uint32_t fresh_lbi, fresh_ube;
-                find_bounds_within_message_tree(&ft->cmp_descriptor,
-                                                ft->compare_fun,
-                                                bnc->fresh_message_tree,
-                                                bnc->buffer,
-                                                &curr_bounds,
-                                                &fresh_lbi,
-                                                &fresh_ube);
-                if (fresh_lbi < fresh_ube) {
-                    needs_ancestors_messages = true;
-                    goto cleanup;
-                }
-                if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied.msn) {
-                    max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
-                }
-            }
-        }
-    }
-    *max_msn_in_path = max_msn_applied;
+    // child_to_read may be -1 in test cases
+    if (!node->dirty && child_to_read >= 0) {
+        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
+        needs_ancestors_messages = bn_needs_ancestors_messages(
+            ft,
+            node,
+            child_to_read,
+            bounds,
+            ancestors,
+            max_msn_in_path
+            );
+    }
+    else {
+        for (int i = 0; i < node->n_children; ++i) {
+            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
+            needs_ancestors_messages = bn_needs_ancestors_messages(
+                ft,
+                node,
+                i,
+                bounds,
+                ancestors,
+                max_msn_in_path
+                );
+            if (needs_ancestors_messages) {
+                goto cleanup;
+            }
+        }
+    }
 cleanup:
     return needs_ancestors_messages;
 }
-void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied) {
+void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read) {
     invariant(node->height == 0);
-    for (int i = 0; i < node->n_children; ++i) {
-        if (BP_STATE(node, i) != PT_AVAIL) { continue; }
-        BASEMENTNODE bn = BLB(node, i);
-        if (max_msn_applied.msn > bn->max_msn_applied.msn) {
-            // This function runs in a shared access context, so to silence tools
-            // like DRD, we use a CAS and ignore the result.
-            // Any threads trying to update these basement nodes should be
-            // updating them to the same thing (since they all have a read lock on
-            // the same root-to-leaf path) so this is safe.
-            (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
-        }
-    }
+    if (!node->dirty && child_to_read >= 0) {
+        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
+        BASEMENTNODE bn = BLB(node, child_to_read);
+        if (max_msn_applied.msn > bn->max_msn_applied.msn) {
+            // see comment below
+            (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
+        }
+    }
+    else {
+        for (int i = 0; i < node->n_children; ++i) {
+            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
+            BASEMENTNODE bn = BLB(node, i);
+            if (max_msn_applied.msn > bn->max_msn_applied.msn) {
+                // This function runs in a shared access context, so to silence tools
+                // like DRD, we use a CAS and ignore the result.
+                // Any threads trying to update these basement nodes should be
+                // updating them to the same thing (since they all have a read lock on
+                // the same root-to-leaf path) so this is safe.
+                (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
+            }
+        }
+    }
 }
 struct copy_to_stale_extra {
@@ -4779,6 +4876,11 @@ ok: ;
     ftcursor->leaf_info.to_be.omt = bn->buffer;
     ftcursor->leaf_info.to_be.index = idx;
+    //
+    // IMPORTANT: bulk fetch CANNOT go past the current basement node,
+    // because there is no guarantee that messages have been applied
+    // to other basement nodes, as part of #5770
+    //
     if (r == TOKUDB_CURSOR_CONTINUE && can_bulk_fetch) {
         r = ft_cursor_shortcut(
             ftcursor,
@@ -4908,7 +5010,7 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F
     BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum);
     uint32_t fullhash = compute_child_fullhash(brt->ft->cf, node, childnum);
-    FTNODE childnode;
+    FTNODE childnode = nullptr;
     // If the current node's height is greater than 1, then its child is an internal node.
     // Therefore, to warm the cache better (#5798), we want to read all the partitions off disk in one shot.
@@ -4931,7 +5033,6 @@ ft_search_child(FT_HANDLE brt, FTNODE node, int childnum, ft_search_t *search, F
         unlockers,
         &next_ancestors, bounds,
         &bfe,
-        PL_READ, // we try to get a read lock, but we may upgrade to a write lock on a leaf for message application.
         true,
         &childnode,
         &msgs_applied);
@@ -5090,87 +5191,78 @@ ft_search_node(
     // At this point, we must have the necessary partition available to continue the search
     //
     assert(BP_STATE(node,child_to_search) == PT_AVAIL);
-    while (child_to_search >= 0 && child_to_search < node->n_children) {
-        //
-        // Normally, the child we want to use is available, as we checked
-        // before entering this while loop. However, if we pass through
-        // the loop once, getting DB_NOTFOUND for this first value
-        // of child_to_search, we enter the while loop again with a
-        // child_to_search that may not be in memory. If it is not,
-        // we need to return TOKUDB_TRY_AGAIN so the query can
-        // read the appropriate partition into memory
-        //
-        if (BP_STATE(node,child_to_search) != PT_AVAIL) {
-            return TOKUDB_TRY_AGAIN;
-        }
-        const struct pivot_bounds next_bounds = next_pivot_keys(node, child_to_search, bounds);
-        if (node->height > 0) {
-            r = ft_search_child(
-                brt,
-                node,
-                child_to_search,
-                search,
-                getf,
-                getf_v,
-                doprefetch,
-                ftcursor,
-                unlockers,
-                ancestors,
-                &next_bounds,
-                can_bulk_fetch
-                );
-        }
-        else {
-            r = ft_search_basement_node(
-                BLB(node, child_to_search),
-                search,
-                getf,
-                getf_v,
-                doprefetch,
-                ftcursor,
-                can_bulk_fetch
-                );
-        }
-        if (r == 0) return r; //Success
+    const struct pivot_bounds next_bounds = next_pivot_keys(node, child_to_search, bounds);
+    if (node->height > 0) {
+        r = ft_search_child(
+            brt,
+            node,
+            child_to_search,
+            search,
+            getf,
+            getf_v,
+            doprefetch,
+            ftcursor,
+            unlockers,
+            ancestors,
+            &next_bounds,
+            can_bulk_fetch
+            );
+    }
+    else {
+        r = ft_search_basement_node(
+            BLB(node, child_to_search),
+            search,
+            getf,
+            getf_v,
+            doprefetch,
+            ftcursor,
+            can_bulk_fetch
+            );
+    }
+    if (r == 0) {
+        return r; //Success
+    }
     if (r != DB_NOTFOUND) {
         return r; //Error (or message to quit early, such as TOKUDB_FOUND_BUT_REJECTED or TOKUDB_TRY_AGAIN)
-        }
-        // not really necessary, just put this here so that reading the
-        // code becomes simpler. The point is at this point in the code,
-        // we know that we got DB_NOTFOUND and we have to continue
-        assert(r == DB_NOTFOUND);
-        // we have a new pivotkey
-        if (node->height == 0) {
-            // when we run off the end of a basement, try to lock the range up to the pivot. solves #3529
-            const DBT *pivot = NULL;
-            if (search->direction == FT_SEARCH_LEFT)
-                pivot = next_bounds.upper_bound_inclusive; // left -> right
-            else
-                pivot = next_bounds.lower_bound_exclusive; // right -> left
-            if (pivot) {
-                int rr = getf(pivot->size, pivot->data, 0, NULL, getf_v, true);
-                if (rr != 0)
-                    return rr; // lock was not granted
-            }
-        }
+    }
+    // not really necessary, just put this here so that reading the
+    // code becomes simpler. The point is at this point in the code,
+    // we know that we got DB_NOTFOUND and we have to continue
+    assert(r == DB_NOTFOUND);
+    // we have a new pivotkey
+    if (node->height == 0) {
+        // when we run off the end of a basement, try to lock the range up to the pivot. solves #3529
+        const DBT *pivot = nullptr;
+        if (search->direction == FT_SEARCH_LEFT) {
+            pivot = next_bounds.upper_bound_inclusive; // left -> right
+        } else {
+            pivot = next_bounds.lower_bound_exclusive; // right -> left
+        }
+        if (pivot != nullptr) {
+            int rr = getf(pivot->size, pivot->data, 0, nullptr, getf_v, true);
+            if (rr != 0) {
+                return rr; // lock was not granted
+            }
+        }
+    }
     // If we got a DB_NOTFOUND then we have to search the next record. Possibly everything present is not visible.
     // This way of doing DB_NOTFOUND is a kludge, and ought to be simplified. Something like this is needed for DB_NEXT, but
     // for point queries, it's overkill. If we got a DB_NOTFOUND on a point query then we should just stop looking.
     // When releasing locks on I/O we must not search the same subtree again, or we won't be guaranteed to make forward progress.
     // If we got a DB_NOTFOUND, then the pivot is too small if searching from left to right (too large if searching from right to left).
     // So save the pivot key in the search object.
     maybe_search_save_bound(node, child_to_search, search);
-        // We're about to pin some more nodes, but we thought we were done before.
-        if (search->direction == FT_SEARCH_LEFT) {
-            child_to_search++;
-        }
-        else {
-            child_to_search--;
-        }
-    }
+    // as part of #5770, if we can continue searching,
+    // we MUST return TOKUDB_TRY_AGAIN,
+    // because there is no guarantee that messages have been applied
+    // on any other path.
+    if ((search->direction == FT_SEARCH_LEFT && child_to_search < node->n_children-1) ||
+        (search->direction == FT_SEARCH_RIGHT && child_to_search > 0)) {
+        r = TOKUDB_TRY_AGAIN;
+    }
     return r;
 }
@@ -5775,7 +5867,6 @@ toku_ft_keysrange_internal (FT_HANDLE brt, FTNODE node,
         &next_ancestors,
         bounds,
         child_may_find_right ? match_bfe : min_bfe,
-        PL_READ, // may_modify_node is false, because node guaranteed to not change
         false,
         &childnode,
         &msgs_applied
@@ -5986,7 +6077,7 @@ static int get_key_after_bytes_in_child(FT_HANDLE ft_h, FT ft, FTNODE node, UNLO
     uint32_t fullhash = compute_child_fullhash(ft->cf, node, childnum);
     FTNODE child;
     bool msgs_applied = false;
-    r = toku_pin_ftnode_batched(ft_h, childblocknum, fullhash, unlockers, &next_ancestors, bounds, bfe, PL_READ, false, &child, &msgs_applied);
+    r = toku_pin_ftnode_batched(ft_h, childblocknum, fullhash, unlockers, &next_ancestors, bounds, bfe, false, &child, &msgs_applied);
     paranoid_invariant(!msgs_applied);
     if (r == TOKUDB_TRY_AGAIN) {
         return r;
...
@@ -696,7 +696,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
     struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
     const struct pivot_bounds infinite_bounds = { .lower_bound_exclusive = NULL, .upper_bound_inclusive = NULL };
     bool msgs_applied;
-    toku_apply_ancestors_messages_to_node(t, child, &ancestors, &infinite_bounds, &msgs_applied);
+    toku_apply_ancestors_messages_to_node(t, child, &ancestors, &infinite_bounds, &msgs_applied, -1);
     FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
                  {
@@ -921,7 +921,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
         .upper_bound_inclusive = toku_clone_dbt(&ubi, childkeys[7])
     };
     bool msgs_applied;
-    toku_apply_ancestors_messages_to_node(t, child, &ancestors, &bounds, &msgs_applied);
+    toku_apply_ancestors_messages_to_node(t, child, &ancestors, &bounds, &msgs_applied, -1);
     FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
                  {
@@ -1104,7 +1104,7 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
     struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
     const struct pivot_bounds infinite_bounds = { .lower_bound_exclusive = NULL, .upper_bound_inclusive = NULL };
     bool msgs_applied;
-    toku_apply_ancestors_messages_to_node(t, child2, &ancestors, &infinite_bounds, &msgs_applied);
+    toku_apply_ancestors_messages_to_node(t, child2, &ancestors, &infinite_bounds, &msgs_applied, -1);
     FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
                  {
...