Commit d2f1a06a authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

{t:4160] merging to mainline

git-svn-id: file:///svn/toku/tokudb@37230 c7de825b-a66e-492c-adef-691d508d4ae1
parent f77aadb5
...@@ -249,7 +249,7 @@ static void brtnode_put_cmd ( ...@@ -249,7 +249,7 @@ static void brtnode_put_cmd (
static void static void
flush_this_child (struct brt_header* h, BRTNODE node, BRTNODE child, int childnum); flush_this_child (struct brt_header* h, BRTNODE node, BRTNODE child, int childnum, bool started_at_root);
static void brt_verify_flags(BRT brt, BRTNODE node) { static void brt_verify_flags(BRT brt, BRTNODE node) {
assert(brt->flags == node->flags); assert(brt->flags == node->flags);
...@@ -873,7 +873,33 @@ int toku_brtnode_pe_callback (void *brtnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATT ...@@ -873,7 +873,33 @@ int toku_brtnode_pe_callback (void *brtnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATT
} }
static void flush_some_child(struct brt_header* h, BRTNODE node, int *n_dirtied, int cascades, bool suppress_leaf_merge); // The parameter "started_at_root" is needed to resolve #4147 and #4160,
// which are subtle interactions of background flushing (cleaner and
// flusher threads) and MSN logic.
//
// When we rebalance basement nodes to write out a leaf, we can't have two
// basement nodes with different max_msn_applieds. When we flush to a
// basement node, it may have stale ancestors' messages applied already.
//
// If we've flushed everything down from the root recursively, then there
// is no problem. Anything that was applied to the leaf node by a query
// already must be in the batch of stuff we're flushing, so it's okay to
// do whatever we want, the MSNs will be consistent.
//
// But if we started somewhere in the middle (as a cleaner thread does),
// then we might not have all the messages that were applied to the leaf,
// and some basement nodes may be in a different state than others. So
// before we flush to it, we have to destroy and re-read (off disk) the
// basement nodes which have messages applied. Similarly, if a flush
// started in the middle wants to merge two leaf nodes, we can't do that
// because we might create a leaf node in a bad state.
//
// We use "started_at_root" to decide what to do about this problem in
// code further down. For now, anything started by the cleaner thread
// will have started_at_root==false and anything started by the flusher
// thread will have started_at_root==true, but future mechanisms need to
// be mindful of this issue.
static void flush_some_child(struct brt_header* h, BRTNODE node, int *n_dirtied, int cascades, bool started_at_root);
static void bring_node_fully_into_memory(BRTNODE node, struct brt_header *h); static void bring_node_fully_into_memory(BRTNODE node, struct brt_header *h);
// TODO 3988 Leif set cleaner_nodes_dirtied // TODO 3988 Leif set cleaner_nodes_dirtied
...@@ -928,7 +954,7 @@ toku_brtnode_cleaner_callback(void *brtnode_pv, BLOCKNUM blocknum, u_int32_t ful ...@@ -928,7 +954,7 @@ toku_brtnode_cleaner_callback(void *brtnode_pv, BLOCKNUM blocknum, u_int32_t ful
// Either flush_some_child will unlock the node, or we do it here. // Either flush_some_child will unlock the node, or we do it here.
if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) { if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) {
int n_dirtied = 0; int n_dirtied = 0;
flush_some_child(h, node, &n_dirtied, 0, true); flush_some_child(h, node, &n_dirtied, 0, false);
brt_status.cleaner_nodes_dirtied += n_dirtied; brt_status.cleaner_nodes_dirtied += n_dirtied;
} else { } else {
toku_unpin_brtnode_off_client_thread(h, node); toku_unpin_brtnode_off_client_thread(h, node);
...@@ -1978,7 +2004,7 @@ static void call_flusher_thread_callback(int ft_state) { ...@@ -1978,7 +2004,7 @@ static void call_flusher_thread_callback(int ft_state) {
// - possibly flush either new children created from split, otherwise unlock children // - possibly flush either new children created from split, otherwise unlock children
// //
static void static void
brt_split_child (struct brt_header* h, BRTNODE node, int childnum, BRTNODE child, bool suppress_leaf_merge) brt_split_child (struct brt_header* h, BRTNODE node, int childnum, BRTNODE child, bool started_at_root)
{ {
assert(node->height>0); assert(node->height>0);
assert(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty assert(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty
...@@ -2011,11 +2037,11 @@ brt_split_child (struct brt_header* h, BRTNODE node, int childnum, BRTNODE child ...@@ -2011,11 +2037,11 @@ brt_split_child (struct brt_header* h, BRTNODE node, int childnum, BRTNODE child
toku_unpin_brtnode_off_client_thread(h, node); toku_unpin_brtnode_off_client_thread(h, node);
if (nodea->height > 0 && nonleaf_node_is_gorged(nodea)) { if (nodea->height > 0 && nonleaf_node_is_gorged(nodea)) {
toku_unpin_brtnode_off_client_thread(h, nodeb); toku_unpin_brtnode_off_client_thread(h, nodeb);
flush_some_child(h, nodea, NULL, 0, suppress_leaf_merge); flush_some_child(h, nodea, NULL, 0, started_at_root);
} }
else if (nodeb->height > 0 && nonleaf_node_is_gorged(nodeb)) { else if (nodeb->height > 0 && nonleaf_node_is_gorged(nodeb)) {
toku_unpin_brtnode_off_client_thread(h, nodea); toku_unpin_brtnode_off_client_thread(h, nodea);
flush_some_child(h, nodeb, NULL, 0, suppress_leaf_merge); flush_some_child(h, nodeb, NULL, 0, started_at_root);
} }
else { else {
toku_unpin_brtnode_off_client_thread(h, nodea); toku_unpin_brtnode_off_client_thread(h, nodea);
...@@ -2899,7 +2925,7 @@ maybe_merge_pinned_nodes (BRTNODE parent, struct kv_pair *parent_splitk, ...@@ -2899,7 +2925,7 @@ maybe_merge_pinned_nodes (BRTNODE parent, struct kv_pair *parent_splitk,
// As output, two of node's children are merged or rebalanced, and node is unlocked // As output, two of node's children are merged or rebalanced, and node is unlocked
// //
static void static void
brt_merge_child (struct brt_header* h, BRTNODE node, int childnum_to_merge, BOOL *did_react, bool suppress_leaf_merge) brt_merge_child (struct brt_header* h, BRTNODE node, int childnum_to_merge, BOOL *did_react, bool started_at_root)
{ {
if (node->n_children < 2) { if (node->n_children < 2) {
toku_unpin_brtnode_off_client_thread(h, node); toku_unpin_brtnode_off_client_thread(h, node);
...@@ -2944,10 +2970,10 @@ brt_merge_child (struct brt_header* h, BRTNODE node, int childnum_to_merge, BOOL ...@@ -2944,10 +2970,10 @@ brt_merge_child (struct brt_header* h, BRTNODE node, int childnum_to_merge, BOOL
} }
if (toku_bnc_n_entries(BNC(node,childnuma))>0) { if (toku_bnc_n_entries(BNC(node,childnuma))>0) {
flush_this_child(h, node, childa, childnuma); flush_this_child(h, node, childa, childnuma, started_at_root);
} }
if (toku_bnc_n_entries(BNC(node,childnumb))>0) { if (toku_bnc_n_entries(BNC(node,childnumb))>0) {
flush_this_child(h, node, childb, childnumb); flush_this_child(h, node, childb, childnumb, started_at_root);
} }
...@@ -3030,7 +3056,7 @@ brt_merge_child (struct brt_header* h, BRTNODE node, int childnum_to_merge, BOOL ...@@ -3030,7 +3056,7 @@ brt_merge_child (struct brt_header* h, BRTNODE node, int childnum_to_merge, BOOL
toku_unpin_brtnode_off_client_thread(h, childb); toku_unpin_brtnode_off_client_thread(h, childb);
} }
if (childa->height > 0 && nonleaf_node_is_gorged(childa)) { if (childa->height > 0 && nonleaf_node_is_gorged(childa)) {
flush_some_child(h, childa, NULL, 0, suppress_leaf_merge); flush_some_child(h, childa, NULL, 0, started_at_root);
} }
else { else {
toku_unpin_brtnode_off_client_thread(h, childa); toku_unpin_brtnode_off_client_thread(h, childa);
...@@ -3228,7 +3254,7 @@ update_flush_status(BRTNODE UU(parent), BRTNODE child, int cascades) ...@@ -3228,7 +3254,7 @@ update_flush_status(BRTNODE UU(parent), BRTNODE child, int cascades)
} }
static void static void
flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int cascades, bool suppress_leaf_merge) flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int cascades, bool started_at_root)
// Effect: This function does the following: // Effect: This function does the following:
// - Pick a child of parent (the heaviest child), // - Pick a child of parent (the heaviest child),
// - flush from parent to child, // - flush from parent to child,
...@@ -3273,7 +3299,9 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc ...@@ -3273,7 +3299,9 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc
// for test // for test
call_flusher_thread_callback(ft_flush_after_child_pin); call_flusher_thread_callback(ft_flush_after_child_pin);
if (!started_at_root) {
maybe_destroy_child_blbs(parent, child); maybe_destroy_child_blbs(parent, child);
}
//Note that at this point, we don't have the entire child in. //Note that at this point, we don't have the entire child in.
// Let's do a quick check to see if the child may be reactive // Let's do a quick check to see if the child may be reactive
...@@ -3337,7 +3365,7 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc ...@@ -3337,7 +3365,7 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc
// it is possible that the flush got rid of some values // it is possible that the flush got rid of some values
// and now the parent is no longer reactive // and now the parent is no longer reactive
child_re = get_node_reactivity(child); child_re = get_node_reactivity(child);
if (suppress_leaf_merge && child->height == 0 && child_re == RE_FUSIBLE) { if (!started_at_root && child->height == 0 && child_re == RE_FUSIBLE) {
// prevent merging leaf nodes, sometimes (when the cleaner thread // prevent merging leaf nodes, sometimes (when the cleaner thread
// called us) // called us)
child_re = RE_STABLE; child_re = RE_STABLE;
...@@ -3354,7 +3382,7 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc ...@@ -3354,7 +3382,7 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc
// it is the responsibility of flush_some_child to unpin parent // it is the responsibility of flush_some_child to unpin parent
// //
if (child->height > 0 && nonleaf_node_is_gorged(child)) { if (child->height > 0 && nonleaf_node_is_gorged(child)) {
flush_some_child(h, child, n_dirtied, cascades+1, suppress_leaf_merge); flush_some_child(h, child, n_dirtied, cascades+1, started_at_root);
} }
else { else {
toku_unpin_brtnode_off_client_thread(h, child); toku_unpin_brtnode_off_client_thread(h, child);
...@@ -3365,7 +3393,7 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc ...@@ -3365,7 +3393,7 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc
// it is responsibility of brt_split_child to unlock nodes // it is responsibility of brt_split_child to unlock nodes
// of parent and child as it sees fit // of parent and child as it sees fit
// //
brt_split_child(h, parent, childnum, child, suppress_leaf_merge); brt_split_child(h, parent, childnum, child, started_at_root);
} }
else if (child_re == RE_FUSIBLE) { else if (child_re == RE_FUSIBLE) {
BOOL did_react; BOOL did_react;
...@@ -3380,7 +3408,7 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc ...@@ -3380,7 +3408,7 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc
// //
// it is responsibility of brt_merge_child to unlock parent // it is responsibility of brt_merge_child to unlock parent
// //
brt_merge_child(h, parent, childnum, &did_react, suppress_leaf_merge); brt_merge_child(h, parent, childnum, &did_react, started_at_root);
} }
else { else {
assert(FALSE); assert(FALSE);
...@@ -3388,13 +3416,15 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc ...@@ -3388,13 +3416,15 @@ flush_some_child (struct brt_header* h, BRTNODE parent, int *n_dirtied, int casc
} }
static void static void
flush_this_child (struct brt_header* h, BRTNODE node, BRTNODE child, int childnum) flush_this_child (struct brt_header* h, BRTNODE node, BRTNODE child, int childnum, bool started_at_root)
// Effect: Push everything in the CHILDNUMth buffer of node down into the child. // Effect: Push everything in the CHILDNUMth buffer of node down into the child.
{ {
update_flush_status(node, child, 0); update_flush_status(node, child, 0);
int r; int r;
toku_assert_entire_node_in_memory(node); toku_assert_entire_node_in_memory(node);
if (!started_at_root) {
maybe_destroy_child_blbs(node, child); maybe_destroy_child_blbs(node, child);
}
bring_node_fully_into_memory(child, h); bring_node_fully_into_memory(child, h);
toku_assert_entire_node_in_memory(child); toku_assert_entire_node_in_memory(child);
assert(node->height>0); assert(node->height>0);
...@@ -3673,7 +3703,7 @@ static void flush_node_fun(void *fe_v) ...@@ -3673,7 +3703,7 @@ static void flush_node_fun(void *fe_v)
// of flush_some_child to unlock the node // of flush_some_child to unlock the node
// otherwise, we unlock the node here. // otherwise, we unlock the node here.
if (fe->node->height > 0 && nonleaf_node_is_gorged(fe->node)) { if (fe->node->height > 0 && nonleaf_node_is_gorged(fe->node)) {
flush_some_child(fe->h, fe->node, NULL, 0, false); flush_some_child(fe->h, fe->node, NULL, 0, true);
} }
else { else {
toku_unpin_brtnode_off_client_thread(fe->h,fe->node); toku_unpin_brtnode_off_client_thread(fe->h,fe->node);
...@@ -3684,7 +3714,7 @@ static void flush_node_fun(void *fe_v) ...@@ -3684,7 +3714,7 @@ static void flush_node_fun(void *fe_v)
// bnc, which means we are tasked with flushing some // bnc, which means we are tasked with flushing some
// buffer in the node. // buffer in the node.
// It is the responsibility of flush_some_child to unlock the node // It is the responsibility of flush_some_child to unlock the node
flush_some_child(fe->h, fe->node, NULL, 0, false); flush_some_child(fe->h, fe->node, NULL, 0, true);
} }
remove_background_job(fe->h->cf, false); remove_background_job(fe->h->cf, false);
toku_free(fe); toku_free(fe);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment