Commit 34abc6f4 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:3884] fixed the problem in brtleaf_split, added back the assert in...

[t:3884] fixed the problem in brtleaf_split, added back the assert in move_leafentries, and added a test (test3884.c).  this required exporting brtleaf_split in brt-internal.h

git-svn-id: file:///svn/toku/tokudb@34127 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1b1c3dac
......@@ -759,6 +759,9 @@ typedef struct brt_status {
void toku_brt_get_status(BRT_STATUS);
void
brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, BOOL create_new_node);
void
brt_leaf_apply_cmd_once (
BASEMENTNODE bn,
......
......@@ -1217,6 +1217,7 @@ move_leafentries(
)
//Effect: move leafentries in the range [lbi, upe) from src_omt to newly created dest_omt
{
assert(lbi < ube);
OMTVALUE *MALLOC_N(ube-lbi, new_le);
u_int32_t i = 0;
*num_bytes_moved = 0;
......@@ -1245,7 +1246,7 @@ move_leafentries(
}
}
static void
void
brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, BOOL create_new_node)
// Effect: Split a leaf node.
{
......@@ -1279,6 +1280,8 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
&split_at_in_node
);
}
// did we split right on the boundary between basement nodes?
BOOL split_on_boundary = (split_at_in_node == ((int) toku_omt_size(BLB_BUFFER(node, split_node)) - 1));
// Now we know where we are going to break it
// the two nodes will have a total of n_children+1 basement nodes
// and n_children-1 pivots
......@@ -1289,7 +1292,7 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
//set up the basement nodes in the new node
int num_children_in_node = split_node + 1;
int num_children_in_b = node->n_children - split_node;
int num_children_in_b = node->n_children - split_node - (split_on_boundary ? 1 : 0);
if (create_new_node) {
toku_create_new_brtnode(
t,
......@@ -1317,33 +1320,39 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
// first move all the data
//
// handle the move of a subset of data in split_node from node to B
int curr_src_bn_index = split_node;
int curr_dest_bn_index = 0;
BP_STATE(B,0) = PT_AVAIL;
// handle the move of a subset of data in split_node from node to B
if (!split_on_boundary) {
BP_STATE(B,curr_dest_bn_index) = PT_AVAIL;
struct subtree_estimates se_diff = zero_estimates;
u_int32_t diff_size = 0;
destroy_basement_node (BLB(B, 0)); // Destroy B's empty OMT, so I can rebuild it from an array
set_BNULL(B, 0);
set_BLB(B, 0, toku_create_empty_bn_no_buffer());
destroy_basement_node (BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array
set_BNULL(B, curr_dest_bn_index);
set_BLB(B, curr_dest_bn_index, toku_create_empty_bn_no_buffer());
move_leafentries(
&BLB_BUFFER(B, 0),
BLB_BUFFER(node, split_node),
&BLB_BUFFER(B, curr_dest_bn_index),
BLB_BUFFER(node, curr_src_bn_index),
split_at_in_node+1,
toku_omt_size(BLB_BUFFER(node, split_node)),
toku_omt_size(BLB_BUFFER(node, curr_src_bn_index)),
&se_diff,
&diff_size
);
BLB_NBYTESINBUF(node, split_node) -= diff_size;
BLB_NBYTESINBUF(B, 0) += diff_size;
subtract_estimates(&BP_SUBTREE_EST(node,split_node), &se_diff);
add_estimates(&BP_SUBTREE_EST(B,0), &se_diff);
BLB_NBYTESINBUF(node, curr_src_bn_index) -= diff_size;
BLB_NBYTESINBUF(B, curr_dest_bn_index) += diff_size;
subtract_estimates(&BP_SUBTREE_EST(node,curr_src_bn_index), &se_diff);
add_estimates(&BP_SUBTREE_EST(B,curr_dest_bn_index), &se_diff);
curr_dest_bn_index++;
} else {
curr_src_bn_index++;
}
// move the rest of the basement nodes
int curr_dest_bn_index = 1;
for (int i = num_children_in_node; i < node->n_children; i++, curr_dest_bn_index++) {
for ( ; curr_src_bn_index < node->n_children; curr_src_bn_index++, curr_dest_bn_index++) {
destroy_basement_node(BLB(B, curr_dest_bn_index));
set_BNULL(B, curr_dest_bn_index);
B->bp[curr_dest_bn_index] = node->bp[i];
B->bp[curr_dest_bn_index] = node->bp[curr_src_bn_index];
}
node->n_children = num_children_in_node;
......@@ -1351,12 +1360,15 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
// now handle the pivots
//
// the child index in the original node that corresponds to the
// first node in the right node of the split
int base_index = (split_on_boundary ? split_node + 1 : split_node);
// make pivots in B
for (int i=0; i < num_children_in_b-1; i++) {
B->childkeys[i] = node->childkeys[i+split_node];
B->totalchildkeylens += toku_brt_pivot_key_len(node->childkeys[i+split_node]);
node->totalchildkeylens -= toku_brt_pivot_key_len(node->childkeys[i+split_node]);
node->childkeys[i+split_node] = NULL;
B->childkeys[i] = node->childkeys[i+base_index];
B->totalchildkeylens += toku_brt_pivot_key_len(node->childkeys[i+base_index]);
node->totalchildkeylens -= toku_brt_pivot_key_len(node->childkeys[i+base_index]);
node->childkeys[i+base_index] = NULL;
}
REALLOC_N(num_children_in_node, node->bp);
REALLOC_N(num_children_in_node-1, node->childkeys);
......@@ -1375,8 +1387,8 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
splitk->flags=0;
}
node->max_msn_applied_to_node_on_disk= max_msn_applied_to_node;
B ->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
node->dirty = 1;
B->dirty = 1;
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: test3856.c 33984 2011-08-17 03:03:54Z leifwalsh $"
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
// it used to be the case that we copied the left and right keys of a
// range to be prelocked but never freed them, this test checks that they
// are freed (as of this time, this happens in destroy_bfe_for_prefetch)
#include "test.h"
#include "includes.h"
static TOKUTXN const null_txn = 0;
static DB * const null_db = 0;
static const char fname[]= __FILE__ ".brt";
static int omt_long_cmp(OMTVALUE p, void *q)
{
LEAFENTRY a = p, b = q;
void *ak, *bk;
u_int32_t al, bl;
ak = le_key_and_len(a, &al);
bk = le_key_and_len(b, &bl);
assert(al == sizeof(long) && bl == sizeof(long));
long *ai = (long *) ak;
long *bi = (long *) bk;
return (*ai > *bi) - (*ai < *bi);
}
static LEAFENTRY
le_fastmalloc(char *key, int keylen, char *val, int vallen)
{
LEAFENTRY r = toku_malloc(sizeof(r->type) + sizeof(r->keylen) + sizeof(r->u.clean.vallen) +
keylen + vallen);
resource_assert(r);
r->type = LE_CLEAN;
r->keylen = keylen;
r->u.clean.vallen = vallen;
memcpy(&r->u.clean.key_val[0], key, keylen);
memcpy(&r->u.clean.key_val[keylen], val, vallen);
return r;
}
int
test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute__((__unused__))) {
toku_memory_check = 1;
const int nodesize = 1024, eltsize = 64, bnsize = 256;
const int keylen = sizeof(long), vallen = eltsize - keylen - (sizeof(((LEAFENTRY)NULL)->type) // overhead from LE_CLEAN_MEMSIZE
+sizeof(((LEAFENTRY)NULL)->keylen)
+sizeof(((LEAFENTRY)NULL)->u.clean.vallen));
const int eltsperbn = bnsize / eltsize;
struct brtnode sn;
int fd = open(__FILE__ ".brt", O_RDWR|O_CREAT|O_BINARY, S_IRWXU|S_IRWXG|S_IRWXO); assert(fd >= 0);
int r;
sn.max_msn_applied_to_node_on_disk.msn = 0;
sn.nodesize = nodesize;
sn.flags = 0x11223344;
sn.thisnodename.b = 20;
sn.layout_version = BRT_LAYOUT_VERSION;
sn.layout_version_original = BRT_LAYOUT_VERSION;
sn.height = 0;
const int nelts = 2 * nodesize / eltsize;
sn.n_children = nelts * eltsize / bnsize;
sn.dirty = 1;
LEAFENTRY elts[nelts];
MALLOC_N(sn.n_children, sn.bp);
MALLOC_N(sn.n_children - 1, sn.childkeys);
sn.totalchildkeylens = 0;
for (int bn = 0; bn < sn.n_children; ++bn) {
BP_SUBTREE_EST(&sn,bn).ndata = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,bn).nkeys = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,bn).dsize = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,bn).exact = (BOOL)(random()%2 != 0);
BP_STATE(&sn,bn) = PT_AVAIL;
set_BLB(&sn, bn, toku_create_empty_bn());
BLB_NBYTESINBUF(&sn,bn) = 0;
BLB_OPTIMIZEDFORUPGRADE(&sn, bn) = BRT_LAYOUT_VERSION;
long k;
for (int i = 0; i < eltsperbn; ++i) {
k = bn * eltsperbn + i;
char val[vallen];
memset(val, k, sizeof val);
elts[k] = le_fastmalloc((char *) &k, keylen, val, vallen);
r = toku_omt_insert(BLB_BUFFER(&sn, bn), elts[k], omt_long_cmp, elts[k], NULL); assert(r == 0);
BLB_NBYTESINBUF(&sn, bn) += OMT_ITEM_OVERHEAD + leafentry_disksize(elts[k]);
}
if (bn < sn.n_children - 1) {
sn.childkeys[bn] = kv_pair_malloc(&k, sizeof k, 0, 0);
sn.totalchildkeylens += (sizeof k);
}
}
CACHETABLE ct;
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
BRT brt;
r = toku_open_brt(fname, 1, &brt, nodesize, bnsize, ct, null_txn, toku_builtin_compare_fun, null_db); assert(r==0);
BRTNODE nodea, nodeb;
DBT splitk;
// if we haven't done it right, we should hit the assert in the top of move_leafentries
brtleaf_split(brt, &sn, &nodea, &nodeb, &splitk, TRUE);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment