From 9208298085e08fcaa86a04e52642ab3437f9bbce Mon Sep 17 00:00:00 2001
From: Leif Walsh <leif@tokutek.com>
Date: Thu, 15 Nov 2012 18:05:23 +0000
Subject: [PATCH] closes #5671 don't assert in a weird case with left-heavy
 uneven splits, beef up ybt usage

git-svn-id: file:///svn/toku/tokudb@50009 c7de825b-a66e-492c-adef-691d508d4ae1
---
 ft/ft-flusher.cc | 71 ++++++++++++++++++++++++------------------------
 ft/ybt.cc        |  8 ++++++
 ft/ybt.h         |  2 ++
 3 files changed, 46 insertions(+), 35 deletions(-)

diff --git a/ft/ft-flusher.cc b/ft/ft-flusher.cc
index 81801cdaec3..a818a1078c0 100644
--- a/ft/ft-flusher.cc
+++ b/ft/ft-flusher.cc
@@ -424,7 +424,7 @@ ct_maybe_merge_child(struct flusher_advice *fa,
 
         (void) toku_sync_fetch_and_sub(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
 
-        toku_free(ctme.target_key.data);
+        toku_destroy_dbt(&ctme.target_key);
     }
 }
 
@@ -538,10 +538,10 @@ handle_split_of_child(
     // Slide the keys over
     {
         for (cnum=node->n_children-2; cnum>childnum; cnum--) {
-            toku_copyref_dbt(&node->childkeys[cnum], node->childkeys[cnum-1]);
+            toku_copy_dbt(&node->childkeys[cnum], node->childkeys[cnum-1]);
         }
         //if (logger) assert((t->flags&TOKU_DB_DUPSORT)==0); // the setpivot is wrong for TOKU_DB_DUPSORT, so recovery will be broken.
-        toku_copyref_dbt(&node->childkeys[childnum], *splitk);
+        toku_copy_dbt(&node->childkeys[childnum], *splitk);
         node->totalchildkeylens += splitk->size;
     }
 
@@ -627,7 +627,6 @@ ftleaf_get_split_loc(
         if (*num_left_les == 0) {
             *num_left_bns = node->n_children - 1;
             *num_left_les = toku_omt_size(BLB_BUFFER(node, *num_left_bns - 1));
-            invariant(*num_left_les > 0);
         }
         goto exit;
     }
@@ -879,8 +878,6 @@ ftleaf_split(
             BP_STATE(B, curr_dest_bn_index) = PT_AVAIL;
         }
 
-        node->n_children = num_children_in_node;
-
         //
         // now handle the pivots
         //
@@ -890,30 +887,32 @@ ftleaf_split(
         int base_index = num_left_bns - (split_on_boundary ? 0 : 1);
         // make pivots in B
         for (int i=0; i < num_children_in_b-1; i++) {
-            toku_copyref_dbt(&B->childkeys[i], node->childkeys[i+base_index]);
+            toku_copy_dbt(&B->childkeys[i], node->childkeys[i+base_index]);
             B->totalchildkeylens += node->childkeys[i+base_index].size;
             node->totalchildkeylens -= node->childkeys[i+base_index].size;
             toku_init_dbt(&node->childkeys[i+base_index]);
         }
-        if (split_on_boundary && split_mode != SPLIT_LEFT_HEAVY) {
-            // destroy the extra childkey between the nodes, we'll
-            // recreate it in splitk below
-            toku_free(node->childkeys[num_left_bns - 1].data);
+        if (split_on_boundary && num_left_bns < node->n_children) {
+            if (splitk) {
+                toku_copy_dbt(splitk, node->childkeys[num_left_bns - 1]);
+            } else {
+                toku_destroy_dbt(&node->childkeys[num_left_bns - 1]);
+            }
+        } else if (splitk) {
+            OMTVALUE lev;
+            OMT buffer = BLB_BUFFER(node, num_left_bns - 1);
+            int r = toku_omt_fetch(buffer, toku_omt_size(buffer) - 1, &lev);
+            assert_zero(r); // that fetch should have worked.
+            LEAFENTRY CAST_FROM_VOIDP(le, lev);
+            uint32_t keylen;
+            void *key = le_key_and_len(le, &keylen);
+            toku_fill_dbt(splitk, toku_xmemdup(key, keylen), keylen);
+            splitk->flags = DB_DBT_MALLOC;
         }
+
+        node->n_children = num_children_in_node;
         REALLOC_N(num_children_in_node, node->bp);
         REALLOC_N(num_children_in_node-1, node->childkeys);
-
-    }
-    if (splitk) {
-        memset(splitk, 0, sizeof *splitk);
-        OMTVALUE lev;
-        OMT buffer = BLB_BUFFER(node, num_left_bns - 1);
-        int r = toku_omt_fetch(buffer, toku_omt_size(buffer) - 1, &lev);
-        assert_zero(r); // that fetch should have worked.
-        LEAFENTRY CAST_FROM_VOIDP(le, lev);
-        uint32_t keylen;
-        void *key = le_key_and_len(le, &keylen);
-        toku_fill_dbt(splitk, toku_xmemdup(key, keylen), keylen);
     }
 
     verify_all_in_mempool(node);
@@ -974,7 +973,7 @@ ft_nonleaf_split(
             {
                 paranoid_invariant(i>0);
                 if (i>n_children_in_a) {
-                    toku_copyref_dbt(&B->childkeys[targchild-1], node->childkeys[i-1]);
+                    toku_copy_dbt(&B->childkeys[targchild-1], node->childkeys[i-1]);
                     B->totalchildkeylens += node->childkeys[i-1].size;
                     node->totalchildkeylens -= node->childkeys[i-1].size;
                     toku_init_dbt(&node->childkeys[i-1]);
@@ -984,7 +983,7 @@ ft_nonleaf_split(
 
         node->n_children=n_children_in_a;
 
-        toku_copyref_dbt(splitk, node->childkeys[n_children_in_a-1]);
+        toku_copy_dbt(splitk, node->childkeys[n_children_in_a-1]);
         node->totalchildkeylens -= node->childkeys[n_children_in_a-1].size;
 
         REALLOC_N(n_children_in_a,   node->bp);
@@ -1163,7 +1162,7 @@ merge_leaf_nodes(FTNODE a, FTNODE b)
         a->bp[i+offset] = b->bp[i];
         memset(&b->bp[i],0,sizeof(b->bp[0]));
         if (i < (b->n_children-1)) {
-            toku_copyref_dbt(&a->childkeys[i+offset], b->childkeys[i]);
+            toku_copy_dbt(&a->childkeys[i+offset], b->childkeys[i]);
             toku_init_dbt(&b->childkeys[i]);
         }
     }
@@ -1219,15 +1218,15 @@ maybe_merge_pinned_leaf_nodes(
             return;
         }
         // one is less than 1/4 of a node, and together they are more than 3/4 of a node.
-        toku_free(parent_splitk->data); // We don't need the parent_splitk any more. If we need a splitk (if we don't merge) we'll malloc a new one.
+        toku_destroy_dbt(parent_splitk); // We don't need the parent_splitk any more. If we need a splitk (if we don't merge) we'll malloc a new one.
         *did_rebalance = true;
-         balance_leaf_nodes(a, b, splitk);
+        balance_leaf_nodes(a, b, splitk);
     } else {
         // we are merging them.
         *did_merge = true;
         *did_rebalance = false;
         toku_init_dbt(splitk);
-        toku_free(parent_splitk->data); // if we are merging, the splitk gets freed.
+        toku_destroy_dbt(parent_splitk); // if we are merging, the splitk gets freed.
         merge_leaf_nodes(a, b);
     }
 }
@@ -1253,11 +1252,13 @@ maybe_merge_pinned_nonleaf_nodes(
     memset(b->bp,0,b->n_children*sizeof(b->bp[0]));
 
     XREALLOC_N(new_n_children-1, a->childkeys);
-    toku_copyref_dbt(&a->childkeys[old_n_children-1], *parent_splitk);
-    memcpy(a->childkeys + old_n_children,
-           b->childkeys,
-           (b->n_children-1)*sizeof(b->childkeys[0]));
-    a->totalchildkeylens += b->totalchildkeylens + parent_splitk->size;
+    toku_copy_dbt(&a->childkeys[old_n_children-1], *parent_splitk);
+    a->totalchildkeylens += parent_splitk->size;
+    for (int i = 0; i < b->n_children; ++i) {
+        toku_copy_dbt(&a->childkeys[old_n_children + i], b->childkeys[i]);
+        a->totalchildkeylens += b->childkeys[i].size;
+        toku_init_dbt(&b->childkeys[i]);
+    }
     a->n_children = new_n_children;
 
     b->totalchildkeylens = 0;
@@ -1449,7 +1450,7 @@ ft_merge_child(
             // pretty far down the tree)
 
             // If we didn't merge the nodes, then we need the correct pivot.
-            toku_copyref_dbt(&node->childkeys[childnuma], splitk);
+            toku_copy_dbt(&node->childkeys[childnuma], splitk);
             node->totalchildkeylens += node->childkeys[childnuma].size;
             node->dirty = 1;
         }
diff --git a/ft/ybt.cc b/ft/ybt.cc
index 7596fe91ed5..81d00840f71 100644
--- a/ft/ybt.cc
+++ b/ft/ybt.cc
@@ -52,6 +52,14 @@ DBT *toku_copyref_dbt(DBT *dst, const DBT src) {
     return dst;
 }
 
+DBT *toku_copy_dbt(DBT *dst, const DBT &src) {
+    dst->flags = src.flags;
+    dst->ulen = src.ulen;
+    dst->size = src.size;
+    dst->data = src.data;
+    return dst;
+}
+
 DBT *toku_clone_dbt(DBT *dst, const DBT &src) {
     dst->flags = DB_DBT_MALLOC;
     dst->ulen = 0;
diff --git a/ft/ybt.h b/ft/ybt.h
index eb8ae798e85..280296cda03 100644
--- a/ft/ybt.h
+++ b/ft/ybt.h
@@ -25,6 +25,8 @@ DBT *toku_fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
 
 DBT *toku_copyref_dbt(DBT *dst, const DBT src);
 
+DBT *toku_copy_dbt(DBT *dst, const DBT &src);
+
 DBT *toku_clone_dbt(DBT *dst, const DBT &src);
 
 int toku_dbt_set(ITEMLEN len, bytevec val, DBT *d, struct simple_dbt *sdbt);
-- 
2.30.9