Commit cc90498e authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

More surgery. The newbrt.o file now builds. Addresess #1195.

git-svn-id: file:///svn/toku/tokudb.1195@7273 c7de825b-a66e-492c-adef-691d508d4ae1
parent 96baa98d
......@@ -248,145 +248,6 @@ int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logg
return 0;
}
static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v, int type, TXNID xid) {
unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + k->size + v->size;
int r = toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, xid);
if (r!=0) return r;
// printf("%s:%d fingerprint %08x -> ", __FILE__, __LINE__, node->local_fingerprint);
node->local_fingerprint += node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, k->data, k->size, v->data, v->size);
// printf(" %08x\n", node->local_fingerprint);
BNC_NBYTESINBUF(node,childnum) += n_bytes_added;
node->u.n.n_bytes_in_buffers += n_bytes_added;
node->dirty = 1;
return 0;
}
static int fill_buf (OMTVALUE lev, u_int32_t idx, void *varray) {
LEAFENTRY le=lev;
LEAFENTRY *array=varray;
array[idx]=le;
return 0;
}
static int
brtleaf_split (TOKULOGGER logger, FILENUM filenum, BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk)
// Effect: Split a leaf node.
{
BRTNODE B;
int r;
//printf("%s:%d splitting leaf %" PRIu64 " which is size %u (targetsize = %u)\", __FILE__, __LINE__, node->thisnodename.b, toku_serialize_brtnode_size(node), node->nodesize);
assert(node->height==0);
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
toku_create_new_brtnode(t, &B, 0, logger);
assert(B->nodesize>0);
assert(node->nodesize>0);
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
//printf("%s:%d B is at %lld nodesize=%d\n", __FILE__, __LINE__, B->thisnodename, B->nodesize);
assert(node->height>0 || node->u.l.buffer!=0);
toku_verify_all_in_mempool(node);
u_int32_t n_leafentries = toku_omt_size(node->u.l.buffer);
u_int32_t break_at = 0;
node->u.l.seqinsert = 0;
// Don't mess around with splitting specially for sequential insertions any more.
{
OMTVALUE *MALLOC_N(n_leafentries, leafentries);
assert(leafentries);
toku_omt_iterate(node->u.l.buffer, fill_buf, leafentries);
break_at = 0;
{
u_int32_t i;
u_int32_t sumlesizes=0;
for (i=0; i<n_leafentries; i++) sumlesizes += leafentry_disksize(leafentries[i]);
u_int32_t sumsofar=0;
for (i=0; i<n_leafentries; i++) {
assert(toku_mempool_inrange(&node->u.l.buffer_mempool, leafentries[i], leafentry_memsize(leafentries[i])));
sumsofar += leafentry_disksize(leafentries[i]);
if (sumsofar*2 >= sumlesizes) {
break_at = i;
break;
}
}
}
// Now we know where we are going to break it
OMT old_omt = node->u.l.buffer;
toku_omt_destroy(&B->u.l.buffer); // Destroy B's empty OMT, so I can rebuild it from an array
{
u_int32_t i;
u_int32_t diff_fp = 0;
u_int32_t diff_size = 0;
for (i=break_at; i<n_leafentries; i++) {
LEAFENTRY oldle = leafentries[i];
LEAFENTRY newle = toku_mempool_malloc(&B->u.l.buffer_mempool, leafentry_memsize(oldle), 1);
assert(newle!=0); // it's a fresh mpool, so this should always work.
diff_fp += toku_le_crc(oldle);
diff_size += OMT_ITEM_OVERHEAD + leafentry_disksize(oldle);
memcpy(newle, oldle, leafentry_memsize(oldle));
toku_mempool_mfree(&node->u.l.buffer_mempool, oldle, leafentry_memsize(oldle));
leafentries[i] = newle;
}
node->local_fingerprint -= node->rand4fingerprint * diff_fp;
B ->local_fingerprint += B ->rand4fingerprint * diff_fp;
node->u.l.n_bytes_in_buffer -= diff_size;
B ->u.l.n_bytes_in_buffer += diff_size;
}
if ((r = toku_omt_create_from_sorted_array(&B->u.l.buffer, leafentries+break_at, n_leafentries-break_at))) return r;
if ((r = toku_omt_create_from_sorted_array(&node->u.l.buffer, leafentries, break_at))) return r;
toku_free(leafentries);
toku_verify_all_in_mempool(node);
toku_verify_all_in_mempool(B);
toku_omt_destroy(&old_omt);
}
LSN lsn={0};
r = toku_log_leafsplit(logger, &lsn, 0, filenum, node->thisnodename, B->thisnodename, n_leafentries, break_at, node->nodesize, B->rand4fingerprint, (u_int8_t)((t->flags&TOKU_DB_DUPSORT)!=0));
if (logger) {
node->log_lsn = lsn;
B->log_lsn = lsn;
}
//toku_verify_gpma(node->u.l.buffer);
//toku_verify_gpma(B->u.l.buffer);
if (splitk) {
memset(splitk, 0, sizeof *splitk);
OMTVALUE lev = 0;
r=toku_omt_fetch(node->u.l.buffer, toku_omt_size(node->u.l.buffer)-1, &lev, NULL);
assert(r==0); // that fetch should have worked.
LEAFENTRY le=lev;
if (node->flags&TOKU_DB_DUPSORT) {
splitk->size = le_any_keylen(le)+le_any_vallen(le);
splitk->data = kv_pair_malloc(le_any_key(le), le_any_keylen(le), le_any_val(le), le_any_vallen(le));
} else {
splitk->size = le_any_keylen(le);
splitk->data = kv_pair_malloc(le_any_key(le), le_any_keylen(le), 0, 0);
}
splitk->flags=0;
}
assert(r == 0);
assert(node->height>0 || node->u.l.buffer!=0);
/* Remove it from the cache table, and free its storage. */
//printf("%s:%d old pma = %p\n", __FILE__, __LINE__, node->u.l.buffer);
node->dirty = 1;
B ->dirty = 1;
*nodea = node;
*nodeb = B;
//printf("%s:%d new sizes Node %" PRIu64 " size=%u omtsize=%d dirty=%d; Node %" PRIu64 " size=%u omtsize=%d dirty=%d\n", __FILE__, __LINE__,
// node->thisnodename.b, toku_serialize_brtnode_size(node), node->height==0 ? (int)(toku_omt_size(node->u.l.buffer)) : -1, node->dirty,
// B ->thisnodename.b, toku_serialize_brtnode_size(B ), B ->height==0 ? (int)(toku_omt_size(B ->u.l.buffer)) : -1, B->dirty);
//toku_dump_brtnode(t, node->thisnodename, 0, NULL, 0, NULL, 0);
//toku_dump_brtnode(t, B ->thisnodename, 0, NULL, 0, NULL, 0);
return 0;
}
static int
handle_split_of_child_simple (BRT t, BRTNODE node, int childnum,
BRTNODE childa, BRTNODE childb,
......@@ -394,195 +255,9 @@ handle_split_of_child_simple (BRT t, BRTNODE node, int childnum,
TOKULOGGER logger);
static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger);
static int
brt_split_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger) {
if (0) {
printf("%s:%d Node %" PRIu64 "->u.n.n_children=%d estimates=", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children);
int i;
for (i=0; i<node->u.n.n_children; i++) printf(" %" PRId64, BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i));
printf("\n");
}
assert(node->height>0);
BRTNODE child;
{
void *childnode_v;
int r = toku_cachetable_get_and_pin(t->cf,
BNC_BLOCKNUM(node, childnum),
compute_child_fullhash(t->cf, node, childnum),
&childnode_v,
NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback,
t->h);
assert(r==0); // REMOVE LATER
if (r!=0) return r;
child = childnode_v;
assert(child->thisnodename.b!=0);
VERIFY_NODE(t,child);
}
BRTNODE nodea, nodeb;
DBT splitk;
// printf("%s:%d node %" PRIu64 "->u.n.n_children=%d height=%d\n", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children, node->height);
if (child->height==0) {
int r = brtleaf_split(logger, toku_cachefile_filenum(t->cf), t, child, &nodea, &nodeb, &splitk);
assert(r==0); // REMOVE LATER
if (r!=0) return r;
} else {
int r = brt_nonleaf_split(t, child, &nodea, &nodeb, &splitk, logger);
assert(r==0); // REMOVE LATER
if (r!=0) return r;
}
// printf("%s:%d child did split\n", __FILE__, __LINE__);
{
int r = handle_split_of_child_simple (t, node, childnum, nodea, nodeb, &splitk, logger);
if (0) {
printf("%s:%d Node %" PRIu64 "->u.n.n_children=%d estimates=", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children);
int i;
for (i=0; i<node->u.n.n_children; i++) printf(" %" PRId64, BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i));
printf("\n");
}
return r;
}
}
//#define MAX_PATHLEN_TO_ROOT 40
/* Side effect: sets splitk->data pointer to a malloc'd value */
static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger) {
int old_n_children = node->u.n.n_children;
int n_children_in_a = old_n_children/2;
int n_children_in_b = old_n_children-n_children_in_a;
BRTNODE B;
FILENUM fnum = toku_cachefile_filenum(t->cf);
assert(node->height>0);
assert(node->u.n.n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
toku_create_new_brtnode(t, &B, node->height, logger);
MALLOC_N(n_children_in_b+1, B->u.n.childinfos);
MALLOC_N(n_children_in_b, B->u.n.childkeys);
B->u.n.n_children =n_children_in_b;
if (0) {
printf("%s:%d %p (%" PRIu64 ") splits, old estimates:", __FILE__, __LINE__, node, node->thisnodename.b);
int i;
for (i=0; i<node->u.n.n_children; i++) printf(" %" PRId64, BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i));
printf("\n");
}
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
{
/* The first n_children_in_a go into node a.
* That means that the first n_children_in_a-1 keys go into node a.
* The splitter key is key number n_children_in_a */
int i;
for (i=0; i<n_children_in_b; i++) {
int r = toku_fifo_create(&BNC_BUFFER(B,i));
if (r!=0) return r;
BNC_NBYTESINBUF(B,i)=0;
BNC_SUBTREE_FINGERPRINT(B,i)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(B,i)=0;
}
for (i=n_children_in_a; i<old_n_children; i++) {
int targchild = i-n_children_in_a;
FIFO from_htab = BNC_BUFFER(node,i);
FIFO to_htab = BNC_BUFFER(B, targchild);
BLOCKNUM thischildblocknum = BNC_BLOCKNUM(node, i);
BNC_BLOCKNUM(B, targchild) = thischildblocknum;
BNC_HAVE_FULLHASH(B,targchild) = BNC_HAVE_FULLHASH(node,i);
BNC_FULLHASH(B,targchild) = BNC_FULLHASH(node, i);
int r = toku_log_addchild(logger, (LSN*)0, 0, fnum, B->thisnodename, targchild, thischildblocknum, BNC_SUBTREE_FINGERPRINT(node, i));
if (r!=0) return r;
while (1) {
bytevec key, data;
unsigned int keylen, datalen;
u_int32_t type;
TXNID xid;
int fr = toku_fifo_peek(from_htab, &key, &keylen, &data, &datalen, &type, &xid);
if (fr!=0) break;
int n_bytes_moved = keylen+datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
u_int32_t old_from_fingerprint = node->local_fingerprint;
u_int32_t delta = toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
if (r!=0) return r;
if (t->txn_that_created != xid) {
r = toku_log_brtdeq(logger, &node->log_lsn, 0, fnum, node->thisnodename, n_children_in_a);
if (r!=0) return r;
}
r = log_and_save_brtenq(logger, t, B, targchild, xid, type, key, keylen, data, datalen, &B->local_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
if (r!=0) return r;
toku_fifo_deq(from_htab);
// key and data will no longer be valid
node->local_fingerprint = new_from_fingerprint;
B->u.n.n_bytes_in_buffers += n_bytes_moved;
BNC_NBYTESINBUF(B, targchild) += n_bytes_moved;
node->u.n.n_bytes_in_buffers -= n_bytes_moved;
BNC_NBYTESINBUF(node, i) -= n_bytes_moved;
// verify_local_fingerprint_nonleaf(B);
// verify_local_fingerprint_nonleaf(node);
}
// Delete a child, removing it's fingerprint, and also the preceeding pivot key. The child number must be > 0
{
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
assert(i>0);
r = toku_log_delchild(logger, (LSN*)0, 0, fnum, node->thisnodename, n_children_in_a, thischildblocknum, BNC_SUBTREE_FINGERPRINT(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(logger, (LSN*)0, 0, fnum, B->thisnodename, targchild-1, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.childkeys[i-1] = 0;
}
}
BNC_BLOCKNUM(node, i) = make_blocknum(0);
BNC_HAVE_FULLHASH(node, i) = FALSE;
BNC_SUBTREE_FINGERPRINT(B, targchild) = BNC_SUBTREE_FINGERPRINT(node, i);
BNC_SUBTREE_FINGERPRINT(node, i) = 0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(B, targchild) = BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i);
BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i) = 0;
assert(BNC_NBYTESINBUF(node, i) == 0);
}
// Drop the n_children now (not earlier) so that we can do the fingerprint verification at any time.
node->u.n.n_children=n_children_in_a;
for (i=n_children_in_a; i<old_n_children; i++) {
toku_fifo_free(&BNC_BUFFER(node,i));
}
splitk->data = (void*)(node->u.n.childkeys[n_children_in_a-1]);
splitk->size = toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
REALLOC_N(n_children_in_a+1, node->u.n.childinfos);
REALLOC_N(n_children_in_a, node->u.n.childkeys);
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(B);
}
node->dirty = 1;
B ->dirty = 1;
*nodea = node;
*nodeb = B;
assert(toku_serialize_brtnode_size(node) <= node->nodesize);
assert(toku_serialize_brtnode_size(B) <= B->nodesize);
return 0;
}
//#define MAX_PATHLEN_TO_ROOT 40
static void find_heaviest_child (BRTNODE node, int *childnum) {
int max_child = 0;
......@@ -766,179 +441,7 @@ static int brtnode_maybe_push_down(BRT t, BRTNODE node, int *did_split, BRTNODE
static int split_count=0;
/* NODE is a node with a child.
* childnum was split into two nodes childa, and childb. childa is the same as the original child. childb is a new child.
* We must slide things around, & move things from the old table to the new tables.
* We also move things to the new children as much as we can without doing any pushdowns or splitting of the child.
* We must delete the old buffer (but the old child is already deleted.)
* We also unpin the new children.
*/
static int
handle_split_of_child_simple (BRT t, BRTNODE node, int childnum,
BRTNODE childa, BRTNODE childb,
DBT *splitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */
TOKULOGGER logger) {
assert(node->height>0);
assert(0 <= childnum && childnum < node->u.n.n_children);
FIFO old_h = BNC_BUFFER(node,childnum);
int old_count = BNC_NBYTESINBUF(node, childnum);
int cnum;
int r;
if (toku_brt_debug_mode) {
int i;
printf("%s:%d Child %d splitting on %s\n", __FILE__, __LINE__, childnum, (char*)splitk->data);
printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
for(i=0; i<node->u.n.n_children-1; i++) printf(" %s", (char*)node->u.n.childkeys[i]);
printf("\n");
}
node->dirty = 1;
//verify_local_fingerprint_nonleaf(node);
REALLOC_N(node->u.n.n_children+2, node->u.n.childinfos);
REALLOC_N(node->u.n.n_children+1, node->u.n.childkeys);
// Slide the children over.
BNC_SUBTREE_FINGERPRINT (node, node->u.n.n_children+1)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, node->u.n.n_children+1)=0;
for (cnum=node->u.n.n_children; cnum>childnum+1; cnum--) {
node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1];
}
r = toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
node->u.n.n_children++;
assert(BNC_BLOCKNUM(node, childnum).b==childa->thisnodename.b); // use the same child
BNC_BLOCKNUM(node, childnum+1) = childb->thisnodename;
BNC_HAVE_FULLHASH(node, childnum+1) = TRUE;
BNC_FULLHASH(node, childnum+1) = childb->fullhash;
// BNC_SUBTREE_FINGERPRINT(node, childnum)=0; // leave the subtreefingerprint alone for the child, so we can log the change
BNC_SUBTREE_FINGERPRINT (node, childnum+1)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, childnum+1)=0;
fixup_child_fingerprint(node, childnum, childa, t, logger);
fixup_child_fingerprint(node, childnum+1, childb, t, logger);
r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert(r==0);
//verify_local_fingerprint_nonleaf(node); // The fingerprint hasn't changed and everhything is still there.
r=toku_fifo_create(&BNC_BUFFER(node,childnum)); assert(r==0); // ??? SHould handle this error case
BNC_NBYTESINBUF(node, childnum) = 0;
BNC_NBYTESINBUF(node, childnum+1) = 0;
// Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child.
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid,
{
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, skey, skeylen, sval, svallen);
if (t->txn_that_created != xid) {
r = toku_log_brtdeq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum);
assert(r==0);
}
node->local_fingerprint = new_fingerprint;
});
//verify_local_fingerprint_nonleaf(node);
// Slide the keys over
{
struct kv_pair *pivot = splitk->data;
BYTESTRING bs = { .len = splitk->size,
.data = kv_pair_key(pivot) };
r = toku_log_setpivot(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
for (cnum=node->u.n.n_children-2; cnum>childnum; cnum--) {
node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1];
}
//if (logger) assert((t->flags&TOKU_DB_DUPSORT)==0); // the setpivot is wrong for TOKU_DB_DUPSORT, so recovery will be broken.
node->u.n.childkeys[childnum]= pivot;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, pivot);
}
if (toku_brt_debug_mode) {
int i;
printf("%s:%d splitkeys:", __FILE__, __LINE__);
for(i=0; i<node->u.n.n_children-2; i++) printf(" %s", (char*)node->u.n.childkeys[i]);
printf("\n");
}
//verify_local_fingerprint_nonleaf(node);
node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */
/* Keep pushing to the children, but not if the children would require a pushdown */
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid, {
DBT skd; DBT svd;
BRT_CMD_S brtcmd = build_brt_cmd((enum brt_cmd_type)type, xid, toku_fill_dbt(&skd, skey, skeylen), toku_fill_dbt(&svd, sval, svallen));
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
int pusha = 0; int pushb = 0;
switch (type) {
case BRT_INSERT:
case BRT_DELETE_BOTH:
case BRT_DELETE_ANY:
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
if ((type!=BRT_DELETE_ANY && type!=BRT_ABORT_ANY && type!=BRT_COMMIT_ANY) || 0==(t->flags&TOKU_DB_DUPSORT)) {
// If it's an INSERT or DELETE_BOTH or there are no duplicates then we just put the command into one subtree
int cmp = brt_compare_pivot(t, &skd, &svd, splitk->data);
if (cmp <= 0) pusha = 1;
else pushb = 1;
} else {
assert((type==BRT_DELETE_ANY || type==BRT_ABORT_ANY || type==BRT_COMMIT_ANY) && t->flags&TOKU_DB_DUPSORT);
// It is a DELETE or ABORT_ANY and it's a DUPSORT database,
// in which case if the comparison function comes up 0 we must write the command to both children. (See #201)
int cmp = brt_compare_pivot(t, &skd, 0, splitk->data);
if (cmp<=0) pusha=1;
if (cmp>=0) pushb=1; // Could be that both pusha and pushb are set
}
if (pusha) {
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childa, &brtcmd, childnum, logger);
} else {
r=insert_to_buffer_in_nonleaf(node, childnum, &skd, &svd, type, xid);
}
}
if (pushb) {
// If we already have something in the buffer, we must add the new command to the buffer so that commands don't get out of order.
if (toku_fifo_n_entries(BNC_BUFFER(node,childnum+1))==0) {
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, childb, &brtcmd, childnum+1, logger);
} else {
r=insert_to_buffer_in_nonleaf(node, childnum+1, &skd, &svd, type, xid);
}
}
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (r!=0) printf("r=%d\n", r);
assert(r==0);
goto ok;
case BRT_NONE:
// Don't have to do anything in this case, can just drop the command
goto ok;
}
printf("Bad type %d\n", type); // Don't use default: because I want a compiler warning if I forget a enum case, and I want a runtime error if the type isn't one of the expected ones.
assert(0);
ok: /*nothing*/;
});
toku_fifo_free(&old_h);
//verify_local_fingerprint_nonleaf(childa);
//verify_local_fingerprint_nonleaf(childb);
//verify_local_fingerprint_nonleaf(node);
VERIFY_NODE(t, node);
VERIFY_NODE(t, childa);
VERIFY_NODE(t, childb);
r=toku_unpin_brtnode(t, childa);
assert(r==0);
r=toku_unpin_brtnode(t, childb);
assert(r==0);
return 0;
}
/* NODE is a node with a child.
* childnum was split into two nodes childa, and childb. childa is the same as the original child. childb is a new child.
......@@ -1926,18 +1429,6 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
return EINVAL;
}
static void verify_local_fingerprint_nonleaf (BRTNODE node) {
u_int32_t fp=0;
int i;
if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
fp += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
);
assert(fp==node->local_fingerprint);
}
static int brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
TOKULOGGER logger) {
......
......@@ -254,6 +254,19 @@ fixup_child_fingerprint (BRTNODE node, int childnum_of_node, BRTNODE child, BRT
node->dirty=1;
}
static void
verify_local_fingerprint_nonleaf (BRTNODE node)
{
u_int32_t fp=0;
int i;
if (node->height==0) return;
for (i=0; i<node->u.n.n_children; i++)
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
fp += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
);
assert(fp==node->local_fingerprint);
}
static u_int32_t
mp_pool_size_for_nodesize (u_int32_t nodesize)
// Effect: Calculate how big the mppool should be for a node of size NODESIZE. Leave a little extra space for expansion.
......@@ -282,6 +295,38 @@ brtnode_memory_size (BRTNODE node)
}
}
// If you pass in data==0 then it only compares the key, not the data (even if is a DUPSORT database)
static int
brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck)
{
int cmp;
DBT mydbt;
struct kv_pair *kv = (struct kv_pair *) ck;
if (brt->flags & TOKU_DB_DUPSORT) {
cmp = brt->compare_fun(brt->db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
if (cmp == 0 && data != 0)
cmp = brt->dup_compare(brt->db, data, toku_fill_dbt(&mydbt, kv_pair_val(kv), kv_pair_vallen(kv)));
} else {
cmp = brt->compare_fun(brt->db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
}
return cmp;
}
static int log_and_save_brtenq(TOKULOGGER logger, BRT t, BRTNODE node, int childnum, TXNID xid, int type, const char *key, int keylen, const char *data, int datalen, u_int32_t *fingerprint) {
BYTESTRING keybs = {.len=keylen, .data=(char*)key};
BYTESTRING databs = {.len=datalen, .data=(char*)data};
u_int32_t old_fingerprint = *fingerprint;
u_int32_t fdiff=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_fingerprint = old_fingerprint + fdiff;
//printf("%s:%d node=%lld fingerprint old=%08x new=%08x diff=%08x xid=%lld\n", __FILE__, __LINE__, node->thisnodename, old_fingerprint, new_fingerprint, fdiff, (long long)xid);
*fingerprint = new_fingerprint;
if (t->txn_that_created != xid) {
int r = toku_log_brtenq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, xid, type, keybs, databs);
if (r!=0) return r;
}
return 0;
}
static int
allocate_diskblocknumber (BLOCKNUM *res, BRT brt, TOKULOGGER logger __attribute__((__unused__))) {
assert(brt->h->free_blocks.b == -1); // no blocks in the free list
......@@ -408,6 +453,509 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
return 0;
}
static int
fill_buf (OMTVALUE lev, u_int32_t idx, void *varray)
{
LEAFENTRY le=lev;
LEAFENTRY *array=varray;
array[idx]=le;
return 0;
}
static int
brtleaf_split (TOKULOGGER logger, FILENUM filenum, BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk)
// Effect: Split a leaf node.
{
BRTNODE B;
int r;
//printf("%s:%d splitting leaf %" PRIu64 " which is size %u (targetsize = %u)\", __FILE__, __LINE__, node->thisnodename.b, toku_serialize_brtnode_size(node), node->nodesize);
assert(node->height==0);
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
toku_create_new_brtnode(t, &B, 0, logger);
assert(B->nodesize>0);
assert(node->nodesize>0);
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
//printf("%s:%d B is at %lld nodesize=%d\n", __FILE__, __LINE__, B->thisnodename, B->nodesize);
assert(node->height>0 || node->u.l.buffer!=0);
toku_verify_all_in_mempool(node);
u_int32_t n_leafentries = toku_omt_size(node->u.l.buffer);
u_int32_t break_at = 0;
node->u.l.seqinsert = 0;
// Don't mess around with splitting specially for sequential insertions any more.
{
OMTVALUE *MALLOC_N(n_leafentries, leafentries);
assert(leafentries);
toku_omt_iterate(node->u.l.buffer, fill_buf, leafentries);
break_at = 0;
{
u_int32_t i;
u_int32_t sumlesizes=0;
for (i=0; i<n_leafentries; i++) sumlesizes += leafentry_disksize(leafentries[i]);
u_int32_t sumsofar=0;
for (i=0; i<n_leafentries; i++) {
assert(toku_mempool_inrange(&node->u.l.buffer_mempool, leafentries[i], leafentry_memsize(leafentries[i])));
sumsofar += leafentry_disksize(leafentries[i]);
if (sumsofar*2 >= sumlesizes) {
break_at = i;
break;
}
}
}
// Now we know where we are going to break it
OMT old_omt = node->u.l.buffer;
toku_omt_destroy(&B->u.l.buffer); // Destroy B's empty OMT, so I can rebuild it from an array
{
u_int32_t i;
u_int32_t diff_fp = 0;
u_int32_t diff_size = 0;
for (i=break_at; i<n_leafentries; i++) {
LEAFENTRY oldle = leafentries[i];
LEAFENTRY newle = toku_mempool_malloc(&B->u.l.buffer_mempool, leafentry_memsize(oldle), 1);
assert(newle!=0); // it's a fresh mpool, so this should always work.
diff_fp += toku_le_crc(oldle);
diff_size += OMT_ITEM_OVERHEAD + leafentry_disksize(oldle);
memcpy(newle, oldle, leafentry_memsize(oldle));
toku_mempool_mfree(&node->u.l.buffer_mempool, oldle, leafentry_memsize(oldle));
leafentries[i] = newle;
}
node->local_fingerprint -= node->rand4fingerprint * diff_fp;
B ->local_fingerprint += B ->rand4fingerprint * diff_fp;
node->u.l.n_bytes_in_buffer -= diff_size;
B ->u.l.n_bytes_in_buffer += diff_size;
}
if ((r = toku_omt_create_from_sorted_array(&B->u.l.buffer, leafentries+break_at, n_leafentries-break_at))) return r;
if ((r = toku_omt_create_from_sorted_array(&node->u.l.buffer, leafentries, break_at))) return r;
toku_free(leafentries);
toku_verify_all_in_mempool(node);
toku_verify_all_in_mempool(B);
toku_omt_destroy(&old_omt);
}
LSN lsn={0};
r = toku_log_leafsplit(logger, &lsn, 0, filenum, node->thisnodename, B->thisnodename, n_leafentries, break_at, node->nodesize, B->rand4fingerprint, (u_int8_t)((t->flags&TOKU_DB_DUPSORT)!=0));
if (logger) {
node->log_lsn = lsn;
B->log_lsn = lsn;
}
//toku_verify_gpma(node->u.l.buffer);
//toku_verify_gpma(B->u.l.buffer);
if (splitk) {
memset(splitk, 0, sizeof *splitk);
OMTVALUE lev = 0;
r=toku_omt_fetch(node->u.l.buffer, toku_omt_size(node->u.l.buffer)-1, &lev, NULL);
assert(r==0); // that fetch should have worked.
LEAFENTRY le=lev;
if (node->flags&TOKU_DB_DUPSORT) {
splitk->size = le_any_keylen(le)+le_any_vallen(le);
splitk->data = kv_pair_malloc(le_any_key(le), le_any_keylen(le), le_any_val(le), le_any_vallen(le));
} else {
splitk->size = le_any_keylen(le);
splitk->data = kv_pair_malloc(le_any_key(le), le_any_keylen(le), 0, 0);
}
splitk->flags=0;
}
assert(r == 0);
assert(node->height>0 || node->u.l.buffer!=0);
/* Remove it from the cache table, and free its storage. */
//printf("%s:%d old pma = %p\n", __FILE__, __LINE__, node->u.l.buffer);
node->dirty = 1;
B ->dirty = 1;
*nodea = node;
*nodeb = B;
//printf("%s:%d new sizes Node %" PRIu64 " size=%u omtsize=%d dirty=%d; Node %" PRIu64 " size=%u omtsize=%d dirty=%d\n", __FILE__, __LINE__,
// node->thisnodename.b, toku_serialize_brtnode_size(node), node->height==0 ? (int)(toku_omt_size(node->u.l.buffer)) : -1, node->dirty,
// B ->thisnodename.b, toku_serialize_brtnode_size(B ), B ->height==0 ? (int)(toku_omt_size(B ->u.l.buffer)) : -1, B->dirty);
//toku_dump_brtnode(t, node->thisnodename, 0, NULL, 0, NULL, 0);
//toku_dump_brtnode(t, B ->thisnodename, 0, NULL, 0, NULL, 0);
return 0;
}
static int
brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, TOKULOGGER logger)
// Effect: node must be a node-leaf node. It is split into two nodes, and the fanout is split between them.
// Sets splitk->data pointer to a malloc'd value
// Sets nodea, and nodeb to the two new nodes.
// The caller must replace the old node with the two new nodes.
{
int old_n_children = node->u.n.n_children;
int n_children_in_a = old_n_children/2;
int n_children_in_b = old_n_children-n_children_in_a;
BRTNODE B;
FILENUM fnum = toku_cachefile_filenum(t->cf);
assert(node->height>0);
assert(node->u.n.n_children>=2); // Otherwise, how do we split? We need at least two children to split. */
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
toku_create_new_brtnode(t, &B, node->height, logger);
MALLOC_N(n_children_in_b+1, B->u.n.childinfos);
MALLOC_N(n_children_in_b, B->u.n.childkeys);
B->u.n.n_children =n_children_in_b;
if (0) {
printf("%s:%d %p (%" PRIu64 ") splits, old estimates:", __FILE__, __LINE__, node, node->thisnodename.b);
int i;
for (i=0; i<node->u.n.n_children; i++) printf(" %" PRId64, BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i));
printf("\n");
}
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
{
/* The first n_children_in_a go into node a.
* That means that the first n_children_in_a-1 keys go into node a.
* The splitter key is key number n_children_in_a */
int i;
for (i=0; i<n_children_in_b; i++) {
int r = toku_fifo_create(&BNC_BUFFER(B,i));
if (r!=0) return r;
BNC_NBYTESINBUF(B,i)=0;
BNC_SUBTREE_FINGERPRINT(B,i)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(B,i)=0;
}
for (i=n_children_in_a; i<old_n_children; i++) {
int targchild = i-n_children_in_a;
FIFO from_htab = BNC_BUFFER(node,i);
FIFO to_htab = BNC_BUFFER(B, targchild);
BLOCKNUM thischildblocknum = BNC_BLOCKNUM(node, i);
BNC_BLOCKNUM(B, targchild) = thischildblocknum;
BNC_HAVE_FULLHASH(B,targchild) = BNC_HAVE_FULLHASH(node,i);
BNC_FULLHASH(B,targchild) = BNC_FULLHASH(node, i);
int r = toku_log_addchild(logger, (LSN*)0, 0, fnum, B->thisnodename, targchild, thischildblocknum, BNC_SUBTREE_FINGERPRINT(node, i));
if (r!=0) return r;
while (1) {
bytevec key, data;
unsigned int keylen, datalen;
u_int32_t type;
TXNID xid;
int fr = toku_fifo_peek(from_htab, &key, &keylen, &data, &datalen, &type, &xid);
if (fr!=0) break;
int n_bytes_moved = keylen+datalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
u_int32_t old_from_fingerprint = node->local_fingerprint;
u_int32_t delta = toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
if (r!=0) return r;
if (t->txn_that_created != xid) {
r = toku_log_brtdeq(logger, &node->log_lsn, 0, fnum, node->thisnodename, n_children_in_a);
if (r!=0) return r;
}
r = log_and_save_brtenq(logger, t, B, targchild, xid, type, key, keylen, data, datalen, &B->local_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
if (r!=0) return r;
toku_fifo_deq(from_htab);
// key and data will no longer be valid
node->local_fingerprint = new_from_fingerprint;
B->u.n.n_bytes_in_buffers += n_bytes_moved;
BNC_NBYTESINBUF(B, targchild) += n_bytes_moved;
node->u.n.n_bytes_in_buffers -= n_bytes_moved;
BNC_NBYTESINBUF(node, i) -= n_bytes_moved;
// verify_local_fingerprint_nonleaf(B);
// verify_local_fingerprint_nonleaf(node);
}
// Delete a child, removing it's fingerprint, and also the preceeding pivot key. The child number must be > 0
{
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
assert(i>0);
r = toku_log_delchild(logger, (LSN*)0, 0, fnum, node->thisnodename, n_children_in_a, thischildblocknum, BNC_SUBTREE_FINGERPRINT(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(logger, (LSN*)0, 0, fnum, B->thisnodename, targchild-1, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
node->u.n.childkeys[i-1] = 0;
}
}
BNC_BLOCKNUM(node, i) = make_blocknum(0);
BNC_HAVE_FULLHASH(node, i) = FALSE;
BNC_SUBTREE_FINGERPRINT(B, targchild) = BNC_SUBTREE_FINGERPRINT(node, i);
BNC_SUBTREE_FINGERPRINT(node, i) = 0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(B, targchild) = BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i);
BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i) = 0;
assert(BNC_NBYTESINBUF(node, i) == 0);
}
// Drop the n_children now (not earlier) so that we can do the fingerprint verification at any time.
node->u.n.n_children=n_children_in_a;
for (i=n_children_in_a; i<old_n_children; i++) {
toku_fifo_free(&BNC_BUFFER(node,i));
}
splitk->data = (void*)(node->u.n.childkeys[n_children_in_a-1]);
splitk->size = toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
node->u.n.totalchildkeylens -= toku_brt_pivot_key_len(t, node->u.n.childkeys[n_children_in_a-1]);
REALLOC_N(n_children_in_a+1, node->u.n.childinfos);
REALLOC_N(n_children_in_a, node->u.n.childkeys);
verify_local_fingerprint_nonleaf(node);
verify_local_fingerprint_nonleaf(B);
}
node->dirty = 1;
B ->dirty = 1;
*nodea = node;
*nodeb = B;
assert(toku_serialize_brtnode_size(node) <= node->nodesize);
assert(toku_serialize_brtnode_size(B) <= B->nodesize);
return 0;
}
static int
insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v, int type, TXNID xid)
{
unsigned int n_bytes_added = BRT_CMD_OVERHEAD + KEY_VALUE_OVERHEAD + k->size + v->size;
int r = toku_fifo_enq(BNC_BUFFER(node,childnum), k->data, k->size, v->data, v->size, type, xid);
if (r!=0) return r;
// printf("%s:%d fingerprint %08x -> ", __FILE__, __LINE__, node->local_fingerprint);
node->local_fingerprint += node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, k->data, k->size, v->data, v->size);
// printf(" %08x\n", node->local_fingerprint);
BNC_NBYTESINBUF(node,childnum) += n_bytes_added;
node->u.n.n_bytes_in_buffers += n_bytes_added;
node->dirty = 1;
return 0;
}
/* NODE is a node with a child.
* childnum was split into two nodes childa, and childb. childa is the same as the original child. childb is a new child.
* We must slide things around, & move things from the old table to the new tables.
* We don't push anything down to children. We split the node, and things land wherever they land.
* We must delete the old buffer (but the old child is already deleted.)
* On return, the new children are unpinned.
*/
static int
handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRTNODE childa, BRTNODE childb,
DBT *splitk, /* the data in the childsplitk is alloc'd and is consumed by this call. */
TOKULOGGER logger)
{
assert(node->height>0);
assert(0 <= childnum && childnum < node->u.n.n_children);
FIFO old_h = BNC_BUFFER(node,childnum);
int old_count = BNC_NBYTESINBUF(node, childnum);
int cnum;
int r;
if (toku_brt_debug_mode) {
int i;
printf("%s:%d Child %d splitting on %s\n", __FILE__, __LINE__, childnum, (char*)splitk->data);
printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
for(i=0; i<node->u.n.n_children-1; i++) printf(" %s", (char*)node->u.n.childkeys[i]);
printf("\n");
}
node->dirty = 1;
//verify_local_fingerprint_nonleaf(node);
REALLOC_N(node->u.n.n_children+2, node->u.n.childinfos);
REALLOC_N(node->u.n.n_children+1, node->u.n.childkeys);
// Slide the children over.
BNC_SUBTREE_FINGERPRINT (node, node->u.n.n_children+1)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, node->u.n.n_children+1)=0;
for (cnum=node->u.n.n_children; cnum>childnum+1; cnum--) {
node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1];
}
r = toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
node->u.n.n_children++;
assert(BNC_BLOCKNUM(node, childnum).b==childa->thisnodename.b); // use the same child
BNC_BLOCKNUM(node, childnum+1) = childb->thisnodename;
BNC_HAVE_FULLHASH(node, childnum+1) = TRUE;
BNC_FULLHASH(node, childnum+1) = childb->fullhash;
// BNC_SUBTREE_FINGERPRINT(node, childnum)=0; // leave the subtreefingerprint alone for the child, so we can log the change
BNC_SUBTREE_FINGERPRINT (node, childnum+1)=0;
BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, childnum+1)=0;
fixup_child_fingerprint(node, childnum, childa, t, logger);
fixup_child_fingerprint(node, childnum+1, childb, t, logger);
r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert(r==0);
//verify_local_fingerprint_nonleaf(node); // The fingerprint hasn't changed and everhything is still there.
r=toku_fifo_create(&BNC_BUFFER(node,childnum)); assert(r==0); // ??? SHould handle this error case
BNC_NBYTESINBUF(node, childnum) = 0;
BNC_NBYTESINBUF(node, childnum+1) = 0;
// Remove all the cmds from the local fingerprint. Some may get added in again when we try to push to the child.
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid,
{
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, skey, skeylen, sval, svallen);
if (t->txn_that_created != xid) {
r = toku_log_brtdeq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum);
assert(r==0);
}
node->local_fingerprint = new_fingerprint;
});
//verify_local_fingerprint_nonleaf(node);
// Slide the keys over
{
struct kv_pair *pivot = splitk->data;
BYTESTRING bs = { .len = splitk->size,
.data = kv_pair_key(pivot) };
r = toku_log_setpivot(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
for (cnum=node->u.n.n_children-2; cnum>childnum; cnum--) {
node->u.n.childkeys[cnum] = node->u.n.childkeys[cnum-1];
}
//if (logger) assert((t->flags&TOKU_DB_DUPSORT)==0); // the setpivot is wrong for TOKU_DB_DUPSORT, so recovery will be broken.
node->u.n.childkeys[childnum]= pivot;
node->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, pivot);
}
if (toku_brt_debug_mode) {
int i;
printf("%s:%d splitkeys:", __FILE__, __LINE__);
for(i=0; i<node->u.n.n_children-2; i++) printf(" %s", (char*)node->u.n.childkeys[i]);
printf("\n");
}
//verify_local_fingerprint_nonleaf(node);
node->u.n.n_bytes_in_buffers -= old_count; /* By default, they are all removed. We might add them back in. */
/* Keep pushing to the children, but not if the children would require a pushdown */
FIFO_ITERATE(old_h, skey, skeylen, sval, svallen, type, xid, {
DBT skd; DBT svd;
toku_fill_dbt(&skd, skey, skeylen);
toku_fill_dbt(&svd, sval, svallen);
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
int pusha = 0; int pushb = 0;
switch (type) {
case BRT_INSERT:
case BRT_DELETE_BOTH:
case BRT_DELETE_ANY:
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
if ((type!=BRT_DELETE_ANY && type!=BRT_ABORT_ANY && type!=BRT_COMMIT_ANY) || 0==(t->flags&TOKU_DB_DUPSORT)) {
// If it's an INSERT or DELETE_BOTH or there are no duplicates then we just put the command into one subtree
int cmp = brt_compare_pivot(t, &skd, &svd, splitk->data);
if (cmp <= 0) pusha = 1;
else pushb = 1;
} else {
assert((type==BRT_DELETE_ANY || type==BRT_ABORT_ANY || type==BRT_COMMIT_ANY) && t->flags&TOKU_DB_DUPSORT);
// It is a DELETE or ABORT_ANY and it's a DUPSORT database,
// in which case if the comparison function comes up 0 we must write the command to both children. (See #201)
int cmp = brt_compare_pivot(t, &skd, 0, splitk->data);
if (cmp<=0) pusha=1;
if (cmp>=0) pushb=1; // Could be that both pusha and pushb are set
}
if (pusha) {
r=insert_to_buffer_in_nonleaf(node, childnum, &skd, &svd, type, xid);
}
if (pushb) {
r=insert_to_buffer_in_nonleaf(node, childnum+1, &skd, &svd, type, xid);
}
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (r!=0) printf("r=%d\n", r);
assert(r==0);
goto ok;
case BRT_NONE:
// Don't have to do anything in this case, can just drop the command
goto ok;
}
printf("Bad type %d\n", type); // Don't use default: because I want a compiler warning if I forget a enum case, and I want a runtime error if the type isn't one of the expected ones.
assert(0);
ok: /*nothing*/;
});
toku_fifo_free(&old_h);
//verify_local_fingerprint_nonleaf(childa);
//verify_local_fingerprint_nonleaf(childb);
//verify_local_fingerprint_nonleaf(node);
VERIFY_NODE(t, node);
VERIFY_NODE(t, childa);
VERIFY_NODE(t, childb);
r=toku_unpin_brtnode(t, childa);
assert(r==0);
r=toku_unpin_brtnode(t, childb);
assert(r==0);
return 0;
}
static int
brt_split_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger)
{
if (0) {
printf("%s:%d Node %" PRIu64 "->u.n.n_children=%d estimates=", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children);
int i;
for (i=0; i<node->u.n.n_children; i++) printf(" %" PRId64, BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i));
printf("\n");
}
assert(node->height>0);
BRTNODE child;
{
void *childnode_v;
int r = toku_cachetable_get_and_pin(t->cf,
BNC_BLOCKNUM(node, childnum),
compute_child_fullhash(t->cf, node, childnum),
&childnode_v,
NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback,
t->h);
assert(r==0); // REMOVE LATER
if (r!=0) return r;
child = childnode_v;
assert(child->thisnodename.b!=0);
VERIFY_NODE(t,child);
}
BRTNODE nodea, nodeb;
DBT splitk;
// printf("%s:%d node %" PRIu64 "->u.n.n_children=%d height=%d\n", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children, node->height);
if (child->height==0) {
int r = brtleaf_split(logger, toku_cachefile_filenum(t->cf), t, child, &nodea, &nodeb, &splitk);
assert(r==0); // REMOVE LATER
if (r!=0) return r;
} else {
int r = brt_nonleaf_split(t, child, &nodea, &nodeb, &splitk, logger);
assert(r==0); // REMOVE LATER
if (r!=0) return r;
}
// printf("%s:%d child did split\n", __FILE__, __LINE__);
{
int r = handle_split_of_child (t, node, childnum, nodea, nodeb, &splitk, logger);
if (0) {
printf("%s:%d Node %" PRIu64 "->u.n.n_children=%d estimates=", __FILE__, __LINE__, node->thisnodename.b, node->u.n.n_children);
int i;
for (i=0; i<node->u.n.n_children; i++) printf(" %" PRId64, BNC_SUBTREE_LEAFENTRY_ESTIMATE(node, i));
printf("\n");
}
return r;
}
}
static int
should_compare_both_keys (BRTNODE node, BRT_CMD cmd)
// Effect: Return nonzero if we need to compare both the key and the value.
......@@ -873,21 +1421,6 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
return 0;
}
static int log_and_save_brtenq(TOKULOGGER logger, BRT t, BRTNODE node, int childnum, TXNID xid, int type, const char *key, int keylen, const char *data, int datalen, u_int32_t *fingerprint) {
BYTESTRING keybs = {.len=keylen, .data=(char*)key};
BYTESTRING databs = {.len=datalen, .data=(char*)data};
u_int32_t old_fingerprint = *fingerprint;
u_int32_t fdiff=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_fingerprint = old_fingerprint + fdiff;
//printf("%s:%d node=%lld fingerprint old=%08x new=%08x diff=%08x xid=%lld\n", __FILE__, __LINE__, node->thisnodename, old_fingerprint, new_fingerprint, fdiff, (long long)xid);
*fingerprint = new_fingerprint;
if (t->txn_that_created != xid) {
int r = toku_log_brtenq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, xid, type, keybs, databs);
if (r!=0) return r;
}
return 0;
}
static int brt_nonleaf_cmd_once_to_child (BRT t, BRTNODE node, unsigned int childnum, BRT_CMD cmd, TOKULOGGER logger,
enum reactivity re_array[], BOOL *did_io)
{
......@@ -944,23 +1477,6 @@ static int brt_nonleaf_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER lo
return brt_nonleaf_cmd_once_to_child (t, node, childnum, cmd, logger, re_array, did_io);
}
// If you pass in data==0 then it only compares the key, not the data (even if is a DUPSORT database)
static int
brt_compare_pivot(BRT brt, DBT *key, DBT *data, bytevec ck)
{
int cmp;
DBT mydbt;
struct kv_pair *kv = (struct kv_pair *) ck;
if (brt->flags & TOKU_DB_DUPSORT) {
cmp = brt->compare_fun(brt->db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
if (cmp == 0 && data != 0)
cmp = brt->dup_compare(brt->db, data, toku_fill_dbt(&mydbt, kv_pair_val(kv), kv_pair_vallen(kv)));
} else {
cmp = brt->compare_fun(brt->db, key, toku_fill_dbt(&mydbt, kv_pair_key(kv), kv_pair_keylen(kv)));
}
return cmp;
}
static int
brt_nonleaf_cmd_many (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
enum reactivity re_array[], BOOL *did_io)
......@@ -1035,6 +1551,18 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
return EINVAL;
}
static int
brt_merge_child (BRT t, BRTNODE node, int childnum, BOOL *did_io)
{
t = t; node = node; childnum = childnum; did_io=did_io;
static int printcount=0;
printcount++;
if (0==(printcount & (printcount-1))) {// is printcount a power of two?
printf("%s:%d %s not ready (%d invocations)\n", __FILE__, __LINE__, __func__, printcount);
}
return 0;
}
static int
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum reactivity *re, BOOL *did_io)
// Effect: Push CMD into the subtree rooted at NODE, and indicate whether as a result NODE should split or should merge.
......@@ -1058,7 +1586,7 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum react
switch (child_re[childnum]) {
case RE_STABLE: goto next_child; // Could be a continue, but it seems fragile
case RE_FISSIBLE:
r = brt_split_child(t, node, childnum, did_io);
r = brt_split_child(t, node, childnum, logger);
if (r!=0) goto return_r;
goto reacted;
case RE_FUSIBLE:
......@@ -1096,22 +1624,22 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
// Note: During the initial descent, we may gorged many nonleaf nodes. We wish to flush only one nonleaf node at each level.
{
BRTNODE node = *nodep;
enum should_status should;
enum reactivity re;
BOOL did_io = FALSE;
BOOL should_split =-1;
BOOL should_merge =-1;
{
int r = brtnode_put_cmd(brt, node, cmd, logger, &should, &did_io);
int r = brtnode_put_cmd(brt, node, cmd, logger, &re, &did_io);
if (r!=0) return r;
//if (should_split) printf("%s:%d Pushed something simple, should_split=1\n", __FILE__, __LINE__);
}
assert(should_split!=(BOOL)-1 && should_merge!=(BOOL)-1);
assert(!(should_split && should_merge));
//printf("%s:%d should_split=%d node_size=%" PRIu64 "\n", __FILE__, __LINE__, should_split, brtnode_memory_size(node));
if (should_split) {
switch (re) {
case RE_STABLE:
return 0;
case RE_FUSIBLE:
// The root node should split, so make a new root.
{
BRTNODE nodea,nodeb;
DBT splitk;
if (node->height==0) {
......@@ -1122,11 +1650,12 @@ static int push_something_at_root (BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT
if (r!=0) return r;
}
return brt_init_new_root(brt, nodea, nodeb, splitk, rootp, logger, nodep);
} else if (should_merge) {
}
case RE_FISSIBLE:
return 0; // Cannot merge anything at the root, so return happy.
} else {
return 0;
}
assert(0); // cannot happen
return -1;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment