Commit f9d49ff9 authored by Rich Prohaska's avatar Rich Prohaska

unified the cursor search for cursor set, get_both, set_range, and...

unified the cursor search for cursor set, get_both, set_range, and get_both_range.  still need to prune the search for some of these operations.


addresses  #186


git-svn-id: file:///svn/tokudb@1149 c7de825b-a66e-492c-adef-691d508d4ae1
parent ae4b7d10
......@@ -378,7 +378,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode, int fl
goto died1;
}
toku_pma_set_dup_mode(result->u.l.buffer, flags);
if (flags & TOKU_DB_DUPSORT) toku_pma_set_dup_compare(result->u.l.buffer, dup_compare);
toku_pma_set_dup_compare(result->u.l.buffer, dup_compare);
//printf("%s:%d r PMA= %p\n", __FILE__, __LINE__, result->u.l.buffer);
toku_verify_counts(result);
#define BRT_USE_PMA_BULK_INSERT 1
......
......@@ -328,8 +328,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
int r = toku_pma_create(&n->u.l.buffer, t->compare_fun, t->db, toku_cachefile_filenum(t->cf), n->nodesize);
assert(r==0);
toku_pma_set_dup_mode(n->u.l.buffer, t->flags & (TOKU_DB_DUP+TOKU_DB_DUPSORT));
if (t->flags & TOKU_DB_DUPSORT)
toku_pma_set_dup_compare(n->u.l.buffer, t->dup_compare);
toku_pma_set_dup_compare(n->u.l.buffer, t->dup_compare);
static int rcount=0;
//printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount);
rcount++;
......@@ -2654,69 +2653,17 @@ static int brtcurs_set_position_prev (BRT_CURSOR cursor, DBT *key, TOKUTXN txn)
return 0;
}
static int brtcurs_set_key(BRT_CURSOR cursor, DISKOFF off, DBT *key, DBT *val, int flag, TOKUTXN txn, BRTNODE parent_brtnode) {
BRT brt = cursor->brt;
void *node_v;
int r;
r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL, toku_brtnode_flush_callback,
toku_brtnode_fetch_callback, brt);
if (r != 0)
return r;
BRTNODE node = node_v;
int childnum;
node->parent_brtnode = parent_brtnode;
if (node->height > 0) {
cursor->path_len += 1;
for (;;) {
childnum = brtnode_left_child(node, key, flag == DB_GET_BOTH ? val : 0, brt);
cursor->path[cursor->path_len-1] = node;
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
int more = node->u.n.n_bytes_in_hashtable[childnum];
if (more > 0) {
brt_flush_child(cursor->brt, node, childnum, cursor, txn);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
/* the node may have split. search the node keys again */
continue;
}
break;
}
r = brtcurs_set_key(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, val, flag, txn, node);
if (r != 0)
brt_node_remove_cursor(node, childnum, cursor);
} else {
cursor->path_len += 1;
cursor->path[cursor->path_len-1] = node;
r = toku_pma_cursor(node->u.l.buffer, &cursor->pmacurs, &cursor->brt->skey, &cursor->brt->sval);
if (r == 0) {
if (flag == DB_SET)
r = toku_pma_cursor_set_key(cursor->pmacurs, key);
else if (flag == DB_GET_BOTH)
r = toku_pma_cursor_set_both(cursor->pmacurs, key, val);
else {
assert(0);
r = DB_NOTFOUND;
}
if (r != 0) {
int rr = toku_pma_cursor_free(&cursor->pmacurs);
assert(rr == 0);
}
}
}
if (r != 0) {
cursor->path_len -= 1;
//verify_local_fingerprint_nonleaf(node);
toku_cachetable_unpin(brt->cf, off, node->dirty, brtnode_size(node));
}
return r;
static int brtcurs_dupsort_next_child(BRT_CURSOR cursor, BRTNODE node, int childnum, int op) {
cursor = cursor; node = node; op = op;
return childnum + 1;
}
static int brtcurs_set_range(BRT_CURSOR cursor, DISKOFF off, DBT *key, TOKUTXN txn, BRTNODE parent_brtnode) {
static int brtcurs_nodup_next_child(BRT_CURSOR cursor, BRTNODE node, int childnum, int op) {
cursor = cursor; node = node; op = op;
return childnum + 1;
}
static int brtcurs_set_search(BRT_CURSOR cursor, DISKOFF off, int op, DBT *key, DBT *val, TOKUTXN txn, BRTNODE parent_brtnode) {
BRT brt = cursor->brt;
void *node_v;
int r;
......@@ -2731,40 +2678,54 @@ static int brtcurs_set_range(BRT_CURSOR cursor, DISKOFF off, DBT *key, TOKUTXN t
if (node->height > 0) {
cursor->path_len += 1;
/* select a subtree by key */
childnum = brtnode_which_child(node, key, brt);
next_child:
/* select the leftmost subtree that may contain the key and val */
childnum = brtnode_left_child(node, key, val, brt);
for (;;) {
cursor->path[cursor->path_len-1] = node;
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
int more = node->u.n.n_bytes_in_hashtable[childnum];
if (more > 0) {
brt_flush_child(cursor->brt, node, childnum, cursor, txn);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
continue;
/* flush the buffer for the child subtree */
for (;;) {
cursor->path[cursor->path_len-1] = node;
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
int more = node->u.n.n_bytes_in_hashtable[childnum];
if (more > 0) {
brt_flush_child(cursor->brt, node, childnum, cursor, txn);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
continue;
}
break;
}
break;
}
r = brtcurs_set_range(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn, node);
if (r != 0) {
/* search in the child subtree */
r = brtcurs_set_search(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), op, key, val, txn, node);
if (r == 0)
break;
/* not found in the child subtree, look elsewhere */
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
/* no key in the child subtree is >= key, need to search the next child */
childnum += 1;
if (0) printf("set_range %d %d\n", childnum, node->u.n.n_children);
if (childnum < node->u.n.n_children)
goto next_child;
}
if (brt->flags & TOKU_DB_DUPSORT)
childnum = brtcurs_dupsort_next_child(cursor, node, childnum, op);
else
childnum = brtcurs_nodup_next_child(cursor, node, childnum, op);
if (childnum >= node->u.n.n_children) {
r = DB_NOTFOUND;
break;
}
}
} else {
cursor->path_len += 1;
cursor->path[cursor->path_len-1] = node;
r = toku_pma_cursor(node->u.l.buffer, &cursor->pmacurs, &cursor->brt->skey, &cursor->brt->sval);
if (r == 0) {
r = toku_pma_cursor_set_range(cursor->pmacurs, key);
if (op == DB_SET || op == DB_GET_BOTH)
r = toku_pma_cursor_set_both(cursor->pmacurs, key, val);
else if (op == DB_SET_RANGE /* || op == DB_GET_BOTH_RANGE */ )
r = toku_pma_cursor_set_range_both(cursor->pmacurs, key, val);
else
assert(0);
if (r != 0) {
int rr = toku_pma_cursor_free(&cursor->pmacurs);
assert(rr == 0);
......@@ -2919,7 +2880,7 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
case DB_SET:
r = unpin_cursor(cursor);
assert(r == 0);
r = brtcurs_set_key(cursor, *rootp, kbt, vbt, DB_SET, txn, null_brtnode);
r = brtcurs_set_search(cursor, *rootp, DB_SET, kbt, 0, txn, null_brtnode);
if (r != 0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, 0, vbt, 0);
if (r != 0) goto died0;
......@@ -2927,13 +2888,13 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUT
case DB_GET_BOTH:
r = unpin_cursor(cursor);
assert(r == 0);
r = brtcurs_set_key(cursor, *rootp, kbt, vbt, DB_GET_BOTH, txn, null_brtnode);
r = brtcurs_set_search(cursor, *rootp, DB_GET_BOTH, kbt, vbt, txn, null_brtnode);
if (r != 0) goto died0;
break;
case DB_SET_RANGE:
r = unpin_cursor(cursor);
assert(r == 0);
r = brtcurs_set_range(cursor, *rootp, kbt, txn, null_brtnode);
r = brtcurs_set_search(cursor, *rootp, DB_SET_RANGE, kbt, 0, txn, null_brtnode);
if (r != 0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r != 0) goto died0;
......
......@@ -1799,7 +1799,7 @@ static void test_pma_cursor_set_key() {
for (i=0; i<n; i += 1) {
k = htonl(i);
toku_fill_dbt(&key, &k, sizeof k);
error = toku_pma_cursor_set_key(cursor, &key);
error = toku_pma_cursor_set_both(cursor, &key, 0);
if (i % 10 == 0) {
assert(error == 0);
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
......@@ -1856,7 +1856,7 @@ static void test_pma_cursor_set_range() {
for (i=0; i<100; i += 1) {
k = htonl(i);
toku_fill_dbt(&key, &k, sizeof k);
error = toku_pma_cursor_set_range(cursor, &key);
error = toku_pma_cursor_set_range_both(cursor, &key, 0);
if (error != 0) {
assert(error == DB_NOTFOUND);
assert(i > largest_key);
......@@ -2039,6 +2039,7 @@ static void test_pma_cursor_set_both() {
error = toku_pma_create(&pma, toku_default_compare_fun, null_db, null_filenum, n * (8 + sizeof (int) + sizeof (int)));
assert(error == 0);
error = toku_pma_set_dup_compare(pma, toku_default_compare_fun); assert(error == 0);
PMA_CURSOR cursor;
error = toku_pma_cursor(pma, &cursor, &skey, &sval);
......@@ -2184,7 +2185,7 @@ static void test_dup_key_insert(int n) {
k = htonl(2);
toku_fill_dbt(&key, &k, sizeof k);
r = toku_pma_cursor_set_key(cursor, &key);
r = toku_pma_cursor_set_both(cursor, &key, 0);
if (r != 0) {
assert(n == 0);
} else {
......@@ -2376,7 +2377,7 @@ static void test_dupsort_key_insert(int n, int dup_data) {
assert(r == 0);
toku_fill_dbt(&key, &k, sizeof k);
r = toku_pma_cursor_set_key(cursor, &key);
r = toku_pma_cursor_set_both(cursor, &key, 0);
if (r != 0) {
assert(n == 0);
} else {
......
This diff is collapsed.
......@@ -107,14 +107,11 @@ int toku_pma_cursor_set_position_prev (PMA_CURSOR c);
it has been deleted previously */
int toku_pma_cursor_get_current(PMA_CURSOR c, DBT *key, DBT *val, int even_deleted);
/* set the cursor to the matching key and value pair */
/* move the cursor to the kv pair matching the key and the val if nonzero*/
int toku_pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val);
/* move the cursor to the kv pair matching the key */
int toku_pma_cursor_set_key(PMA_CURSOR c, DBT *key);
/* set the cursor to the smallest key in the pma >= key */
int toku_pma_cursor_set_range(PMA_CURSOR c, DBT *key);
/* set the cursor to the smallest key in the pma >= key and the val if nonzero*/
int toku_pma_cursor_set_range_both(PMA_CURSOR c, DBT *key, DBT *val);
/* delete the key value pair under the cursor, return the size of the pair */
int toku_pma_cursor_delete_under(PMA_CURSOR /*c*/, int */*kvsize*/, u_int32_t /*rand4sem*/, u_int32_t */*fingerprint*/, int */*lastkeymatch*/);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment