Commit 57bb3437 authored by Rich Prohaska's avatar Rich Prohaska

first cut or new cursors merge to trunk. addresses #250

git-svn-id: file:///svn/tokudb@1881 c7de825b-a66e-492c-adef-691d508d4ae1
parent 637ac1a3
......@@ -8,6 +8,7 @@
#include "pma.h"
#include "brt.h"
#include "crc.h"
#include "list.h"
#ifndef BRT_FANOUT
#define BRT_FANOUT 16
......@@ -130,7 +131,7 @@ struct brt {
// The header is shared. It is also ephemeral.
struct brt_header *h;
BRT_CURSOR cursors_head, cursors_tail;
struct list cursors;
unsigned int nodesize;
unsigned int flags;
......@@ -162,47 +163,7 @@ void toku_brtnode_free (BRTNODE *node);
#define DEADBEEF ((void*)0xDEADBEEFDEADBEEF)
#endif
#define CURSOR_PATHLEN_LIMIT 32
struct brt_cursor {
BRT brt;
int path_len; /* -1 if the cursor points nowhere. */
BRTNODE path[CURSOR_PATHLEN_LIMIT]; /* Include the leaf (last). These are all pinned. */
int pathcnum[CURSOR_PATHLEN_LIMIT]; /* which child did we descend to from here? */
PMA_CURSOR pmacurs; /* The cursor into the leaf. NULL if the cursor doesn't exist. */
BRT_CURSOR prev,next;
int op; DBT *key; DBT *val; /* needed when flushing buffers */
};
/* print the cursor path */
void toku_brt_cursor_print(BRT_CURSOR cursor);
/* is the cursor path empty? */
static inline int toku_brt_cursor_path_empty(BRT_CURSOR cursor) {
return cursor->path_len == 0;
}
/*is the cursor path full? */
static inline int toku_brt_cursor_path_full(BRT_CURSOR cursor) {
return cursor->path_len == CURSOR_PATHLEN_LIMIT;
}
static inline int toku_brt_cursor_active(BRT_CURSOR cursor) {
return cursor->path_len > 0;
}
/* brt has a new root. add the root to this cursor. */
void toku_brt_cursor_new_root(BRT_CURSOR cursor, BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right);
/* a brt leaf has split. modify this cursor if it includes the old node in its path. */
void toku_brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE newright);
/* a brt internal node has expanded. modify this cursor if it includes the old node in its path. */
void toku_brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk);
/* a brt internal node has split. modify this cursor if it includes the old node in its path. */
void toku_brt_cursor_nonleaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
/* tree command types */
enum brt_cmd_type {
BRT_NONE = 0,
BRT_INSERT = 1,
......@@ -210,6 +171,7 @@ enum brt_cmd_type {
BRT_DELETE_BOTH = 3,
};
/* tree commands */
struct brt_cmd {
enum brt_cmd_type type;
union {
......@@ -245,4 +207,12 @@ extern u_int32_t toku_calccrc32_cmdstruct (BRT_CMD *cmd);
unsigned int toku_brt_pivot_key_len (BRT, struct kv_pair *); // Given the tree
unsigned int toku_brtnode_pivot_key_len (BRTNODE, struct kv_pair *); // Given the node
/* a brt cursor is represented as a kv pair in a tree */
struct brt_cursor {
struct list cursors_link;
BRT brt;
DBT key;
DBT val;
};
#endif
#ifndef BRT_SEARCH_H
#define BRT_SEARCH_H
enum {
BRT_SEARCH_LEFT = 1, /* search left -> right, finds min xy as defined by the compare function */
BRT_SEARCH_RIGHT = 2, /* search right -> left, finds max xy as defined by the compare function */
BRT_SEARCH_ONE = 4, /* look into only one subtree, used for point queries */
};
struct brt_search;
/* the search compare function should return 0 for all xy < kv and 1 for all xy >= kv
the compare function has liberty in implementing the semantics, but the result should
be a ramp */
typedef int (*brt_search_compare_func_t)(struct brt_search */*so*/, DBT */*x*/, DBT */*y*/);
/* the search object contains the compare function, search direction, and the kv pair that
is used in the compare function. the context is the user's private data */
typedef struct brt_search {
brt_search_compare_func_t compare;
int direction;
DBT *k;
DBT *v;
void *context;
} brt_search_t;
/* initialize the search compare object */
static inline brt_search_t *brt_search_init(brt_search_t *so, brt_search_compare_func_t compare, int direction, DBT *k, DBT *v, void *context) {
so->compare = compare; so->direction = direction; so->k = k; so->v = v; so->context = context;
return so;
}
#endif
......@@ -2290,13 +2290,470 @@ static void test_brt_delete() {
test_insert_delete_lookup(512); toku_memory_check_all_free();
}
static void test_new_brt_cursor_create_close() {
int r;
BRT brt;
int n = 8;
BRT_CURSOR cursors[n];
r = toku_brt_create(&brt); assert(r == 0);
int i;
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i]); assert(r == 0);
}
for (i=0; i<n; i++) {
r = toku_brt_cursor_close(cursors[i]); assert(r == 0);
}
r = toku_close_brt(brt); assert(r == 0);
}
static void test_new_brt_cursor_first(int n, int dup_mode) {
if (verbose) printf("test_brt_cursor_first:%d\n", n);
BRT t;
int r;
CACHETABLE ct;
char fname[]="testbrt.brt";
int i;
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
unlink(fname);
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
DBT key, val;
int k, v;
for (i=0; i<n; i++) {
k = htonl(i); v = htonl(i);
r = toku_brt_insert(t, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
for (i=0; ; i++) {
r = toku_brt_cursor_get(cursor, &key, &val, DB_FIRST, null_txn);
if (r != 0) break;
int kk;
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
assert(kk == (int) htonl(i));
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (int) htonl(i));
r = toku_brt_cursor_delete(cursor, 0); assert(r == 0);
}
assert(i == n);
if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data);
r = toku_brt_cursor_close(cursor); assert(r == 0);
r = toku_close_brt(t); assert(r==0);
r = toku_cachetable_close(&ct);assert(r==0);
}
static void test_new_brt_cursor_last(int n, int dup_mode) {
if (verbose) printf("test_brt_cursor_last:%d\n", n);
BRT t;
int r;
CACHETABLE ct;
char fname[]="testbrt.brt";
int i;
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
unlink(fname);
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
DBT key, val;
int k, v;
for (i=0; i<n; i++) {
k = htonl(i); v = htonl(i);
r = toku_brt_insert(t, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
for (i=n-1; ; i--) {
r = toku_brt_cursor_get(cursor, &key, &val, DB_LAST, null_txn);
if (r != 0) break;
int kk;
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
assert(kk == (int) htonl(i));
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (int) htonl(i));
r = toku_brt_cursor_delete(cursor, 0); assert(r == 0);
}
assert(i == -1);
if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data);
r = toku_brt_cursor_close(cursor); assert(r == 0);
r = toku_close_brt(t); assert(r==0);
r = toku_cachetable_close(&ct);assert(r==0);
}
static void test_new_brt_cursor_next(int n, int dup_mode) {
if (verbose) printf("test_brt_cursor_next:%d\n", n);
BRT t;
int r;
CACHETABLE ct;
char fname[]="testbrt.brt";
int i;
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
unlink(fname);
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
DBT key, val;
int k, v;
for (i=0; i<n; i++) {
k = htonl(i); v = htonl(i);
r = toku_brt_insert(t, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
for (i=0; ; i++) {
r = toku_brt_cursor_get(cursor, &key, &val, DB_NEXT, null_txn);
if (r != 0) break;
int kk;
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
assert(kk == (int) htonl(i));
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (int) htonl(i));
}
assert(i == n);
if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data);
r = toku_brt_cursor_close(cursor); assert(r == 0);
r = toku_close_brt(t); assert(r==0);
r = toku_cachetable_close(&ct);assert(r==0);
}
static void test_new_brt_cursor_prev(int n, int dup_mode) {
if (verbose) printf("test_brt_cursor_prev:%d\n", n);
BRT t;
int r;
CACHETABLE ct;
char fname[]="testbrt.brt";
int i;
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
unlink(fname);
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
DBT key, val;
int k, v;
for (i=0; i<n; i++) {
k = htonl(i); v = htonl(i);
r = toku_brt_insert(t, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
for (i=n-1; ; i--) {
r = toku_brt_cursor_get(cursor, &key, &val, DB_PREV, null_txn);
if (r != 0) break;
int kk;
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
assert(kk == (int) htonl(i));
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (int) htonl(i));
}
assert(i == -1);
if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data);
r = toku_brt_cursor_close(cursor); assert(r == 0);
r = toku_close_brt(t); assert(r==0);
r = toku_cachetable_close(&ct);assert(r==0);
}
static void test_new_brt_cursor_current(int n, int dup_mode) {
if (verbose) printf("test_brt_cursor_current:%d\n", n);
BRT t;
int r;
CACHETABLE ct;
char fname[]="testbrt.brt";
int i;
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
unlink(fname);
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_flags(t, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(t, 4096); assert(r == 0);
r = toku_brt_open(t, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
DBT key, val;
int k, v;
for (i=0; i<n; i++) {
k = htonl(i); v = htonl(i);
r = toku_brt_insert(t, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
for (i=0; ; i++) {
r = toku_brt_cursor_get(cursor, &key, &val, DB_FIRST, null_txn);
if (r != 0) break;
int kk;
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
assert(kk == (int) htonl(i));
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (int) htonl(i));
r = toku_brt_cursor_get(cursor, &key, &val, DB_CURRENT, null_txn); assert(r == 0);
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
assert(kk == (int) htonl(i));
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (int) htonl(i));
r = toku_brt_cursor_get(cursor, &key, &val, DB_CURRENT+256, null_txn); assert(r == 0);
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
assert(kk == (int) htonl(i));
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (int) htonl(i));
r = toku_brt_cursor_delete(cursor, 0); assert(r == 0);
r = toku_brt_cursor_get(cursor, &key, &val, DB_CURRENT, null_txn); assert(r == DB_KEYEMPTY);
r = toku_brt_cursor_get(cursor, &key, &val, DB_CURRENT+256, null_txn); assert(r == 0);
assert(key.size == sizeof kk);
memcpy(&kk, key.data, key.size);
assert(kk == (int) htonl(i));
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (int) htonl(i));
}
assert(i == n);
if (key.data) toku_free(key.data);
if (val.data) toku_free(val.data);
r = toku_brt_cursor_close(cursor); assert(r == 0);
r = toku_close_brt(t); assert(r==0);
r = toku_cachetable_close(&ct);assert(r==0);
}
static void test_new_brt_cursor_set_range(int n, int dup_mode) {
if (verbose) printf("test_brt_cursor_set_range:%d %d\n", n, dup_mode);
int r;
char fname[]="testbrt.brt";
CACHETABLE ct;
BRT brt;
BRT_CURSOR cursor;
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
unlink(fname);
r = toku_brt_create(&brt); assert(r == 0);
r = toku_brt_set_flags(brt, dup_mode); assert(r == 0);
r = toku_brt_set_nodesize(brt, 4096); assert(r == 0);
r = toku_brt_open(brt, fname, fname, 0, 1, 1, 0, ct, null_txn, 0); assert(r==0);
int i;
DBT key, val;
int k, v;
/* insert keys 0, 10, 20 .. 10*(n-1) */
int max_key = 10*(n-1);
for (i=0; i<n; i++) {
k = htonl(10*i);
v = 10*i;
r = toku_brt_insert(brt, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor); assert(r==0);
/* pick random keys v in 0 <= v < 10*n, the cursor should point
to the smallest key in the tree that is >= v */
for (i=0; i<n; i++) {
int vv;
v = random() % (10*n);
k = htonl(v);
toku_fill_dbt(&key, &k, sizeof k);
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = toku_brt_cursor_get(cursor, &key, &val, DB_SET_RANGE, null_txn);
if (v > max_key)
/* there is no smallest key if v > the max key */
assert(r == DB_NOTFOUND);
else {
assert(r == 0);
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == (((v+9)/10)*10));
toku_free(val.data);
}
}
r = toku_brt_cursor_close(cursor); assert(r==0);
r = toku_close_brt(brt); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
static void test_new_brt_cursor_set(int n, int cursor_op, DB *db) {
if (verbose) printf("test_brt_cursor_set:%d %d %p\n", n, cursor_op, db);
int r;
char fname[]="testbrt.brt";
CACHETABLE ct;
BRT brt;
BRT_CURSOR cursor;
unlink(fname);
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
r = toku_open_brt(fname, 0, 1, &brt, 1<<12, ct, null_txn, test_brt_cursor_keycompare, db); assert(r==0);
int i;
DBT key, val;
int k, v;
/* insert keys 0, 10, 20 .. 10*(n-1) */
for (i=0; i<n; i++) {
k = htonl(10*i);
v = 10*i;
r = toku_brt_insert(brt, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor); assert(r==0);
/* set cursor to random keys in set { 0, 10, 20, .. 10*(n-1) } */
for (i=0; i<n; i++) {
int vv;
v = 10*(random() % n);
k = htonl(v);
toku_fill_dbt(&key, &k, sizeof k);
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = toku_brt_cursor_get(cursor, &key, &val, cursor_op, null_txn);
assert(r == 0);
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == v);
toku_free(val.data);
if (cursor_op == DB_SET) assert(key.data == &k);
}
/* try to set cursor to keys not in the tree, all should fail */
for (i=0; i<10*n; i++) {
if (i % 10 == 0)
continue;
k = htonl(i);
toku_fill_dbt(&key, &k, sizeof k);
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
r = toku_brt_cursor_get(cursor, &key, &val, DB_SET, null_txn);
assert(r == DB_NOTFOUND);
assert(key.data == &k);
}
r = toku_brt_cursor_close(cursor); assert(r==0);
r = toku_close_brt(brt); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
static void test_new_brt_cursors(int dup_mode) {
test_new_brt_cursor_create_close(dup_mode); toku_memory_check_all_free();
test_new_brt_cursor_first(8, dup_mode); toku_memory_check_all_free();
test_new_brt_cursor_last(8, dup_mode); toku_memory_check_all_free();
test_new_brt_cursor_last(512, dup_mode); toku_memory_check_all_free();
test_new_brt_cursor_next(8, dup_mode); toku_memory_check_all_free();
test_new_brt_cursor_prev(8, dup_mode); toku_memory_check_all_free();
test_new_brt_cursor_current(8, dup_mode); toku_memory_check_all_free();
test_new_brt_cursor_next(512, dup_mode); toku_memory_check_all_free();
test_new_brt_cursor_set_range(512, dup_mode); toku_memory_check_all_free();
test_new_brt_cursor_set(512, DB_SET, 0); toku_memory_check_all_free();
};
static void brt_blackbox_test (void) {
toku_memory_check = 1;
test_brt_delete_both(512); toku_memory_check_all_free();
test_wrongendian_compare(0, 2); toku_memory_check_all_free();
test_wrongendian_compare(1, 2); toku_memory_check_all_free();
test_wrongendian_compare(1, 257); toku_memory_check_all_free();
test_wrongendian_compare(1, 1000); toku_memory_check_all_free();
test_new_brt_cursors(0);
test_new_brt_cursors(TOKU_DB_DUP+TOKU_DB_DUPSORT);
test_brt_delete_both(512); toku_memory_check_all_free();
test_read_what_was_written(); toku_memory_check_all_free(); if (verbose) printf("did read_what_was_written\n");
test_cursor_next(); toku_memory_check_all_free();
test_multiple_dbs_many(); toku_memory_check_all_free();
......
......@@ -74,12 +74,6 @@ static long brtnode_size(BRTNODE node) {
return size;
}
static void brt_update_cursors_new_root(BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right);
static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE newnode);
static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk);
static void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
static void toku_update_brtnode_lsn(BRTNODE node, TOKUTXN txn) {
if (txn) {
node->log_lsn = toku_txn_get_last_lsn(txn);
......@@ -372,7 +366,6 @@ static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRT
assert(node->height>0 || node->u.l.buffer!=0);
/* Remove it from the cache table, and free its storage. */
//printf("%s:%d old pma = %p\n", __FILE__, __LINE__, node->u.l.buffer);
brt_update_cursors_leaf_split(t, node, B);
*nodea = node;
*nodeb = B;
......@@ -471,7 +464,6 @@ static void brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nod
/* Remove it from the cache table, and free its storage. */
//printf("%s:%d removing %lld\n", __FILE__, __LINE__, node->thisnodename);
brt_update_cursors_nonleaf_split(t, node, A, B);
delete_node(t, node);
assert(toku_serialize_brtnode_size(A)<A->nodesize);
assert(toku_serialize_brtnode_size(B)<B->nodesize);
......@@ -647,8 +639,6 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
node->u.n.totalchildkeylens += childsplitk->size;
node->u.n.n_children++;
brt_update_cursors_nonleaf_expand(t, node, childnum, childa, childb, node->u.n.childkeys[childnum]);
if (toku_brt_debug_mode) {
int i;
printf("%s:%d splitkeys:", __FILE__, __LINE__);
......@@ -1284,6 +1274,7 @@ int toku_brt_create(BRT *brt_ptr) {
if (brt == 0)
return ENOMEM;
memset(brt, 0, sizeof *brt);
list_init(&brt->cursors);
brt->flags = 0;
brt->nodesize = BRT_DEFAULT_NODE_SIZE;
brt->compare_fun = toku_default_compare_fun;
......@@ -1543,8 +1534,8 @@ int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *ne
int toku_close_brt (BRT brt) {
int r;
while (brt->cursors_head) {
BRT_CURSOR c = brt->cursors_head;
while (!list_empty(&brt->cursors)) {
BRT_CURSOR c = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link);
r=toku_brt_cursor_close(c);
if (r!=0) return r;
}
......@@ -1634,7 +1625,6 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
//printf("%s:%d put %lld\n", __FILE__, __LINE__, brt->root);
toku_cachetable_put(brt->cf, newroot_diskoff, newroot, brtnode_size(newroot),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
brt_update_cursors_new_root(brt, newroot, nodea, nodeb);
*newrootp = newroot;
return 0;
}
......@@ -1847,836 +1837,488 @@ int show_brt_blocknumbers (BRT brt) {
}
#endif
static int brt_flush_debug = 0;
/*
* Flush the buffer for a child of a node.
* If the node split when pushing kvpairs to a child of the node
* then reflect the node split up the cursor path towards the tree root.
* If the root is reached then create a new root
*/
static void brt_flush_child(BRT t, BRTNODE node, int childnum, BRT_CURSOR cursor, TOKUTXN txn) {
int r;
int child_did_split;
BRTNODE childa, childb;
DBT child_splitk;
if (brt_flush_debug) {
printf("brt_flush_child %lld %d\n", node->thisnodename, childnum);
toku_brt_cursor_print(cursor);
}
int toku_brt_dbt_set_key(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) {
int r = toku_dbt_set_value(ybt, val, vallen, &brt->skey);
return r;
}
toku_init_dbt(&child_splitk);
r = push_some_brt_cmds_down(t, node, childnum,
&child_did_split, &childa, &childb, &child_splitk, brt_flush_debug, txn);
assert(r == 0);
if (brt_flush_debug) {
printf("brt_flush_child done %lld %d\n", node->thisnodename, childnum);
toku_brt_cursor_print(cursor);
}
if (child_did_split) {
int i;
int toku_brt_dbt_set_value(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) {
int r = toku_dbt_set_value(ybt, val, vallen, &brt->sval);
return r;
}
for (i=cursor->path_len-1; i >= 0; i--) {
if (cursor->path[i] == childa || cursor->path[i] == childb)
break;
}
assert(i == cursor->path_len-1);
while (child_did_split) {
child_did_split = 0;
typedef struct brt_split {
int did_split;
BRTNODE nodea;
BRTNODE nodeb;
DBT splitk;
} BRT_SPLIT;
if (0) printf("child_did_split %lld %lld\n", childa->thisnodename, childb->thisnodename);
if (i == 0) {
CACHEKEY *rootp = toku_calculate_root_offset_pointer(t);
BRTNODE newnode;
r = brt_init_new_root(t, childa, childb, child_splitk, rootp, txn, &newnode);
assert(r == 0);
r = unpin_brtnode(t, newnode);
assert(r == 0);
} else {
BRTNODE upnode;
assert(i > 0);
i = i-1;
upnode = cursor->path[i];
childnum = cursor->pathcnum[i];
r = handle_split_of_child(t, upnode, childnum,
childa, childb, &child_splitk,
&child_did_split, &childa, &childb, &child_splitk,
txn);
assert(r == 0);
}
}
}
static inline void brt_split_init(BRT_SPLIT *split) {
split->did_split = 0;
split->nodea = split->nodeb = 0;
toku_init_dbt(&split->splitk);
}
/*
* Add a cursor to child of a node. Increment the cursor count on the child. Flush the buffer associated with the child.
*/
static void brt_node_add_cursor(BRTNODE node, int childnum, BRT_CURSOR cursor) {
if (node->height > 0) {
if (0) printf("brt_node_add_cursor %lld %d %p\n", node->thisnodename, childnum, cursor);
node->u.n.n_cursors[childnum] += 1;
}
}
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split);
/*
* Remove a cursor from the child of a node. Decrement the cursor count on the child.
*/
static void brt_node_remove_cursor(BRTNODE node, int childnum, BRT_CURSOR cursor __attribute__((unused))) {
if (node->height > 0) {
if (0) printf("brt_node_remove_cursor %lld %d %p\n", node->thisnodename, childnum, cursor);
assert(node->u.n.n_cursors[childnum] > 0);
node->u.n.n_cursors[childnum] -= 1;
/* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split) {
int r, rr;
/* if the child's buffer is not empty then try to empty it */
if (node->u.n.n_bytes_in_buffer[childnum] > 0) {
rr = push_some_brt_cmds_down(brt, node, childnum, &split->did_split, &split->nodea, &split->nodeb, &split->splitk, 0, 0);
assert(rr == 0);
/* push down may cause a child split, so childnum may not be appropriate, and the node itself may split, so retry */
return EAGAIN;
}
}
static int brt_update_debug = 0;
void *node_v;
rr = toku_cachetable_get_and_pin(brt->cf, node->u.n.children[childnum], &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(rr == 0);
void brt_update_cursors_new_root(BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right) {
BRT_CURSOR cursor;
for (;;) {
BRTNODE childnode = node_v;
BRT_SPLIT childsplit; brt_split_init(&childsplit);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit);
if (brt_update_debug) printf("brt_update_cursors_new_root %lld %lld %lld\n", newroot->thisnodename,
left->thisnodename, right->thisnodename);
for (cursor = t->cursors_head; cursor; cursor = cursor->next) {
if (toku_brt_cursor_active(cursor)) {
toku_brt_cursor_new_root(cursor, t, newroot, left, right);
if (childsplit.did_split) {
rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk,
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, 0);
assert(rr == 0);
break;
} else {
if (r == EAGAIN)
continue;
rr = toku_cachetable_unpin(brt->cf, childnode->thisnodename, childnode->dirty, brtnode_size(childnode));
assert(rr == 0);
break;
}
}
return r;
}
static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE newnode) {
BRT_CURSOR cursor;
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split) {
int r = DB_NOTFOUND;
int c;
if (brt_update_debug) printf("brt_update_cursors_leaf_split %lld %lld\n", oldnode->thisnodename, newnode->thisnodename);
for (cursor = t->cursors_head; cursor; cursor = cursor->next) {
if (toku_brt_cursor_active(cursor)) {
toku_brt_cursor_leaf_split(cursor, t, oldnode, newnode);
}
}
}
/* binary search is overkill for a small array */
int child[node->u.n.n_children];
static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE node, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk) {
BRT_CURSOR cursor;
/* scan left to right or right to left depending on the search direction */
for (c = 0; c < node->u.n.n_children; c++)
child[c] = search->direction & BRT_SEARCH_LEFT ? c : node->u.n.n_children - 1 - c;
if (brt_update_debug) printf("brt_update_cursors_nonleaf_expand %lld h=%d c=%d nc=%d %lld %lld\n", node->thisnodename, node->height, childnum,
node->u.n.n_children, left->thisnodename, right->thisnodename);
for (cursor = t->cursors_head; cursor; cursor = cursor->next) {
if (toku_brt_cursor_active(cursor)) {
toku_brt_cursor_nonleaf_expand(cursor, t, node, childnum, left, right, splitk);
for (c = 0; c < node->u.n.n_children-1; c++) {
int p = search->direction & BRT_SEARCH_LEFT ? child[c] : child[c] - 1;
struct kv_pair *pivot = node->u.n.childkeys[p];
DBT pivotkey, pivotval;
if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
r = brt_search_child(brt, node, child[c], search, newkey, newval, split);
if (r == 0 || r == EAGAIN)
break;
}
}
}
static void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right) {
BRT_CURSOR cursor;
/* check the first (left) or last (right) node if nothing has been found */
if (r == DB_NOTFOUND && c == node->u.n.n_children-1)
r = brt_search_child(brt, node, child[c], search, newkey, newval, split);
if (brt_update_debug) printf("brt_update_cursors_nonleaf_split %lld %lld %lld\n", oldnode->thisnodename,
left->thisnodename, right->thisnodename);
for (cursor = t->cursors_head; cursor; cursor = cursor->next) {
if (toku_brt_cursor_active(cursor)) {
toku_brt_cursor_nonleaf_split(cursor, t, oldnode, left, right);
}
}
return r;
}
void toku_brt_cursor_new_root(BRT_CURSOR cursor, BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right) {
int i;
int childnum;
int r;
void *v;
assert(!toku_brt_cursor_path_full(cursor));
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split) {
brt = brt; split = split;
PMA pma = node->u.l.buffer;
int r = toku_pma_search(pma, search, newkey, newval);
return r;
}
if (0) printf("toku_brt_cursor_new_root %p %lld newroot %lld\n", cursor, cursor->path[0]->thisnodename, newroot->thisnodename);
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split) {
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split);
else
return brt_search_leaf_node(brt, node, search, newkey, newval, split);
}
assert(cursor->path[0] == left || cursor->path[0] == right);
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval) {
int r, rr;
/* make room for the newroot at the path base */
for (i=cursor->path_len; i>0; i--) {
cursor->path[i] = cursor->path[i-1];
cursor->pathcnum[i] = cursor->pathcnum[i-1];
}
cursor->path_len++;
rr = toku_read_and_pin_brt_header(brt->cf, &brt->h);
assert(rr == 0);
/* shift the newroot */
cursor->path[0] = newroot;
childnum = cursor->path[1] == left ? 0 : 1;
cursor->pathcnum[0] = childnum;
r = toku_cachetable_maybe_get_and_pin(t->cf, newroot->thisnodename, &v);
assert(r == 0 && v == newroot);
brt_node_add_cursor(newroot, childnum, cursor);
}
CACHEKEY *rootp;
rootp = toku_calculate_root_offset_pointer(brt);
void toku_brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE newright) {
int r;
PMA pma;
void *v;
for (;;) {
void *node_v;
rr = toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
assert(rr == 0);
assert(oldnode->height == 0);
if (cursor->path[cursor->path_len-1] == oldnode) {
assert(newright->height == 0);
BRTNODE node = node_v;
BRT_SPLIT split; brt_split_init(&split);
r = brt_search_node(brt, node, search, newkey, newval, &split);
r = toku_pma_cursor_get_pma(cursor->pmacurs, &pma);
assert(r == 0);
if (pma == newright->u.l.buffer) {
r = toku_cachetable_unpin(t->cf, oldnode->thisnodename, oldnode->dirty, brtnode_size(oldnode));
assert(r == 0);
r = toku_cachetable_maybe_get_and_pin(t->cf, newright->thisnodename, &v);
assert(r == 0 && v == newright);
cursor->path[cursor->path_len-1] = newright;
if (split.did_split) {
rr = brt_init_new_root(brt, split.nodea, split.nodeb, split.splitk, rootp, 0, &node);
assert(rr == 0);
}
if (0) printf("toku_brt_cursor_leaf_split %p oldnode %lld newnode %lld\n", cursor,
oldnode->thisnodename, newright->thisnodename);
rr = unpin_brtnode(brt, node);
assert(rr == 0);
//verify_local_fingerprint_nonleaf(oldnode);
if (r != EAGAIN)
break;
}
}
void toku_brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t, BRTNODE node, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk) {
int i;
int oldchildnum, newchildnum;
assert(node->height > 0);
rr = toku_unpin_brt_header(brt);
assert(rr == 0);
// i = cursor->path_len - node->height - 1;
// if (i < 0)
// i = cursor->path_len - 1;
// if (i >= 0 && cursor->path[i] == node) {
// }
if (0) toku_brt_cursor_print(cursor);
/* see if the cursor path references the node */
for (i = 0; i < cursor->path_len; i++)
if (cursor->path[i] == node)
break;
if (i < cursor->path_len) {
if (cursor->pathcnum[i] < childnum) /* cursor is left of the split so nothing to do */
return;
if (cursor->pathcnum[i] > childnum) { /* cursor is right of the split so just increment the cursor childnum */
cursor->pathcnum[i] += 1;
return;
}
if (i == cursor->path_len-1) { /* cursor is being constructed */
if (cursor->op == DB_PREV || cursor->op == DB_LAST) /* go to the right subtree */
goto setnewchild;
if (cursor->op == DB_SET || cursor->op == DB_SET_RANGE || cursor->op == DB_GET_BOTH || cursor->op == DB_GET_BOTH_RANGE) {
if (brt_compare_pivot(t, cursor->key, cursor->val, splitk) > 0)
goto setnewchild;
}
}
if (i+1 < cursor->path_len) { /* the cursor path traversed the old child so update it if it traverses the right child */
assert(cursor->path[i+1] == left || cursor->path[i+1] == right);
if (cursor->path[i+1] == right) {
setnewchild:
oldchildnum = cursor->pathcnum[i];
newchildnum = oldchildnum + 1;
brt_node_remove_cursor(node, oldchildnum, cursor);
brt_node_add_cursor(node, newchildnum, cursor);
cursor->pathcnum[i] = newchildnum;
return;
}
}
}
return r;
}
void toku_brt_cursor_nonleaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right) {
int i;
BRTNODE newnode;
int r;
void *v;
int childnum;
assert(oldnode->height > 0 && left->height > 0 && right->height > 0);
// i = cursor->path_len - oldnode->height - 1;
// if (i < 0)
// i = cursor->path_len - 1;
// if (i >= 0 && cursor->path[i] == oldnode) {
for (i = 0; i < cursor->path_len; i++)
if (cursor->path[i] == oldnode)
break;
if (i < cursor->path_len) {
childnum = cursor->pathcnum[i];
brt_node_remove_cursor(oldnode, childnum, cursor);
if (childnum < left->u.n.n_children) {
newnode = left;
} else {
newnode = right;
childnum -= left->u.n.n_children;
static inline void dbt_cleanup(DBT *dbt) {
if (dbt->data && (dbt->flags & DB_DBT_MALLOC)) {
toku_free_n(dbt->data, dbt->size); dbt->data = 0;
}
}
if (0) printf("toku_brt_cursor_nonleaf_split %p oldnode %lld newnode %lld\n",
cursor, oldnode->thisnodename, newnode->thisnodename);
static inline void brt_cursor_cleanup(BRT_CURSOR cursor) {
dbt_cleanup(&cursor->key);
dbt_cleanup(&cursor->val);
}
// The oldnode is probably dead. But we say it is dirty? ???
r = toku_cachetable_unpin(t->cf, oldnode->thisnodename, oldnode->dirty, brtnode_size(oldnode));
assert(r == 0);
r = toku_cachetable_maybe_get_and_pin(t->cf, newnode->thisnodename, &v);
assert(r == 0 && v == newnode);
brt_node_add_cursor(newnode, childnum, cursor);
cursor->path[i] = newnode;
cursor->pathcnum[i] = childnum;
}
static inline int brt_cursor_not_set(BRT_CURSOR cursor) {
return cursor->key.data == 0 || cursor->val.data == 0;
}
int toku_brt_cursor (BRT brt, BRT_CURSOR*cursor) {
BRT_CURSOR MALLOC(result);
assert(result);
result->brt = brt;
result->path_len = 0;
result->pmacurs = 0;
static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *newval) {
brt_cursor_cleanup(cursor);
cursor->key = *newkey; memset(newkey, 0, sizeof *newkey);
cursor->val = *newval; memset(newval, 0, sizeof *newval);
}
if (brt->cursors_head) {
brt->cursors_head->prev = result;
} else {
brt->cursors_tail = result;
}
result->next = brt->cursors_head;
result->prev = 0;
brt->cursors_head = result;
*cursor = result;
int toku_brt_cursor(BRT brt, BRT_CURSOR *cursorptr) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0)
return ENOMEM;
cursor->brt = brt;
toku_init_dbt(&cursor->key);
toku_init_dbt(&cursor->val);
list_push(&brt->cursors, &cursor->cursors_link);
*cursorptr = cursor;
return 0;
}
int toku_brt_cursor_close(BRT_CURSOR cursor) {
brt_cursor_cleanup(cursor);
list_remove(&cursor->cursors_link);
toku_free_n(cursor, sizeof *cursor);
return 0;
}
static int unpin_cursor(BRT_CURSOR);
static inline int compare_k_x(BRT brt, DBT *k, DBT *x) {
return brt->compare_fun(brt->db, k, x);
}
int toku_brt_cursor_close (BRT_CURSOR curs) {
BRT brt = curs->brt;
int r=unpin_cursor(curs);
if (curs->prev==0) {
assert(brt->cursors_head==curs);
brt->cursors_head = curs->next;
} else {
curs->prev->next = curs->next;
}
if (curs->next==0) {
assert(brt->cursors_tail==curs);
brt->cursors_tail = curs->prev;
} else {
curs->next->prev = curs->prev;
}
if (curs->pmacurs) {
int r2=toku_pma_cursor_free(&curs->pmacurs);
if (r==0) r=r2;
}
toku_free(curs);
return r;
static inline int compare_v_y(BRT brt, DBT *v, DBT *y) {
return brt->dup_compare(brt->db, v, y);
}
/* Print the path of a cursor */
void toku_brt_cursor_print(BRT_CURSOR cursor) {
int i;
static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) {
int cmp = brt->compare_fun(brt->db, k, x);
if (cmp == 0 && v && y)
cmp = brt->dup_compare(brt->db, v, y);
return cmp;
}
printf("cursor %p: ", cursor);
for (i=0; i<cursor->path_len; i++) {
printf("%lld", cursor->path[i]->thisnodename);
if (cursor->path[i]->height > 0)
printf(",%d:%d ", cursor->pathcnum[i], cursor->path[i]->u.n.n_children);
else
printf(" ");
}
printf("\n");
static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
int r = 0;
if (key)
r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, &cursor->brt->skey);
if (r == 0 && val)
r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, &cursor->brt->sval);
return r;
}
static int brtcurs_set_position_last (BRT_CURSOR cursor, DISKOFF off, DBT *key, TOKUTXN txn) {
BRT brt=cursor->brt;
void *node_v;
static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */
}
int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
static int brt_cursor_current(BRT_CURSOR cursor, int get_flags, DBT *outkey, DBT *outval) {
if (brt_cursor_not_set(cursor))
return EINVAL;
if ((get_flags & 256) == 0) {
DBT newkey; toku_init_dbt(&newkey);
DBT newval; toku_init_dbt(&newval);
BRTNODE node = node_v;
if (0) {
died0: toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, 0); return r;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
int r = toku_brt_search(cursor->brt, &search, &newkey, &newval);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
return DB_KEYEMPTY;
}
assert(cursor->path_len<CURSOR_PATHLEN_LIMIT);
cursor->path[cursor->path_len++] = node;
if (node->height>0) {
int childnum;
return brt_cursor_copyout(cursor, outkey, outval);
}
try_last_child:
childnum = node->u.n.n_children-1;
/* search for the first kv pair that matches the search object */
static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
try_prev_child:
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
if (node->u.n.n_bytes_in_buffer[childnum] > 0) {
brt_flush_child(cursor->brt, node, childnum, cursor, txn);
/*
* the flush may have been partially successfull. it may have also
* changed the tree such that the current node have expanded or been
* replaced. lets start over.
*/
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
goto try_last_child;
}
r=brtcurs_set_position_last (cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0)
return 0;
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
if (r==DB_NOTFOUND) {
if (childnum>0) {
childnum--;
goto try_prev_child;
}
}
/* we ran out of children without finding anything, or had some other trouble. */
cursor->path_len--;
goto died0;
} else {
r=toku_pma_cursor(node->u.l.buffer, &cursor->pmacurs, &cursor->brt->skey, &cursor->brt->sval);
if (r!=0) {
if (0) { died10: toku_pma_cursor_free(&cursor->pmacurs); }
cursor->path_len--;
goto died0;
}
r=toku_pma_cursor_set_position_last(cursor->pmacurs);
if (r!=0) goto died10; /* we'll deallocate this cursor, and unpin this node, and go back up. */
return 0;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval);
if (r == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
}
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r;
}
static int brtcurs_set_position_first (BRT_CURSOR cursor, DISKOFF off, DBT *key, TOKUTXN txn) {
BRT brt=cursor->brt;
void *node_v;
int r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
/* search for the kv pair that matches the search object and is equal to kv */
static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
BRTNODE node = node_v;
assert(cursor->path_len<CURSOR_PATHLEN_LIMIT);
cursor->path[cursor->path_len++] = node;
if (0) {
died0: toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, 0); return r;
}
if (node->height>0) {
int childnum
;
try_first_child:
childnum = 0;
try_next_child:
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
if (node->u.n.n_bytes_in_buffer[childnum] > 0) {
brt_flush_child(cursor->brt, node, childnum, cursor, txn);
/*
* the flush may have been partially successfull. it may have also
* changed the tree such that the current node have expanded or been
* replaced. lets start over.
*/
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
goto try_first_child;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval);
if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &newkey, &newval) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
} else
r = DB_NOTFOUND;
}
r=brtcurs_set_position_first (cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0)
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r;
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
if (r==DB_NOTFOUND) {
if (childnum+1<node->u.n.n_children) {
childnum++;
goto try_next_child;
}
}
/* we ran out of children without finding anything, or had some other trouble. */
cursor->path_len--;
goto died0;
} else {
r=toku_pma_cursor(node->u.l.buffer, &cursor->pmacurs, &cursor->brt->skey, &cursor->brt->sval);
if (r!=0) {
if (0) { died10: toku_pma_cursor_free(&cursor->pmacurs); }
cursor->path_len--;
goto died0;
}
r=toku_pma_cursor_set_position_first(cursor->pmacurs);
if (r!=0) goto died10; /* we'll deallocate this cursor, and unpin this node, and go back up. */
return 0;
}
}
static int brtcurs_set_position_next2(BRT_CURSOR cursor, DBT *key, TOKUTXN txn) {
BRTNODE node;
int childnum;
int r;
int more;
assert(cursor->path_len > 0);
/* pop the node and childnum from the cursor path */
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
cursor->path_len -= 1;
//verify_local_fingerprint_nonleaf(node);
toku_cachetable_unpin(cursor->brt->cf, node->thisnodename, node->dirty, brtnode_size(node));
/* search for the kv pair that matches the search object and is equal to k */
static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
if (toku_brt_cursor_path_empty(cursor))
return DB_NOTFOUND;
/* set position first in the next right tree */
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
assert(node->height > 0);
brt_node_remove_cursor(node, childnum, cursor);
childnum += 1;
while (childnum < node->u.n.n_children) {
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
for (;;) {
more = node->u.n.n_bytes_in_buffer[childnum];
if (more == 0)
break;
brt_flush_child(cursor->brt, node, childnum, cursor, txn);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
}
r = brtcurs_set_position_first(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0)
return 0;
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
childnum += 1;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval);
if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &newkey) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval);
} else
r = DB_NOTFOUND;
}
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r;
}
return brtcurs_set_position_next2(cursor, key, txn);
static int brt_cursor_compare_one(brt_search_t *search, DBT *x, DBT *y) {
search = search; x = x; y = y;
return 1;
}
/* requires that the cursor is initialized. */
static int brtcurs_set_position_next (BRT_CURSOR cursor, DBT *key, TOKUTXN txn) {
int r = toku_pma_cursor_set_position_next(cursor->pmacurs);
if (r==DB_NOTFOUND) {
/* We fell off the end of the pma. */
if (cursor->path_len==1) return DB_NOTFOUND;
/* Part of the trickyness is we need to leave the cursor pointing at the current (possibly deleted) value if there is no next value. */
r = toku_pma_cursor_free(&cursor->pmacurs);
assert(r == 0);
return brtcurs_set_position_next2(cursor, key, txn);
}
return 0;
static int brt_cursor_first(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
}
static int brtcurs_set_position_prev2(BRT_CURSOR cursor, DBT *key, TOKUTXN txn) {
BRTNODE node;
int childnum;
int r;
int more;
static int brt_cursor_last(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_RIGHT, 0, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
}
assert(cursor->path_len > 0);
static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */
}
/* pop the node and childnum from the cursor path */
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
cursor->path_len -= 1;
//verify_local_fingerprint_nonleaf(node);
toku_cachetable_unpin(cursor->brt->cf, node->thisnodename, node->dirty, brtnode_size(node));
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
}
if (toku_brt_cursor_path_empty(cursor))
return DB_NOTFOUND;
static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) < 0; /* return min x: k < x */
}
/* set position last in the next left tree */
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
assert(node->height > 0);
brt_node_remove_cursor(node, childnum, cursor);
static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_nodup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
}
childnum -= 1;
while (childnum >= 0) {
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
for (;;) {
more = node->u.n.n_bytes_in_buffer[childnum];
if (more == 0)
break;
brt_flush_child(cursor->brt, node, childnum, cursor, txn);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
}
r = brtcurs_set_position_last(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), key, txn);
if (r == 0)
return 0;
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
childnum -= 1;
}
static int brt_cursor_compare_next_dup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp < 0)
return 1;
else
return keycmp == 0 && compare_v_y(brt, search->v, y) < 0; /* return min xy: k <= x && v < y */
}
return brtcurs_set_position_prev2(cursor, key, txn);
static int brt_cursor_next_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_dup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval);
}
static int brtcurs_set_position_prev (BRT_CURSOR cursor, DBT *key, TOKUTXN txn) {
int r = toku_pma_cursor_set_position_prev(cursor->pmacurs);
if (r==DB_NOTFOUND) {
if (cursor->path_len==1)
return DB_NOTFOUND;
r = toku_pma_cursor_free(&cursor->pmacurs);
assert(r == 0);
return brtcurs_set_position_prev2(cursor, key, txn);
}
return 0;
static int brt_cursor_compare_get_both_range(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp < 0)
return 1;
else
return keycmp == 0 && compare_v_y(brt, search->v, y) <= 0; /* return min xy: k <= x && v <= y */
}
static int brtcurs_dupsort_next_child(BRT_CURSOR cursor, BRTNODE node, int childnum, int op) {
cursor = cursor;
if (op == DB_GET_BOTH) return node->u.n.n_children; /* no more */
return childnum + 1;
static int brt_cursor_get_both_range(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_get_both_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval);
}
static int brtcurs_nodup_next_child(BRT_CURSOR cursor, BRTNODE node, int childnum, int op) {
cursor = cursor;
if (op == DB_SET || op == DB_GET_BOTH) return node->u.n.n_children; /* no more */
return childnum + 1;
static int brt_cursor_compare_prev(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) > 0; /* return max xy: kv > xy */
}
static int brtcurs_set_search(BRT_CURSOR cursor, DISKOFF off, int op, DBT *key, DBT *val, TOKUTXN txn) {
BRT brt = cursor->brt;
void *node_v;
int r;
r = toku_cachetable_get_and_pin(brt->cf, off, &node_v, NULL, toku_brtnode_flush_callback,
toku_brtnode_fetch_callback, brt);
if (r != 0)
return r;
static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
}
BRTNODE node = node_v;
int childnum;
static int brt_cursor_compare_prev_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) > 0; /* return max x: k > x */
}
if (node->height > 0) {
cursor->path_len += 1;
/* select the leftmost subtree that may contain the key and val */
childnum = brtnode_left_child(node, key, val, brt);
for (;;) {
/* flush the buffer for the child subtree */
for (;;) {
cursor->path[cursor->path_len-1] = node;
cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor);
int more = node->u.n.n_bytes_in_buffer[childnum];
if (more > 0) {
cursor->key = key; cursor->val = val;
brt_flush_child(cursor->brt, node, childnum, cursor, txn);
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
continue;
}
break;
}
/* search in the child subtree */
r = brtcurs_set_search(cursor, BRTNODE_CHILD_DISKOFF(node, childnum), op, key, val, txn);
if (r == 0)
break;
/* not found in the child subtree, look elsewhere */
node = cursor->path[cursor->path_len-1];
childnum = cursor->pathcnum[cursor->path_len-1];
brt_node_remove_cursor(node, childnum, cursor);
static int brt_cursor_prev_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_nodup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
}
if (brt->flags & TOKU_DB_DUPSORT)
childnum = brtcurs_dupsort_next_child(cursor, node, childnum, op);
else
childnum = brtcurs_nodup_next_child(cursor, node, childnum, op);
#ifdef DB_PREV_DUP
if (childnum >= node->u.n.n_children) {
r = DB_NOTFOUND;
break;
}
}
} else {
cursor->path_len += 1;
cursor->path[cursor->path_len-1] = node;
r = toku_pma_cursor(node->u.l.buffer, &cursor->pmacurs, &cursor->brt->skey, &cursor->brt->sval);
if (r == 0) {
if (op == DB_SET || op == DB_GET_BOTH)
r = toku_pma_cursor_set_both(cursor->pmacurs, key, val);
else if (op == DB_SET_RANGE || op == DB_GET_BOTH_RANGE)
r = toku_pma_cursor_set_range_both(cursor->pmacurs, key, val);
static int brt_cursor_compare_prev_dup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
int keycmp = compare_k_x(brt, search->k, x);
if (keycmp > 0)
return 1;
else
assert(0);
if (r != 0) {
int rr = toku_pma_cursor_free(&cursor->pmacurs);
assert(rr == 0);
}
}
}
return keycmp == 0 && compare_v_y(brt, search->v, y) > 0; /* return max xy: k >= x && v > y */
}
if (r != 0) {
cursor->path_len -= 1;
//verify_local_fingerprint_nonleaf(node);
toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnode_size(node));
}
return r;
static int brt_cursor_prev_dup(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev_dup, BRT_SEARCH_RIGHT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search_eq_k_x(cursor, &search, outkey, outval);
}
static int unpin_cursor (BRT_CURSOR cursor) {
BRT brt=cursor->brt;
int i;
int r=0;
for (i=0; i<cursor->path_len; i++) {
BRTNODE node = cursor->path[i];
brt_node_remove_cursor(node, cursor->pathcnum[i], cursor);
//verify_local_fingerprint_nonleaf(node);
int r2 = toku_cachetable_unpin(brt->cf, node->thisnodename, node->dirty, brtnode_size(node));
if (r==0) r=r2;
}
if (cursor->pmacurs) {
r = toku_pma_cursor_free(&cursor->pmacurs);
assert(r == 0);
}
cursor->path_len=0;
return r;
#endif
static int brt_cursor_compare_set_range(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return kv <= xy */
}
static void assert_cursor_path(BRT_CURSOR cursor) {
int i;
BRTNODE node;
int child;
static int brt_cursor_set(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search_eq_kv_xy(cursor, &search, outkey, outval);
}
if (cursor->path_len <= 0)
return;
for (i=0; i<cursor->path_len-1; i++) {
node = cursor->path[i];
child = cursor->pathcnum[i];
assert(node->height > 0);
assert(node->u.n.n_bytes_in_buffer[child] == 0);
assert(node->u.n.n_cursors[child] > 0);
}
node = cursor->path[i];
assert(node->height == 0);
static int brt_cursor_set_range(BRT_CURSOR cursor, DBT *key, DBT *outkey, DBT *outval) {
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set_range, BRT_SEARCH_LEFT, key, 0, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval);
}
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int flags, TOKUTXN txn) {
int do_rmw=0;
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, TOKUTXN txn) {
assert(txn == 0);
int r;
CACHEKEY *rootp;
//dump_brt(cursor->brt);
//fprintf(stderr, "%s:%d in brt_c_get(...)\n", __FILE__, __LINE__);
if ((r = toku_read_and_pin_brt_header(cursor->brt->cf, &cursor->brt->h))) {
if (0) { died0: toku_unpin_brt_header(cursor->brt); }
return r;
}
rootp = toku_calculate_root_offset_pointer(cursor->brt);
if (flags&DB_RMW) {
do_rmw=1;
flags &= ~DB_RMW;
}
cursor->op = flags;
switch (flags) {
case DB_LAST:
do_db_last:
r=unpin_cursor(cursor); if (r!=0) goto died0;
assert(cursor->pmacurs == 0);
r=brtcurs_set_position_last(cursor, *rootp, kbt, txn); if (r!=0) goto died0;
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r == 0) assert_cursor_path(cursor);
if ((get_flags & ~(DB_OPFLAGS_MASK+256)))
return EINVAL;
switch (get_flags) {
case DB_CURRENT:
case DB_CURRENT+256:
r = brt_cursor_current(cursor, get_flags, key, val);
break;
case DB_FIRST:
do_db_first:
r=unpin_cursor(cursor); if (r!=0) goto died0;
assert(cursor->pmacurs == 0);
r=brtcurs_set_position_first(cursor, *rootp, kbt, txn); if (r!=0) goto died0;
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r == 0) assert_cursor_path(cursor);
r = brt_cursor_first(cursor, key, val);
break;
case DB_LAST:
r = brt_cursor_last(cursor, key, val);
break;
case DB_NEXT:
if (cursor->path_len<=0)
goto do_db_first;
r=brtcurs_set_position_next(cursor, kbt, txn); if (r!=0) goto died0;
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0); if (r!=0) goto died0;
if (r == 0) assert_cursor_path(cursor);
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val);
else
r = brt_cursor_next(cursor, key, val);
break;
case DB_NEXT_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_next_dup(cursor, key, val);
break;
case DB_NEXT_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_first(cursor, key, val);
else
r = brt_cursor_next_nodup(cursor, key, val);
break;
case DB_PREV:
if (cursor->path_len<= 0)
goto do_db_last;
r = brtcurs_set_position_prev(cursor, kbt, txn); if (r!=0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0); if (r!=0) goto died0;
if (r == 0) assert_cursor_path(cursor);
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val);
else
r = brt_cursor_prev(cursor, key, val);
break;
case DB_CURRENT:
case DB_CURRENT+256:
if (cursor->path_len<=0) {
r = EINVAL; goto died0;
}
r=toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, (flags&256)!=0); if (r!=0) goto died0;
if (r == 0) assert_cursor_path(cursor);
#ifdef DB_PREV_DUP
case DB_PREV_DUP:
if (brt_cursor_not_set(cursor))
r = EINVAL;
else
r = brt_cursor_prev_dup(cursor, key, val);
break;
case DB_SET:
r = unpin_cursor(cursor);
assert(r == 0);
r = brtcurs_set_search(cursor, *rootp, DB_SET, kbt, 0, txn);
if (r != 0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, 0, vbt, 0);
if (r != 0) goto died0;
#endif
case DB_PREV_NODUP:
if (brt_cursor_not_set(cursor))
r = brt_cursor_last(cursor, key, val);
else
r = brt_cursor_prev_nodup(cursor, key, val);
break;
case DB_GET_BOTH:
r = unpin_cursor(cursor);
assert(r == 0);
r = brtcurs_set_search(cursor, *rootp, DB_GET_BOTH, kbt, vbt, txn);
if (r != 0) goto died0;
case DB_SET:
r = brt_cursor_set(cursor, key, 0, 0, val);
break;
case DB_SET_RANGE:
r = unpin_cursor(cursor);
assert(r == 0);
r = brtcurs_set_search(cursor, *rootp, DB_SET_RANGE, kbt, 0, txn);
if (r != 0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r != 0) goto died0;
r = brt_cursor_set_range(cursor, key, key, val);
break;
case DB_GET_BOTH:
r = brt_cursor_set(cursor, key, val, 0, 0);
break;
case DB_GET_BOTH_RANGE:
r = EINVAL; goto died0; /* does not work yet */
r = unpin_cursor(cursor); assert(r == 0);
r = brtcurs_set_search(cursor, *rootp, DB_GET_BOTH_RANGE, kbt, vbt, txn);
if (r != 0) goto died0;
r = toku_pma_cursor_get_current(cursor->pmacurs, kbt, vbt, 0);
if (r != 0) goto died0;
r = brt_cursor_get_both_range(cursor, key, val, 0, val);
break;
default:
toku_unpin_brt_header(cursor->brt);
return EINVAL;
fprintf(stderr, "%s:%d c_get(...,%d) not ready\n", __FILE__, __LINE__, flags);
abort();
}
//printf("%s:%d unpinning header\n", __FILE__, __LINE__);
if ((r = toku_unpin_brt_header(cursor->brt))!=0) return r;
return 0;
}
/* delete the key and value under the cursor */
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags __attribute__((__unused__))) {
int r;
if (cursor->path_len > 0) {
BRTNODE node = cursor->path[cursor->path_len-1];
assert(node->height == 0);
int kvsize;
r = toku_pma_cursor_delete_under(cursor->pmacurs, &kvsize, node->rand4fingerprint, &node->local_fingerprint);
if (r == 0) {
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kvsize;
node->dirty = 1;
r = EINVAL;
break;
}
} else
r = DB_NOTFOUND;
return r;
}
int toku_brt_dbt_set_key(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) {
int r = toku_dbt_set_value(ybt, val, vallen, &brt->skey);
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags) {
if ((flags & ~DB_DELETE_ANY) != 0)
return EINVAL;
if (brt_cursor_not_set(cursor))
return EINVAL;
int r = 0;
if (!(flags & DB_DELETE_ANY))
r = brt_cursor_current(cursor, DB_CURRENT, 0, 0);
if (r == 0)
r = toku_brt_delete_both(cursor->brt, &cursor->key, &cursor->val);
return r;
}
int toku_brt_dbt_set_value(BRT brt, DBT *ybt, bytevec val, ITEMLEN vallen) {
int r = toku_dbt_set_value(ybt, val, vallen, &brt->sval);
return r;
}
......@@ -10,6 +10,7 @@
#include "../include/db.h"
#include "cachetable.h"
#include "log.h"
#include "brt-search.h"
int toku_open_brt (const char *fname, const char *dbname, int is_create, BRT *, int nodesize, CACHETABLE, TOKUTXN, int(*)(DB*,const DBT*,const DBT*), DB*);
......
#ifndef _TOKUDB_LIST_H
#define _TOKUDB_LIST_H
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
// This list is intended to be embedded in other data structures.
......@@ -59,7 +62,7 @@ static inline struct list *list_pop_head(struct list *head) {
static inline void list_move(struct list *newhead, struct list *oldhead) {
struct list *first = oldhead->next;
struct list *last = oldhead->prev;
assert(!list_empty(oldhead));
// assert(!list_empty(oldhead));
newhead->next = first;
newhead->prev = last;
last->next = first->prev = newhead;
......@@ -75,6 +78,4 @@ static inline void list_move(struct list *newhead, struct list *oldhead) {
#define list_struct(p, t, f) ((t*)((char*)(p) - ((char*)&((t*)0)->f)))
#endif
#endif
......@@ -297,6 +297,42 @@ static unsigned int pma_search(PMA pma, DBT *k, DBT *v, int lo, int hi, int *fou
}
}
static unsigned int pma_search_func(PMA pma, brt_search_t *search, int lo, int hi, int *found) {
assert(0 <= lo && lo <= hi);
if (lo >= hi) {
*found = 0;
return lo;
} else {
int mi = (lo + hi)/2;
assert(lo <= mi && mi < hi);
int omi = mi;
while (mi < hi && !kv_pair_inuse(pma->pairs[mi]))
mi++;
if (mi >= hi)
return pma_search_func(pma, search, lo, omi, found);
struct kv_pair *kv = kv_pair_ptr(pma->pairs[mi]);
DBT x, y;
int cmp = search->compare(search, search->k ? toku_fill_dbt(&x, kv_pair_key(kv), kv_pair_keylen(kv)) : 0, search->v ? toku_fill_dbt(&y, kv_pair_val(kv), kv_pair_vallen(kv)) : 0);
if (cmp == 0) {
if (search->direction == BRT_SEARCH_LEFT)
return pma_search_func(pma, search, mi+1, hi, found);
else
return pma_search_func(pma, search, lo, mi, found);
}
/* we have a match, try to find a better match on the left or right subtrees */
int here;
if (search->direction == BRT_SEARCH_LEFT)
here = pma_search_func(pma, search, lo, mi, found);
else
here = pma_search_func(pma, search, mi+1, hi, found);
if (*found == 0)
here = mi;
*found = 1;
return here;
}
}
// Return the smallest index such that no lower index contains a larger key.
// This will be in the range 0 (inclusive) to toku_pma_index_limit(pma) (inclusive).
// Thus the returned index may not be a valid index into the array if it is == toku_pma_index_limit(pma)
......@@ -840,6 +876,21 @@ enum pma_errors toku_pma_lookup (PMA pma, DBT *k, DBT *v) {
return DB_NOTFOUND;
}
int toku_pma_search(PMA pma, brt_search_t *search, DBT *foundk, DBT *foundv) {
int found;
unsigned int here = pma_search_func(pma, search, 0, pma->N, &found);
struct kv_pair *kv = pma->pairs[here];
if (found && kv_pair_valid(kv)) {
int r = 0;
if (foundk)
r = toku_dbt_set_value(foundk, kv_pair_key(kv), kv_pair_keylen(kv), &pma->skey);
if (r == 0 && foundv)
r = toku_dbt_set_value(foundv, kv_pair_val(kv), kv_pair_vallen(kv), &pma->sval);
return r;
} else
return DB_NOTFOUND;
}
/* returns 0 if OK.
* You must have freed all the cursors, otherwise returns nonzero and does nothing. */
int toku_pma_free (PMA *pmap) {
......
......@@ -8,6 +8,7 @@
#include "yerror.h"
#include "../include/db.h"
#include "log.h"
#include "brt-search.h"
/* An in-memory Packed Memory Array dictionary. */
/* There is a built-in-cursor. */
......@@ -69,6 +70,8 @@ int toku_pma_insert_or_replace (PMA /*pma*/, DBT */*k*/, DBT */*v*/,
* Don't modify the returned data. Don't free it. */
enum pma_errors toku_pma_lookup (PMA, DBT*, DBT*);
int toku_pma_search(PMA, brt_search_t *, DBT *, DBT *);
/*
* The kv pairs in PMA are split into two (nearly) equal sized sets.
* THe ones in the left half are left in PMA, the ones in the right half are put into NEWPMA.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment