Commit f23a89f2 authored by Rich Prohaska's avatar Rich Prohaska

simplify. addresses #247 #250

git-svn-id: file:///svn/tokudb@1559 c7de825b-a66e-492c-adef-691d508d4ae1
parent cacb2342
......@@ -683,16 +683,10 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BRTNODE tochild = childa;
if (type == BRT_INSERT || type == BRT_DELETE_BOTH) {
int cmp = brt_compare_pivot(t, &skd, &svd, childsplitk->data);
if (cmp < 0) {
;
} else if (cmp > 0) {
tochildnum = childnum+1; tochild = childb;
} else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_R) {
if (cmp > 0) {
tochildnum = childnum+1; tochild = childb;
}
}
}
r=push_brt_cmd_down_only_if_it_wont_push_more_else_put_here(t, node, tochild, &brtcmd, tochildnum, txn);
//verify_local_fingerprint_nonleaf(childa); verify_local_fingerprint_nonleaf(childb);
if (type == BRT_DELETE) {
......@@ -955,38 +949,6 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
}
}
/* find the rightmost child that the key/data will be inserted */
static unsigned int brtnode_right_child (BRTNODE node, DBT *k, DBT *data, BRT t) {
assert(node->height>0);
int maybe = -1; /* last pivot that matched the key */
int i;
for (i=node->u.n.n_children-2; i >= 0; i--) {
int cmp = brt_compare_pivot(t, k, data, node->u.n.childkeys[i]);
if (cmp < 0) {
continue;
} else if (cmp > 0) {
if (maybe != -1) goto foundkeymatch;
return i+1;
} else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R)
return i+1;
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L)
return i;
maybe = i;
} else
maybe = i;
}
maybe = 0;
foundkeymatch:
if (!(node->u.n.pivotflags[maybe] & BRT_PIVOT_PRESENT_L)) {
node->u.n.pivotflags[maybe] |= BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
return maybe;
}
/* find the leftmost child that may contain the key */
static unsigned int brtnode_left_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
int i;
......@@ -995,24 +957,19 @@ static unsigned int brtnode_left_child (BRTNODE node , DBT *k, DBT *d, BRT t) {
int cmp = brt_compare_pivot(t, k, d, node->u.n.childkeys[i]);
if (cmp > 0) continue;
if (cmp < 0) return i;
if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L)
return i;
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R)
return i+1;
}
return i;
}
return node->u.n.n_children-1;
}
static inline unsigned int brtnode_which_child (BRTNODE node , DBT *k, BRT t) {
return brtnode_left_child(node, k, 0, t);
static unsigned int brtnode_right_child (BRTNODE node, DBT *k, DBT *data, BRT t) {
return brtnode_left_child(node, k, data, t);
}
static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk, int debug, TOKUTXN txn, int childnum, int maybe) {
/* put a cmd into a nodes child */
static int brt_nonleaf_put_cmd_child_node (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKUTXN txn, int childnum, int maybe) {
int r;
void *child_v;
BRTNODE child;
......@@ -1060,26 +1017,16 @@ static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
int toku_brt_do_push_cmd = 1;
static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb,
DBT *splitk,
int debug,
TOKUTXN txn) {
/* put a cmd into a node at childnum */
static int brt_nonleaf_put_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKUTXN txn, unsigned int childnum, int can_push, int *do_push_down) {
//verify_local_fingerprint_nonleaf(node);
unsigned int childnum;
int found;
int type = cmd->type;
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
childnum = brtnode_right_child(node, k, v, t);
//rfp printf("nonleaf_insert %d,%d -> %lld %d\n", htonl(*(int*)k->data), *(int*)v->data, node->thisnodename, childnum);
/* non-buffering mode when cursors are open on this child */
if (node->u.n.n_cursors[childnum] > 0) {
assert(node->u.n.n_bytes_in_hashtable[childnum] == 0);
int r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 0);
int r = brt_nonleaf_put_cmd_child_node(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 0);
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
......@@ -1089,45 +1036,20 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
return r;
}
//verify_local_fingerprint_nonleaf(node);
{
int anytype;
bytevec olddata;
ITEMLEN olddatalen;
found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen, &anytype);
//verify_local_fingerprint_nonleaf(node);
if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, "");
toku_verify_counts(node);
if (found) {
if (!(t->flags & TOKU_DB_DUP)) {
//printf("%s:%d found and deleting\n", __FILE__, __LINE__);
node->local_fingerprint -= node->rand4fingerprint * toku_calccrc32_cmd(anytype, k->data, k->size, olddata, olddatalen);
int r = toku_hash_delete(node->u.n.htables[childnum], k->data, k->size);
/* Be careful, olddata is now invalid because of the delete. */
int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
assert(r==0);
node->u.n.n_bytes_in_hashtables -= diff;
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
node->dirty = 1;
//printf("%s:%d deleted %d bytes\n", __FILE__, __LINE__, diff);
found = 0;
}
}
}
//verify_local_fingerprint_nonleaf(node);
/* if the child is in the cache table then push the cmd to it
otherwise just put it into this node's buffer */
if (!found && toku_brt_do_push_cmd) {
int r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1);
if (r == 0) {
//printf("%s:%d\n", __FILE__, __LINE__);
/* try to push the cmd to the subtree if the buffer is empty and pushes are enabled */
if (node->u.n.n_bytes_in_hashtable[childnum] == 0 && can_push && toku_brt_do_push_cmd) {
int r = brt_nonleaf_put_cmd_child_node(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1);
if (r == 0)
return r;
}
}
//verify_local_fingerprint_nonleaf(node);
/* append the cmd to the child buffer */
{
int type = cmd->type;
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
int r=toku_hash_insert(node->u.n.htables[childnum], k->data, k->size, v->data, v->size, type);
assert(r==0);
......@@ -1136,103 +1058,30 @@ static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
node->u.n.n_bytes_in_hashtable[childnum] += diff;
node->dirty = 1;
}
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//verify_local_fingerprint_nonleaf(node);
int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) {
assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
assert((*nodea)->u.n.n_children>0);
assert((*nodeb)->u.n.n_children>0);
assert(BRTNODE_CHILD_DISKOFF(*nodea, (*nodea)->u.n.n_children-1)!=0);
assert(BRTNODE_CHILD_DISKOFF(*nodeb, (*nodeb)->u.n.n_children-1)!=0);
toku_verify_counts(*nodea);
toku_verify_counts(*nodeb);
} else {
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
toku_verify_counts(node);
}
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
*do_push_down = 1;
return 0;
}
static int brt_nonleaf_delete_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
static int brt_nonleaf_insert_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
int *did_split, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
int debug, TOKUTXN txn, unsigned int childnum) {
int debug, TOKUTXN txn) {
//verify_local_fingerprint_nonleaf(node);
int found;
int type = cmd->type;
DBT *k = cmd->u.id.key;
DBT *v = cmd->u.id.val;
unsigned int childnum;
int r;
/* non-buffering mode when cursors are open on this child */
if (node->u.n.n_cursors[childnum] > 0) {
assert(node->u.n.n_bytes_in_hashtable[childnum] == 0);
int r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 0);
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
return r;
}
/* find the right subtree */
childnum = brtnode_right_child(node, cmd->u.id.key, cmd->u.id.val, t);
//verify_local_fingerprint_nonleaf(node);
{
int anytype;
bytevec olddata;
ITEMLEN olddatalen;
found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen, &anytype);
/* put the cmd in the subtree */
int do_push_down = 0;
r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1, &do_push_down);
if (r != 0) return r;
//verify_local_fingerprint_nonleaf(node);
if (debug) printf("%s:%d %*sDoing hash_insert\n", __FILE__, __LINE__, debug, "");
toku_verify_counts(node);
while (found) {
//printf("%s:%d found and deleting\n", __FILE__, __LINE__);
node->local_fingerprint -= node->rand4fingerprint * toku_calccrc32_cmd(anytype, k->data, k->size, olddata, olddatalen);
int r = toku_hash_delete(node->u.n.htables[childnum], k->data, k->size);
/* Be careful, olddata is now invalid because of the delete. */
int diff = k->size + olddatalen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
assert(r==0);
node->u.n.n_bytes_in_hashtables -= diff;
node->u.n.n_bytes_in_hashtable[childnum] -= diff;
node->dirty = 1;
//printf("%s:%d deleted %d bytes\n", __FILE__, __LINE__, diff);
found = !toku_hash_find(node->u.n.htables[childnum], k->data, k->size, &olddata, &olddatalen, &anytype);
}
}
//verify_local_fingerprint_nonleaf(node);
/* if the child is in the cache table then push the cmd to it
otherwise just put it into this node's buffer */
if (toku_brt_do_push_cmd) {
int r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, childnum, 1);
if (r == 0) {
//printf("%s:%d\n", __FILE__, __LINE__);
return r;
}
}
//verify_local_fingerprint_nonleaf(node);
{
int diff = k->size + v->size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
int r=toku_hash_insert(node->u.n.htables[childnum], k->data, k->size, v->data, v->size, type);
assert(r==0);
node->local_fingerprint += node->rand4fingerprint * toku_calccrc32_cmd(type, k->data, k->size, v->data, v->size);
node->u.n.n_bytes_in_hashtables += diff;
node->u.n.n_bytes_in_hashtable[childnum] += diff;
node->dirty = 1;
}
/* maybe push down */
if (do_push_down) {
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//verify_local_fingerprint_nonleaf(node);
int r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) {
......@@ -1240,8 +1089,8 @@ static int brt_nonleaf_delete_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
assert((*nodea)->u.n.n_children>0);
assert((*nodeb)->u.n.n_children>0);
assert(BRTNODE_CHILD_DISKOFF(*nodea,(*nodea)->u.n.n_children-1)!=0);
assert(BRTNODE_CHILD_DISKOFF(*nodeb,(*nodeb)->u.n.n_children-1)!=0);
assert(BRTNODE_CHILD_DISKOFF(*nodea, (*nodea)->u.n.n_children-1)!=0);
assert(BRTNODE_CHILD_DISKOFF(*nodeb, (*nodeb)->u.n.n_children-1)!=0);
toku_verify_counts(*nodea);
toku_verify_counts(*nodeb);
} else {
......@@ -1254,6 +1103,7 @@ static int brt_nonleaf_delete_cmd_child (BRT t, BRTNODE node, BRT_CMD *cmd,
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
}
return 0;
}
......@@ -1281,30 +1131,7 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
} else if (t->flags & TOKU_DB_DUPSORT) {
delchild_append(i);
delchild_append(i+1);
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R) {
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_R;
node->dirty = 1;
}
} else if (t->flags & TOKU_DB_DUP) {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
delchild_append(i);
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_R) {
delchild_append(i+1);
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_R;
node->dirty = 1;
}
} else {
if (node->u.n.pivotflags[i] & BRT_PIVOT_PRESENT_L) {
node->u.n.pivotflags[i] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
}
delchild_append(i);
break;
}
......@@ -1314,19 +1141,39 @@ static int brt_nonleaf_delete_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
delchild_append(node->u.n.n_children-1);
/* issue the delete cmd to all of the children found previously */
int do_push_down = 0;
for (i=0; i<delidx; i++) {
r = brt_nonleaf_delete_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, delchild[i]);
r = brt_nonleaf_put_cmd_child(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn, delchild[i], delidx == 1, &do_push_down);
assert(r == 0);
}
/* post condition: for all pk(i) == k -> assert pf(i) == 0 */
for (i=0; i < node->u.n.n_children-1; i++) {
int cmp = brt_compare_pivot(t, cmd->u.id.key, 0, node->u.n.childkeys[i]);
if (cmp == 0)
assert(node->u.n.pivotflags[i] == 0);
if (do_push_down) {
/* maybe push down */
if (debug) printf("%s:%d %*sDoing maybe_push_down\n", __FILE__, __LINE__, debug, "");
//verify_local_fingerprint_nonleaf(node);
r = brtnode_maybe_push_down(t, node, did_split, nodea, nodeb, splitk, debugp1(debug), txn);
if (r!=0) return r;
if (debug) printf("%s:%d %*sDid maybe_push_down\n", __FILE__, __LINE__, debug, "");
if (*did_split) {
assert(toku_serialize_brtnode_size(*nodea)<=(*nodea)->nodesize);
assert(toku_serialize_brtnode_size(*nodeb)<=(*nodeb)->nodesize);
assert((*nodea)->u.n.n_children>0);
assert((*nodeb)->u.n.n_children>0);
assert(BRTNODE_CHILD_DISKOFF(*nodea,(*nodea)->u.n.n_children-1)!=0);
assert(BRTNODE_CHILD_DISKOFF(*nodeb,(*nodeb)->u.n.n_children-1)!=0);
toku_verify_counts(*nodea);
toku_verify_counts(*nodeb);
} else {
assert(toku_serialize_brtnode_size(node)<=node->nodesize);
toku_verify_counts(node);
}
//if (*did_split) {
// verify_local_fingerprint_nonleaf(*nodea);
// verify_local_fingerprint_nonleaf(*nodeb);
//} else {
// verify_local_fingerprint_nonleaf(node);
//}
}
return 0;
}
static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
......@@ -1334,11 +1181,11 @@ static int brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_CMD *cmd,
DBT *splitk,
int debug,
TOKUTXN txn) {
if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH)
if (cmd->type == BRT_INSERT || cmd->type == BRT_DELETE_BOTH) {
return brt_nonleaf_insert_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn);
else if (cmd->type == BRT_DELETE)
} else if (cmd->type == BRT_DELETE) {
return brt_nonleaf_delete_cmd(t, node, cmd, did_split, nodea, nodeb, splitk, debug, txn);
else
} else
return EINVAL;
}
......@@ -1840,99 +1687,7 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
return r;
}
#if 0
static int brt_lookup_node (BRT brt, DISKOFF off, DBT *k, DBT *v) {
int result;
void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0)
return r;
BRTNODE node = node_v;
assert(node->tag == TYP_BRTNODE);
int childnum;
//printf("%s:%d pin %p height=%d children=%d\n", __FILE__, __LINE__, node_v, node->height, node->u.n.n_children);
if (node->height==0) {
result = toku_pma_lookup(node->u.l.buffer, k, v);
//printf("%s:%d looked up something, got answerlen=%d\n", __FILE__, __LINE__, answerlen);
//verify_local_fingerprint_nonleaf(node);
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
assert(r == 0);
return result;
}
childnum = brtnode_which_child(node, k, brt);
{
bytevec hanswer;
ITEMLEN hanswerlen;
int type;
if (toku_hash_find (node->u.n.htables[childnum], k->data, k->size, &hanswer, &hanswerlen, &type)==0) {
if (type == BRT_INSERT) {
if ((brt->flags & TOKU_DB_DUP)) {
result = brt_lookup_node(brt, BRTNODE_CHILD_DISKOFF(node, childnum), k, v);
if (result != 0) {
toku_dbt_set_value(v, hanswer, hanswerlen, &brt->sval);
result = 0;
}
} else {
//printf("Found %d bytes\n", *vallen);
toku_dbt_set_value(v, hanswer, hanswerlen, &brt->sval);
//printf("%s:%d Returning %p\n", __FILE__, __LINE__, v->data);
result = 0;
}
} else if (type == BRT_DELETE) {
if ((brt->flags & TOKU_DB_DUP) && toku_hash_find_idx (node->u.n.htables[childnum], k->data, k->size, 1, &hanswer, &hanswerlen, &type) == 0) {
assert(type == BRT_INSERT);
toku_dbt_set_value(v, hanswer, hanswerlen, &brt->sval);
result = 0;
} else
result = DB_NOTFOUND;
} else {
assert(0);
result = EINVAL;
}
//verify_local_fingerprint_nonleaf(node);
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
assert(r == 0);
return result;
}
}
result = brt_lookup_node(brt, BRTNODE_CHILD_DISKOFF(node, childnum), k, v);
//verify_local_fingerprint_nonleaf(node);
r = toku_cachetable_unpin(brt->cf, off, 0, 0);
assert(r == 0);
return result;
}
#endif
int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
#if 0
int r;
CACHEKEY *rootp;
//assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // That assertion isn't right. An open cursor could cause things to be pinned.
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
printf("%s:%d\n", __FILE__, __LINE__);
if (0) { died0: toku_unpin_brt_header(brt); }
// printf("%s:%d returning %d\n", __FILE__, __LINE__, r);
//assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // That assertion isn't right. An open cursor could cause things to be pinned.
return r;
}
rootp = toku_calculate_root_offset_pointer(brt);
if ((r = brt_lookup_node(brt, *rootp, k, v))) {
// printf("%s:%d\n", __FILE__, __LINE__);
goto died0;
}
//printf("%s:%d r=%d", __FILE__, __LINE__, r); if (r==0) printf(" vallen=%d", *vallen); printf("\n");
if ((r = toku_unpin_brt_header(brt))!=0) return r;
//assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // That assertion isn't right. An open cursor could cause things to be pinned.
return 0;
#else
int r, rr;
BRT_CURSOR cursor;
......@@ -1945,10 +1700,8 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
rr = toku_brt_cursor_close(cursor); assert(rr == 0);
return r;
#endif
}
int toku_brt_delete(BRT brt, DBT *key) {
int r;
BRT_CMD brtcmd;
......@@ -2425,11 +2178,12 @@ static int brtcurs_set_position_last (BRT_CURSOR cursor, DISKOFF off, DBT *key,
int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) {
if (0) { died0: toku_cachetable_unpin(brt->cf, off, 1, 0); }
return r;
}
if (r!=0) return r;
BRTNODE node = node_v;
if (0) {
died0: toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, 0); return r;
}
assert(cursor->path_len<CURSOR_PATHLEN_LIMIT);
cursor->path[cursor->path_len++] = node;
if (node->height>0) {
......@@ -2456,7 +2210,8 @@ static int brtcurs_set_position_last (BRT_CURSOR cursor, DISKOFF off, DBT *key,
r=brtcurs_set_position_last (cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0)
return 0;
assert(node == cursor->path[cursor->path_len-1]);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
if (r==DB_NOTFOUND) {
if (childnum>0) {
......@@ -2486,13 +2241,14 @@ static int brtcurs_set_position_first (BRT_CURSOR cursor, DISKOFF off, DBT *key,
int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) {
if (0) { died0: toku_cachetable_unpin(brt->cf, off, 1, 0); }
return r;
}
if (r!=0) return r;
BRTNODE node = node_v;
assert(cursor->path_len<CURSOR_PATHLEN_LIMIT);
cursor->path[cursor->path_len++] = node;
if (0) {
died0: toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, 0); return r;
}
if (node->height>0) {
int childnum
;
......@@ -2517,7 +2273,8 @@ static int brtcurs_set_position_first (BRT_CURSOR cursor, DISKOFF off, DBT *key,
r=brtcurs_set_position_first (cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0)
return r;
assert(node == cursor->path[cursor->path_len-1]);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
if (r==DB_NOTFOUND) {
if (childnum+1<node->u.n.n_children) {
......@@ -2581,7 +2338,8 @@ static int brtcurs_set_position_next2(BRT_CURSOR cursor, DBT *key, TOKUTXN txn)
r = brtcurs_set_position_first(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0)
return 0;
assert(node == cursor->path[cursor->path_len-1]);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
childnum += 1;
}
......@@ -2642,7 +2400,8 @@ static int brtcurs_set_position_prev2(BRT_CURSOR cursor, DBT *key, TOKUTXN txn)
r = brtcurs_set_position_last(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0)
return 0;
assert(node == cursor->path[cursor->path_len-1]);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
childnum -= 1;
}
......@@ -2929,76 +2688,6 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
return 0;
}
/* clear the right present flag if the key matches the pivot key */
static int brt_node_maybe_clear_right_pivot_flag(BRTNODE node, DBT *key, DBT *val, int childnum, BRT t) {
int match = 0;
if (0 <= childnum) {
if (0 == brt_compare_pivot(t, key, val, node->u.n.childkeys[childnum])) {
assert(node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_R);
node->u.n.pivotflags[childnum] &= ~BRT_PIVOT_PRESENT_R;
node->dirty = 1;
match = 1;
}
}
return match;
}
/* clear the left present flag if the key matches the pivot key */
static int brt_node_maybe_clear_left_pivot_flag(BRTNODE node, DBT *key, DBT *val, int childnum, BRT t) {
int match = 0;
if (childnum < node->u.n.n_children - 1) {
if (0 == brt_compare_pivot(t, key, val, node->u.n.childkeys[childnum])) {
assert(node->u.n.pivotflags[childnum] & BRT_PIVOT_PRESENT_L);
node->u.n.pivotflags[childnum] &= ~BRT_PIVOT_PRESENT_L;
node->dirty = 1;
match = 1;
}
}
return match;
}
/* check if any subtree of this node contains the key */
static int brt_node_any_key_present(BRTNODE node, DBT *key, DBT *val, BRT t) {
int i;
for (i=0; i<node->u.n.n_children-1; i++)
if (0 == brt_compare_pivot(t, key, val, node->u.n.childkeys[i]) && (node->u.n.pivotflags[i] & (BRT_PIVOT_PRESENT_L + BRT_PIVOT_PRESENT_R)))
return 1;
return 0;
}
/* clear the key present flags in the nodes along the cursor path */
static void brt_cursor_maybe_clear_pivot_flags(BRT_CURSOR cursor) {
int r;
DBT key, *k;
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
k = &key;
DBT val, *v;
if (cursor->brt->flags & TOKU_DB_DUPSORT) {
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
v = &val;
} else
v = 0;
r = toku_pma_cursor_get_current(cursor->pmacurs, k, v, 1); assert(r == 0);
int i;
for (i = cursor->path_len - 2; i >= 0; i -= 1) {
BRTNODE node = cursor->path[i];
int childnum = cursor->pathcnum[i];
int match;
match = brt_node_maybe_clear_left_pivot_flag(node, k, v, childnum, cursor->brt);
match += brt_node_maybe_clear_right_pivot_flag(node, k, v, childnum-1, cursor->brt);
if (!match) break;
/* if matching keys in any subtrees of this node then we are done */
if (brt_node_any_key_present(node, k, v, cursor->brt)) break;
}
toku_free(k->data);
if (v) toku_free(v->data);
}
/* delete the key and value under the cursor */
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags __attribute__((__unused__))) {
int r;
......@@ -3006,12 +2695,11 @@ int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags __attribute__((__unused_
if (cursor->path_len > 0) {
BRTNODE node = cursor->path[cursor->path_len-1];
assert(node->height == 0);
int kvsize, lastmatch;
r = toku_pma_cursor_delete_under(cursor->pmacurs, &kvsize, node->rand4fingerprint, &node->local_fingerprint, &lastmatch);
int kvsize;
r = toku_pma_cursor_delete_under(cursor->pmacurs, &kvsize, node->rand4fingerprint, &node->local_fingerprint, 0);
if (r == 0) {
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kvsize;
node->dirty = 1;
if (lastmatch) brt_cursor_maybe_clear_pivot_flags(cursor);
}
} else
r = DB_NOTFOUND;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment