Commit e311a3d5 authored by Rich Prohaska's avatar Rich Prohaska

handle cursor DB_SET into the right subtree. closes #186

git-svn-id: file:///svn/tokudb@1187 c7de825b-a66e-492c-adef-691d508d4ae1
parent b30202c6
...@@ -165,15 +165,15 @@ void toku_brtnode_free (BRTNODE *node); ...@@ -165,15 +165,15 @@ void toku_brtnode_free (BRTNODE *node);
#endif #endif
#define CURSOR_PATHLEN_LIMIT 256 #define CURSOR_PATHLEN_LIMIT 32
struct brt_cursor { struct brt_cursor {
BRT brt; BRT brt;
int path_len; /* -1 if the cursor points nowhere. */ int path_len; /* -1 if the cursor points nowhere. */
BRTNODE path[CURSOR_PATHLEN_LIMIT]; /* Include the leaf (last). These are all pinned. */ BRTNODE path[CURSOR_PATHLEN_LIMIT]; /* Include the leaf (last). These are all pinned. */
int pathcnum[CURSOR_PATHLEN_LIMIT]; /* which child did we descend to from here? */ int pathcnum[CURSOR_PATHLEN_LIMIT]; /* which child did we descend to from here? */
PMA_CURSOR pmacurs; /* The cursor into the leaf. NULL if the cursor doesn't exist. */ PMA_CURSOR pmacurs; /* The cursor into the leaf. NULL if the cursor doesn't exist. */
BRT_CURSOR prev,next; BRT_CURSOR prev,next;
int op; int op; DBT *key; DBT *val; /* needed when flushing buffers */
}; };
/* print the cursor path */ /* print the cursor path */
...@@ -200,7 +200,7 @@ void toku_brt_cursor_new_root(BRT_CURSOR cursor, BRT t, BRTNODE newroot, BRTNODE ...@@ -200,7 +200,7 @@ void toku_brt_cursor_new_root(BRT_CURSOR cursor, BRT t, BRTNODE newroot, BRTNODE
void toku_brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right); void toku_brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
/* a brt internal node has expanded. modify this cursor if it includes the old node in its path. */ /* a brt internal node has expanded. modify this cursor if it includes the old node in its path. */
void toku_brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right); void toku_brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk);
/* a brt internal node has split. modify this cursor if it includes the old node in its path. */ /* a brt internal node has split. modify this cursor if it includes the old node in its path. */
void toku_brt_cursor_nonleaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right); void toku_brt_cursor_nonleaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
......
...@@ -76,7 +76,7 @@ static long brtnode_size(BRTNODE node) { ...@@ -76,7 +76,7 @@ static long brtnode_size(BRTNODE node) {
static void brt_update_cursors_new_root(BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right); static void brt_update_cursors_new_root(BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right);
static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right); static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right); static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk);
static void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right); static void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
static void fix_up_parent_pointers_of_children (BRT t, BRTNODE node) { static void fix_up_parent_pointers_of_children (BRT t, BRTNODE node) {
...@@ -697,7 +697,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -697,7 +697,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.totalchildkeylens += childsplitk->size; node->u.n.totalchildkeylens += childsplitk->size;
node->u.n.n_children++; node->u.n.n_children++;
brt_update_cursors_nonleaf_expand(t, node, childnum, childa, childb); brt_update_cursors_nonleaf_expand(t, node, childnum, childa, childb, node->u.n.childkeys[childnum]);
if (toku_brt_debug_mode) { if (toku_brt_debug_mode) {
int i; int i;
...@@ -2182,14 +2182,14 @@ static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE left, ...@@ -2182,14 +2182,14 @@ static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE left,
} }
} }
static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE node, int childnum, BRTNODE left, BRTNODE right) { static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE node, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk) {
BRT_CURSOR cursor; BRT_CURSOR cursor;
if (brt_update_debug) printf("brt_update_cursors_nonleaf_expand %lld h=%d c=%d nc=%d %lld %lld\n", node->thisnodename, node->height, childnum, if (brt_update_debug) printf("brt_update_cursors_nonleaf_expand %lld h=%d c=%d nc=%d %lld %lld\n", node->thisnodename, node->height, childnum,
node->u.n.n_children, left->thisnodename, right->thisnodename); node->u.n.n_children, left->thisnodename, right->thisnodename);
for (cursor = t->cursors_head; cursor; cursor = cursor->next) { for (cursor = t->cursors_head; cursor; cursor = cursor->next) {
if (toku_brt_cursor_active(cursor)) { if (toku_brt_cursor_active(cursor)) {
toku_brt_cursor_nonleaf_expand(cursor, t, node, childnum, left, right); toku_brt_cursor_nonleaf_expand(cursor, t, node, childnum, left, right, splitk);
} }
} }
} }
...@@ -2266,7 +2266,7 @@ void toku_brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNO ...@@ -2266,7 +2266,7 @@ void toku_brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNO
} }
} }
void toku_brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t __attribute__((unused)), BRTNODE node, int childnum, BRTNODE left, BRTNODE right) { void toku_brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t, BRTNODE node, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk) {
int i; int i;
int oldchildnum, newchildnum; int oldchildnum, newchildnum;
...@@ -2289,8 +2289,13 @@ void toku_brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t __attribute__((unus ...@@ -2289,8 +2289,13 @@ void toku_brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t __attribute__((unus
cursor->pathcnum[i] += 1; cursor->pathcnum[i] += 1;
return; return;
} }
if (i == cursor->path_len-1 && (cursor->op == DB_PREV || cursor->op == DB_LAST)) { /* explain this, batman */ if (i == cursor->path_len-1) { /* cursor is being constructed */
goto setnewchild; if (cursor->op == DB_PREV || cursor->op == DB_LAST) /* go to the right subtree */
goto setnewchild;
if (cursor->op == DB_SET || cursor->op == DB_SET_RANGE || cursor->op == DB_GET_BOTH || cursor->op == DB_GET_BOTH_RANGE) {
if (brt_compare_pivot(t, cursor->key, cursor->val, splitk) > 0)
goto setnewchild;
}
} }
if (i+1 < cursor->path_len) { /* the cursor path traversed the old child so update it if it traverses the right child */ if (i+1 < cursor->path_len) { /* the cursor path traversed the old child so update it if it traverses the right child */
assert(cursor->path[i+1] == left || cursor->path[i+1] == right); assert(cursor->path[i+1] == left || cursor->path[i+1] == right);
...@@ -2391,9 +2396,7 @@ int toku_brt_cursor_close (BRT_CURSOR curs) { ...@@ -2391,9 +2396,7 @@ int toku_brt_cursor_close (BRT_CURSOR curs) {
return r; return r;
} }
/* /* Print the path of a cursor */
* Print the path of a cursor
*/
void toku_brt_cursor_print(BRT_CURSOR cursor) { void toku_brt_cursor_print(BRT_CURSOR cursor) {
int i; int i;
...@@ -2654,12 +2657,14 @@ static int brtcurs_set_position_prev (BRT_CURSOR cursor, DBT *key, TOKUTXN txn) ...@@ -2654,12 +2657,14 @@ static int brtcurs_set_position_prev (BRT_CURSOR cursor, DBT *key, TOKUTXN txn)
} }
static int brtcurs_dupsort_next_child(BRT_CURSOR cursor, BRTNODE node, int childnum, int op) { static int brtcurs_dupsort_next_child(BRT_CURSOR cursor, BRTNODE node, int childnum, int op) {
cursor = cursor; node = node; op = op; cursor = cursor;
if (op == DB_GET_BOTH) return node->u.n.n_children; /* no more */
return childnum + 1; return childnum + 1;
} }
static int brtcurs_nodup_next_child(BRT_CURSOR cursor, BRTNODE node, int childnum, int op) { static int brtcurs_nodup_next_child(BRT_CURSOR cursor, BRTNODE node, int childnum, int op) {
cursor = cursor; node = node; op = op; cursor = cursor;
if (op == DB_SET || op == DB_GET_BOTH) return node->u.n.n_children; /* no more */
return childnum + 1; return childnum + 1;
} }
...@@ -2688,6 +2693,7 @@ static int brtcurs_set_search(BRT_CURSOR cursor, DISKOFF off, int op, DBT *key, ...@@ -2688,6 +2693,7 @@ static int brtcurs_set_search(BRT_CURSOR cursor, DISKOFF off, int op, DBT *key,
brt_node_add_cursor(node, childnum, cursor); brt_node_add_cursor(node, childnum, cursor);
int more = node->u.n.n_bytes_in_hashtable[childnum]; int more = node->u.n.n_bytes_in_hashtable[childnum];
if (more > 0) { if (more > 0) {
cursor->key = key; cursor->val = val;
brt_flush_child(cursor->brt, node, childnum, cursor, txn); brt_flush_child(cursor->brt, node, childnum, cursor, txn);
node = cursor->path[cursor->path_len-1]; node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1]; childnum = cursor->pathcnum[cursor->path_len-1];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment