Commit c7cce078 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Integrate the omtcursors into the BRT. So far only DB_NEXT is implemented. Addresses #855, #856.

git-svn-id: file:///svn/tokudb@4325 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6f6b6594
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include "kv-pair.h" #include "kv-pair.h"
#include "leafentry.h" #include "leafentry.h"
typedef void *OMTVALUE;
#include "omt.h" #include "omt.h"
#ifndef BRT_FANOUT #ifndef BRT_FANOUT
...@@ -111,6 +112,8 @@ struct brt_header { ...@@ -111,6 +112,8 @@ struct brt_header {
unsigned int *flags_array; // an array of flags. Element 0 holds the element if no subdatabases allowed. unsigned int *flags_array; // an array of flags. Element 0 holds the element if no subdatabases allowed.
FIFO fifo; // all the abort and commit commands. If the header gets flushed to disk, we write the fifo contents beyond the unused_memory. FIFO fifo; // all the abort and commit commands. If the header gets flushed to disk, we write the fifo contents beyond the unused_memory.
uint64_t root_put_counter;
}; };
struct brt { struct brt {
...@@ -190,6 +193,8 @@ struct brt_cursor { ...@@ -190,6 +193,8 @@ struct brt_cursor {
DBT val; DBT val;
int is_temporary_cursor; // If it is a temporary cursor then use the following skey and sval to return tokudb-managed values in dbts. Otherwise use the brt's skey and skval. int is_temporary_cursor; // If it is a temporary cursor then use the following skey and sval to return tokudb-managed values in dbts. Otherwise use the brt's skey and skval.
void *skey, *sval; void *skey, *sval;
OMTCURSOR omtcursor;
uint64_t root_put_counter; // what was the count on the BRT when we validated the cursor?
}; };
// logs the memory allocation, but not the creation of the new node // logs the memory allocation, but not the creation of the new node
......
...@@ -54,6 +54,13 @@ extern long long n_items_malloced; ...@@ -54,6 +54,13 @@ extern long long n_items_malloced;
static int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER); static int malloc_diskblock (DISKOFF *res, BRT brt, int size, TOKULOGGER);
static void verify_local_fingerprint_nonleaf (BRTNODE node); static void verify_local_fingerprint_nonleaf (BRTNODE node);
// We invalidate all the OMTCURSORS any time we push into the root of the BRT for that OMT.
// We keep a counter on each brt header, but if the brt header is evicted from the cachetable
// then we lose that counter. So we also keep a global counter.
// An alternative would be to keep only the global counter. But that would invalidate all OMTCURSORS
// even from unrelated BRTs. This way we only invalidate an OMTCURSOR if
static uint64_t global_root_put_counter = 0;
/* Frees a node, including all the stuff in the hash table. */ /* Frees a node, including all the stuff in the hash table. */
void toku_brtnode_free (BRTNODE *nodep) { void toku_brtnode_free (BRTNODE *nodep) {
BRTNODE node=*nodep; BRTNODE node=*nodep;
...@@ -224,6 +231,7 @@ int toku_brtheader_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void * ...@@ -224,6 +231,7 @@ int toku_brtheader_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void *
if ((r = toku_deserialize_fifo_at(toku_cachefile_fd(cachefile), (*h)->unused_memory, &(*h)->fifo))) return r; if ((r = toku_deserialize_fifo_at(toku_cachefile_fd(cachefile), (*h)->unused_memory, &(*h)->fifo))) return r;
//printf("%s:%d fifo=%p\nn", __FILE__, __LINE__, (*h)->fifo); //printf("%s:%d fifo=%p\nn", __FILE__, __LINE__, (*h)->fifo);
written_lsn->lsn = 0; // !!! WRONG. This should be stored or kept redundantly or something. written_lsn->lsn = 0; // !!! WRONG. This should be stored or kept redundantly or something.
(*h)->root_put_counter = global_root_put_counter++;
return 0; return 0;
} }
...@@ -1461,7 +1469,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, ...@@ -1461,7 +1469,7 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
FILENUM filenum = toku_cachefile_filenum(t->cf); FILENUM filenum = toku_cachefile_filenum(t->cf);
LEAFENTRY storeddata; LEAFENTRY storeddata;
OMTVALUE storeddatav=NULL; // TODO BBB This is not entirely safe. Verify initialization needed. OMTVALUE storeddatav=NULL;
u_int32_t idx; u_int32_t idx;
int r; int r;
...@@ -2033,6 +2041,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -2033,6 +2041,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->h->freelist=-1; t->h->freelist=-1;
t->h->unused_memory=2*t->nodesize; t->h->unused_memory=2*t->nodesize;
toku_fifo_create(&t->h->fifo); toku_fifo_create(&t->h->fifo);
t->h->root_put_counter = global_root_put_counter++;
if (dbname) { if (dbname) {
t->h->n_named_roots = 1; t->h->n_named_roots = 1;
if ((MALLOC_N(1, t->h->names))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died4: if (dbname) toku_free(t->h->names); } goto died3; } if ((MALLOC_N(1, t->h->names))==0) { assert(errno==ENOMEM); r=ENOMEM; if (0) { died4: if (dbname) toku_free(t->h->names); } goto died3; }
...@@ -2369,6 +2378,7 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) { ...@@ -2369,6 +2378,7 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) {
if (0) { died0: toku_unpin_brt_header(brt); } if (0) { died0: toku_unpin_brt_header(brt); }
return r; return r;
} }
brt->h->root_put_counter = global_root_put_counter++;
rootp = toku_calculate_root_offset_pointer(brt); rootp = toku_calculate_root_offset_pointer(brt);
if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) { toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
...@@ -2579,10 +2589,10 @@ static inline void brt_split_init(BRT_SPLIT *split) { ...@@ -2579,10 +2589,10 @@ static inline void brt_split_init(BRT_SPLIT *split) {
toku_init_dbt(&split->splitk); toku_init_dbt(&split->splitk);
} }
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger); static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR);
/* search in a node's child */ /* search in a node's child */
static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) { static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR omtcursor) {
int r, rr; int r, rr;
/* if the child's buffer is not empty then try to empty it */ /* if the child's buffer is not empty then try to empty it */
...@@ -2600,7 +2610,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s ...@@ -2600,7 +2610,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
for (;;) { for (;;) {
BRTNODE childnode = node_v; BRTNODE childnode = node_v;
BRT_SPLIT childsplit; brt_split_init(&childsplit); BRT_SPLIT childsplit; brt_split_init(&childsplit);
r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger); r = brt_search_node(brt, childnode, search, newkey, newval, &childsplit, logger, omtcursor);
if (childsplit.did_split) { if (childsplit.did_split) {
rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk, rr = handle_split_of_child(brt, node, childnum, childsplit.nodea, childsplit.nodeb, &childsplit.splitk,
&split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger); &split->did_split, &split->nodea, &split->nodeb, &split->splitk, logger);
...@@ -2618,7 +2628,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s ...@@ -2618,7 +2628,7 @@ static int brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *s
return r; return r;
} }
static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) { static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR omtcursor) {
int r; int r;
int c; int c;
...@@ -2636,7 +2646,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, ...@@ -2636,7 +2646,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
if (search->compare(search, if (search->compare(search,
toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)), toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot)),
brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) { brt->flags & TOKU_DB_DUPSORT ? toku_fill_dbt(&pivotval, kv_pair_val(pivot), kv_pair_vallen(pivot)): 0)) {
r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger); r = brt_search_child(brt, node, child[c], search, newkey, newval, split, logger, omtcursor);
if (r == 0 || r == EAGAIN) { if (r == 0 || r == EAGAIN) {
return r; return r;
} }
...@@ -2644,7 +2654,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search, ...@@ -2644,7 +2654,7 @@ static int brt_search_nonleaf_node(BRT brt, BRTNODE node, brt_search_t *search,
} }
/* check the first (left) or last (right) node if nothing has been found */ /* check the first (left) or last (right) node if nothing has been found */
return brt_search_child(brt, node, child[c], search, newkey, newval, split, logger); return brt_search_child(brt, node, child[c], search, newkey, newval, split, logger, omtcursor);
} }
int pair_leafval_bessel_le_committed (u_int32_t klen, void *kval, int pair_leafval_bessel_le_committed (u_int32_t klen, void *kval,
...@@ -2693,7 +2703,7 @@ static int bessel_from_search_t (OMTVALUE lev, void *extra) { ...@@ -2693,7 +2703,7 @@ static int bessel_from_search_t (OMTVALUE lev, void *extra) {
LESWITCHCALL(leafval, pair_leafval_bessel, search); LESWITCHCALL(leafval, pair_leafval_bessel, search);
} }
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval) { static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, OMTCURSOR omtcursor) {
// Now we have to convert from brt_search_t to the bessel function with a direction. What a pain... // Now we have to convert from brt_search_t to the bessel function with a direction. What a pain...
int direction; int direction;
switch (search->direction) { switch (search->direction) {
...@@ -2703,12 +2713,12 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT ...@@ -2703,12 +2713,12 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
return EINVAL; // This return and the goto are a hack to get both compile-time and run-time checking on enum return EINVAL; // This return and the goto are a hack to get both compile-time and run-time checking on enum
ok: ; ok: ;
OMTVALUE datav; OMTVALUE datav;
u_int32_t idx; u_int32_t idx = 0;
int r = toku_omt_find(node->u.l.buffer, int r = toku_omt_find(node->u.l.buffer,
bessel_from_search_t, bessel_from_search_t,
search, search,
direction, direction,
&datav, &idx, NULL); &datav, &idx, omtcursor);
if (r!=0) return r; if (r!=0) return r;
LEAFENTRY le = datav; LEAFENTRY le = datav;
...@@ -2746,19 +2756,23 @@ got_a_good_value: ...@@ -2746,19 +2756,23 @@ got_a_good_value:
return 0; return 0;
} }
static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger) { static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, BRT_SPLIT *split, TOKULOGGER logger, OMTCURSOR omtcursor) {
if (node->height > 0) if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger); return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger, omtcursor);
else else
return brt_search_leaf_node(brt, node, search, newkey, newval); return brt_search_leaf_node(brt, node, search, newkey, newval, omtcursor);
} }
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger) { int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor, uint64_t *root_put_counter)
// Effect: Perform a search. Associate cursor with a leaf if possible.
{
int r, rr; int r, rr;
rr = toku_read_and_pin_brt_header(brt->cf, &brt->h); rr = toku_read_and_pin_brt_header(brt->cf, &brt->h);
assert(rr == 0); assert(rr == 0);
*root_put_counter = brt->h->root_put_counter;
CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt); CACHEKEY *rootp = toku_calculate_root_offset_pointer(brt);
void *node_v; void *node_v;
...@@ -2780,7 +2794,7 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOK ...@@ -2780,7 +2794,7 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOK
for (;;) { for (;;) {
BRT_SPLIT split; brt_split_init(&split); BRT_SPLIT split; brt_split_init(&split);
r = brt_search_node(brt, node, search, newkey, newval, &split, logger); r = brt_search_node(brt, node, search, newkey, newval, &split, logger, omtcursor);
if (split.did_split) { if (split.did_split) {
rr = brt_init_new_root(brt, split.nodea, split.nodeb, split.splitk, rootp, 0, &node); rr = brt_init_new_root(brt, split.nodea, split.nodeb, split.splitk, rootp, 0, &node);
...@@ -2840,6 +2854,9 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) { ...@@ -2840,6 +2854,9 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
list_push(&brt->cursors, &cursor->cursors_link); list_push(&brt->cursors, &cursor->cursors_link);
cursor->is_temporary_cursor=is_temporary_cursor; cursor->is_temporary_cursor=is_temporary_cursor;
cursor->skey = cursor->sval = 0; cursor->skey = cursor->sval = 0;
int r = toku_omt_cursor_create(&cursor->omtcursor);
assert(r==0);
cursor->root_put_counter=0;
*cursorptr = cursor; *cursorptr = cursor;
return 0; return 0;
} }
...@@ -2849,6 +2866,7 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) { ...@@ -2849,6 +2866,7 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) {
if (cursor->skey) toku_free(cursor->skey); if (cursor->skey) toku_free(cursor->skey);
if (cursor->sval) toku_free(cursor->sval); if (cursor->sval) toku_free(cursor->sval);
list_remove(&cursor->cursors_link); list_remove(&cursor->cursors_link);
toku_omt_cursor_destroy(&cursor->omtcursor);
toku_free_n(cursor, sizeof *cursor); toku_free_n(cursor, sizeof *cursor);
return 0; return 0;
} }
...@@ -2953,7 +2971,7 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva ...@@ -2953,7 +2971,7 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger); r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0) if (r != 0 || compare_kv_xy(cursor->brt, &cursor->key, &cursor->val, &newkey, &newval) != 0)
r = DB_KEYEMPTY; r = DB_KEYEMPTY;
dbt_cleanup(&newkey); dbt_cleanup(&newkey);
...@@ -2968,7 +2986,7 @@ static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outke ...@@ -2968,7 +2986,7 @@ static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outke
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC; DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger); int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval); brt_cursor_set_key_val(cursor, &newkey, &newval);
r = brt_cursor_copyout(cursor, outkey, outval); r = brt_cursor_copyout(cursor, outkey, outval);
...@@ -2983,7 +3001,7 @@ static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, D ...@@ -2983,7 +3001,7 @@ static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, D
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC; DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger); int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &newkey, &newval) == 0) { if (compare_kv_xy(cursor->brt, search->k, search->v, &newkey, &newval) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval); brt_cursor_set_key_val(cursor, &newkey, &newval);
...@@ -3001,7 +3019,7 @@ static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT ...@@ -3001,7 +3019,7 @@ static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC; DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC;
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger); int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &newkey) == 0) { if (compare_k_x(cursor->brt, search->k, &newkey) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval); brt_cursor_set_key_val(cursor, &newkey, &newval);
...@@ -3034,7 +3052,42 @@ static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) { ...@@ -3034,7 +3052,42 @@ static int brt_cursor_compare_next(brt_search_t *search, DBT *x, DBT *y) {
return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */ return compare_kv_xy(brt, search->k, search->v, x, y) < 0; /* return min xy: kv < xy */
} }
static int brt_cursor_next_shortcut (BRT_CURSOR cursor, DBT *outkey, DBT *outval)
// Effect: If possible, increment the cursor and return the key-value pair
// (i.e., the next one from what the cursor pointed to before.)
// That is, do DB_NEXT on DUP databases, and do DB_NEXT_NODUP on NODUP databases.
{
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
int rr = toku_read_and_pin_brt_header(cursor->brt->cf, &cursor->brt->h);
if (rr!=0) return rr;
uint64_t h_counter = cursor->brt->h->root_put_counter;
rr = toku_unpin_brt_header(cursor->brt);
assert(rr==0);
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
int r = toku_omt_cursor_next(cursor->omtcursor, &le);
if (r==0) {
DBT key,val;
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
bytevec keyb = le_latest_key(le);
bytevec valb = le_latest_val(le);
r = toku_dbt_set_two_values(&key, &keyb, le_latest_keylen(le), NULL, FALSE,
&val, &valb, le_latest_vallen(le), NULL, FALSE);
assert(r==0);
brt_cursor_set_key_val(cursor, &key, &val);
return brt_cursor_copyout(cursor, outkey, outval);
}
}
return -1;
}
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0!=(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_next_shortcut(cursor, outkey, outval)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger); return brt_cursor_search(cursor, &search, outkey, outval, logger);
} }
...@@ -3045,6 +3098,9 @@ static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) { ...@@ -3045,6 +3098,9 @@ static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
} }
static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) { static int brt_cursor_next_nodup(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0==(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_next_shortcut(cursor, outkey, outval)==0)
return 0;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_nodup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_next_nodup, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger); return brt_cursor_search(cursor, &search, outkey, outval, logger);
} }
...@@ -3247,8 +3303,6 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, DBT *key, u_ ...@@ -3247,8 +3303,6 @@ static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, DBT *key, u_
struct cmd_leafval_bessel_extra be = {brt, &cmd, 0}; struct cmd_leafval_bessel_extra be = {brt, &cmd, 0};
u_int32_t idx; u_int32_t idx;
int r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be, 0, &idx, NULL); int r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be, 0, &idx, NULL);
// TODO: Check for r==ENOMEM if the last argument (cursor) is not NULL
// (history: we changed find_zero to support cursor, and now find_zero can fail if the cursor cannot grow)
*less += idx; *less += idx;
if (r==0 && (brt->flags & TOKU_DB_DUP)) { if (r==0 && (brt->flags & TOKU_DB_DUP)) {
// There is something, and so we now want to find the rightmost extent. // There is something, and so we now want to find the rightmost extent.
...@@ -3340,13 +3394,9 @@ static int move_it (OMTVALUE lev, u_int32_t idx, void *v) { ...@@ -3340,13 +3394,9 @@ static int move_it (OMTVALUE lev, u_int32_t idx, void *v) {
return 0; return 0;
} }
u_int32_t last_total_size_needed;
struct mempool original_mp;
// Compress things, and grow the mempool if needed. // Compress things, and grow the mempool if needed.
static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_size) { static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_size) {
u_int32_t total_size_needed = memp->free_offset-memp->frag_size + added_size; u_int32_t total_size_needed = memp->free_offset-memp->frag_size + added_size;
last_total_size_needed = total_size_needed;
if (total_size_needed+total_size_needed/4 >= memp->size) { if (total_size_needed+total_size_needed/4 >= memp->size) {
memp->size = total_size_needed+total_size_needed/4; memp->size = total_size_needed+total_size_needed/4;
} }
...@@ -3366,11 +3416,10 @@ static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_siz ...@@ -3366,11 +3416,10 @@ static int omt_compress_kvspace (OMT omt, struct mempool *memp, size_t added_siz
void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size) { void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size) {
void *v = toku_mempool_malloc(mp, size, 1); void *v = toku_mempool_malloc(mp, size, 1);
if (v==0) { if (v==0) {
original_mp = *mp; if (0 == omt_compress_kvspace(omt, mp, size)) {
int r = omt_compress_kvspace(omt, mp, size);
assert(r==0);
v = toku_mempool_malloc(mp, size, 1); v = toku_mempool_malloc(mp, size, 1);
assert(v); assert(v);
} }
}
return v; return v;
} }
...@@ -1039,7 +1039,7 @@ int toku_txn_note_brt (TOKUTXN txn, BRT brt) { ...@@ -1039,7 +1039,7 @@ int toku_txn_note_brt (TOKUTXN txn, BRT brt) {
static int remove_brt (OMTVALUE txnv, u_int32_t UU(idx), void *brtv) { static int remove_brt (OMTVALUE txnv, u_int32_t UU(idx), void *brtv) {
TOKUTXN txn = txnv; TOKUTXN txn = txnv;
BRT brt = brtv; BRT brt = brtv;
OMTVALUE brtv_again=0; // TODO BBB This is not entirely safe. Verify initialization needed. OMTVALUE brtv_again=NULL;
u_int32_t index; u_int32_t index;
int r = toku_omt_find_zero(txn->open_brts, find_filenum, brt, &brtv_again, &index, NULL); int r = toku_omt_find_zero(txn->open_brts, find_filenum, brt, &brtv_again, &index, NULL);
assert(r==0); assert(r==0);
...@@ -1058,7 +1058,7 @@ int toku_txn_note_close_brt (BRT brt) { ...@@ -1058,7 +1058,7 @@ int toku_txn_note_close_brt (BRT brt) {
static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) { static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
BRT brt = brtv; BRT brt = brtv;
TOKUTXN txn = txnv; TOKUTXN txn = txnv;
OMTVALUE txnv_again=0; // TODO BBB This is not entirely safe. Verify initialization needed. OMTVALUE txnv_again=NULL;
u_int32_t index; u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_ptr, txn, &txnv_again, &index, NULL); int r = toku_omt_find_zero(brt->txns, find_ptr, txn, &txnv_again, &index, NULL);
assert(r==0); assert(r==0);
......
...@@ -39,7 +39,7 @@ struct omt { ...@@ -39,7 +39,7 @@ struct omt {
}; };
//Initial max size of root-to-leaf path //Initial max size of root-to-leaf path
#define TOKU_OMTCURSOR_INITIAL_SIZE 64 #define TOKU_OMTCURSOR_INITIAL_SIZE 4
// Cursor for order maintenance tree // Cursor for order maintenance tree
struct omtcursor { struct omtcursor {
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <errno.h> #include <errno.h>
#include <sys/types.h> #include <sys/types.h>
typedef void *OMTVALUE;
#include "omt.h" #include "omt.h"
#include "omt-internal.h" #include "omt-internal.h"
#include "../newbrt/memory.h" #include "../newbrt/memory.h"
......
...@@ -116,7 +116,6 @@ ...@@ -116,7 +116,6 @@
// The programming API: // The programming API:
typedef void *OMTVALUE;
typedef struct omt *OMT; typedef struct omt *OMT;
typedef struct omtcursor *OMTCURSOR; typedef struct omtcursor *OMTCURSOR;
......
...@@ -55,7 +55,7 @@ static int do_insertion (enum brt_cmd_type type, TXNID xid, FILENUM filenum, BYT ...@@ -55,7 +55,7 @@ static int do_insertion (enum brt_cmd_type type, TXNID xid, FILENUM filenum, BYT
data data
? toku_fill_dbt(&data_dbt, data->data, data->len) ? toku_fill_dbt(&data_dbt, data->data, data->len)
: toku_init_dbt(&data_dbt) }}; : toku_init_dbt(&data_dbt) }};
OMTVALUE brtv=NULL; // TODO BBB This is not entirely safe. Verify initialization needed. OMTVALUE brtv=NULL;
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL); r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL);
if (r==DB_NOTFOUND) { if (r==DB_NOTFOUND) {
......
...@@ -68,7 +68,9 @@ REGRESSION_TESTS = \ ...@@ -68,7 +68,9 @@ REGRESSION_TESTS = \
log-test5 \ log-test5 \
log-test6 \ log-test6 \
memtest \ memtest \
omt-cursor-test \
omt-test \ omt-test \
shortcut \
test-assert \ test-assert \
test-brt-delete-both \ test-brt-delete-both \
test-brt-overflow \ test-brt-overflow \
......
#include <sys/types.h>
#include <assert.h>
typedef struct value *OMTVALUE;
#include "omt.h"
enum { N=10 };
struct value { int x; } vs[N];
OMTVALUE ps[N];
static void test (void) {
OMT o;
OMTCURSOR curs, curs2, curs3;
int i, r;
OMTVALUE v;
for (i=0; i<N; i++) {
vs[i].x=i;
ps[i]=&vs[i];
}
// destroy the omt first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs);
// destroy the cursor first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
assert(v->x==5);
r = toku_omt_cursor_next(curs, &v);
assert(r==0 && v->x==6);
r = toku_omt_cursor_prev(curs, &v);
assert(r==0 && v->x==5);
toku_omt_cursor_destroy(&curs);
toku_omt_destroy(&o);
// Create two cursors, destroy omt first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
r = toku_omt_cursor_create(&curs2); assert(r==0);
r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0);
r = toku_omt_cursor_next(curs, &v); assert(r==0 && v->x==6);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs);
toku_omt_cursor_destroy(&curs2);
// Create two cursors, destroy them first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
r = toku_omt_cursor_create(&curs2); assert(r==0);
r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0);
r = toku_omt_cursor_next(curs, &v); assert(r==0 && v->x==6);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && v->x==3);
toku_omt_cursor_destroy(&curs);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && v->x==2);
toku_omt_cursor_destroy(&curs2);
toku_omt_destroy(&o);
// Create three cursors, destroy them first
r = toku_omt_create_from_sorted_array(&o, ps, 10); assert(r==0);
r = toku_omt_cursor_create(&curs); assert(r==0);
r = toku_omt_fetch(o, 5, &v, curs); assert(r==0);
r = toku_omt_cursor_create(&curs2); assert(r==0);
r = toku_omt_fetch(o, 4, &v, curs2); assert(r==0);
r = toku_omt_cursor_create(&curs3); assert(r==0);
r = toku_omt_fetch(o, 9, &v, curs3); assert(r==0);
r = toku_omt_cursor_next(curs, &v); assert(r==0 && v->x==6);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && v->x==3);
r = toku_omt_cursor_next(curs3, &v); assert(r!=0 && !toku_omt_cursor_is_valid(curs3));
toku_omt_cursor_destroy(&curs);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && v->x==2);
r = toku_omt_cursor_prev(curs2, &v); assert(r==0 && v->x==1);
r = toku_omt_fetch(o, 1, &v, curs3); assert(r==0);
r = toku_omt_cursor_prev(curs3, &v); assert(r==0 && v->x==0);
r = toku_omt_cursor_prev(curs3, &v); assert(r!=0 && !toku_omt_cursor_is_valid(curs3));
toku_omt_cursor_destroy(&curs2);
toku_omt_destroy(&o);
toku_omt_cursor_destroy(&curs3);
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
test();
return 0;
}
...@@ -3,8 +3,8 @@ ...@@ -3,8 +3,8 @@
#include <errno.h> #include <errno.h>
#include <sys/types.h> #include <sys/types.h>
// typedef struct value *TESTVALUE; typedef struct value *OMTVALUE;
typedef struct value *TESTVALUE; typedef OMTVALUE TESTVALUE;
#include "omt.h" #include "omt.h"
#include "../newbrt/memory.h" #include "../newbrt/memory.h"
#include "../newbrt/toku_assert.h" #include "../newbrt/toku_assert.h"
...@@ -221,7 +221,7 @@ void test_create_from_sorted_array(enum create_type create_choice, enum close_wh ...@@ -221,7 +221,7 @@ void test_create_from_sorted_array(enum create_type create_choice, enum close_wh
omt = NULL; omt = NULL;
if (create_choice == BATCH_INSERT) { if (create_choice == BATCH_INSERT) {
r = toku_omt_create_from_sorted_array(&omt, (OMTVALUE) values, length); r = toku_omt_create_from_sorted_array(&omt, values, length);
CKERR(r); CKERR(r);
} }
else if (create_choice == INSERT_AT) { else if (create_choice == INSERT_AT) {
...@@ -257,7 +257,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) { ...@@ -257,7 +257,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) {
for (i = 0; i < len; i++) { for (i = 0; i < len; i++) {
assert(oldv!=val[i]); assert(oldv!=val[i]);
v = NULL; v = NULL;
r = toku_omt_fetch(omtree, i, (OMTVALUE) &v, NULL); r = toku_omt_fetch(omtree, i, &v, NULL);
CKERR(r); CKERR(r);
assert(v != NULL); assert(v != NULL);
assert(v != oldv); assert(v != oldv);
...@@ -265,7 +265,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) { ...@@ -265,7 +265,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) {
assert(v->number == val[i]->number); assert(v->number == val[i]->number);
v = oldv; v = oldv;
r = toku_omt_fetch(omtree, i, (OMTVALUE) &v, c); r = toku_omt_fetch(omtree, i, &v, c);
CKERR(r); CKERR(r);
assert(v != NULL); assert(v != NULL);
assert(v != oldv); assert(v != oldv);
...@@ -274,7 +274,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) { ...@@ -274,7 +274,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) {
assert(toku_omt_cursor_is_valid(c)); assert(toku_omt_cursor_is_valid(c));
v = oldv; v = oldv;
r = toku_omt_cursor_current(c, (OMTVALUE) &v); r = toku_omt_cursor_current(c, &v);
CKERR(r); CKERR(r);
assert(v != NULL); assert(v != NULL);
assert(v != oldv); assert(v != oldv);
...@@ -284,7 +284,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) { ...@@ -284,7 +284,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) {
v = oldv; v = oldv;
j = i + 1; j = i + 1;
while ((r = toku_omt_cursor_next(c, (OMTVALUE) &v)) == 0) { while ((r = toku_omt_cursor_next(c, &v)) == 0) {
assert(toku_omt_cursor_is_valid(c)); assert(toku_omt_cursor_is_valid(c));
assert(v != NULL); assert(v != NULL);
assert(v != oldv); assert(v != oldv);
...@@ -298,7 +298,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) { ...@@ -298,7 +298,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) {
assert(oldv!=val[i]); assert(oldv!=val[i]);
v = NULL; v = NULL;
r = toku_omt_fetch(omtree, i, (OMTVALUE) &v, c); r = toku_omt_fetch(omtree, i, &v, c);
CKERR(r); CKERR(r);
assert(v != NULL); assert(v != NULL);
assert(v != oldv); assert(v != oldv);
...@@ -307,7 +307,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) { ...@@ -307,7 +307,7 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) {
v = oldv; v = oldv;
j = i - 1; j = i - 1;
while ((r = toku_omt_cursor_prev(c, (OMTVALUE) &v)) == 0) { while ((r = toku_omt_cursor_prev(c, &v)) == 0) {
assert(toku_omt_cursor_is_valid(c)); assert(toku_omt_cursor_is_valid(c));
assert(v != NULL); assert(v != NULL);
assert(v != oldv); assert(v != oldv);
...@@ -323,11 +323,11 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) { ...@@ -323,11 +323,11 @@ void test_fetch_verify (OMT omtree, TESTVALUE* val, u_int32_t len ) {
for (i = len; i < len*2; i++) { for (i = len; i < len*2; i++) {
v = oldv; v = oldv;
r = toku_omt_fetch(omtree, i, (OMTVALUE) &v, NULL); r = toku_omt_fetch(omtree, i, &v, NULL);
CKERR2(r, ERANGE); CKERR2(r, ERANGE);
assert(v == oldv); assert(v == oldv);
v = NULL; v = NULL;
r = toku_omt_fetch(omtree, i, (OMTVALUE) &v, c); r = toku_omt_fetch(omtree, i, &v, c);
CKERR2(r, ERANGE); CKERR2(r, ERANGE);
assert(v == NULL); assert(v == NULL);
} }
...@@ -355,10 +355,10 @@ int iterate_helper(TESTVALUE v, u_int32_t idx, void* extra) { ...@@ -355,10 +355,10 @@ int iterate_helper(TESTVALUE v, u_int32_t idx, void* extra) {
void test_iterate_verify(OMT omtree, TESTVALUE* vals, u_int32_t len) { void test_iterate_verify(OMT omtree, TESTVALUE* vals, u_int32_t len) {
int r; int r;
iterate_helper_error_return = 0; iterate_helper_error_return = 0;
r = toku_omt_iterate(omtree, (OMTVALUE) iterate_helper, (void*)vals); r = toku_omt_iterate(omtree, iterate_helper, (void*)vals);
CKERR(r); CKERR(r);
iterate_helper_error_return = 0xFEEDABBA; iterate_helper_error_return = 0xFEEDABBA;
r = toku_omt_iterate(omtree, (OMTVALUE) iterate_helper, NULL); r = toku_omt_iterate(omtree, iterate_helper, NULL);
if (!len) { if (!len) {
CKERR2(r, 0); CKERR2(r, 0);
} }
...@@ -473,7 +473,7 @@ void test_create_insert(enum close_when_done close) { ...@@ -473,7 +473,7 @@ void test_create_insert(enum close_when_done close) {
u_int32_t idx = UINT32_MAX; u_int32_t idx = UINT32_MAX;
assert(length==toku_omt_size(omt)); assert(length==toku_omt_size(omt));
r = toku_omt_insert(omt, to_insert, (OMTVALUE) insert_helper, to_insert, &idx); r = toku_omt_insert(omt, to_insert, insert_helper, to_insert, &idx);
CKERR(r); CKERR(r);
assert(idx <= length); assert(idx <= length);
if (idx > 0) { if (idx > 0) {
...@@ -493,7 +493,7 @@ void test_create_insert(enum close_when_done close) { ...@@ -493,7 +493,7 @@ void test_create_insert(enum close_when_done close) {
test_iterate_verify(omt, values, length); test_iterate_verify(omt, values, length);
idx = UINT32_MAX; idx = UINT32_MAX;
r = toku_omt_insert(omt, to_insert, (OMTVALUE) insert_helper, to_insert, &idx); r = toku_omt_insert(omt, to_insert, insert_helper, to_insert, &idx);
CKERR2(r, DB_KEYEXIST); CKERR2(r, DB_KEYEXIST);
assert(idx < length); assert(idx < length);
assert(values[idx]->number == to_insert->number); assert(values[idx]->number == to_insert->number);
...@@ -678,10 +678,10 @@ void test_find_dir(int dir, void* extra, int (*h)(OMTVALUE, void*), ...@@ -678,10 +678,10 @@ void test_find_dir(int dir, void* extra, int (*h)(OMTVALUE, void*),
omt_val = NULL; omt_val = NULL;
if (dir == 0) { if (dir == 0) {
r = toku_omt_find_zero(omt, h, extra, (OMTVALUE) &omt_val, &idx, c); r = toku_omt_find_zero(omt, h, extra, &omt_val, &idx, c);
} }
else { else {
r = toku_omt_find( omt, h, extra, dir, (OMTVALUE) &omt_val, &idx, c); r = toku_omt_find( omt, h, extra, dir, &omt_val, &idx, c);
} }
CKERR2(r, r_expect); CKERR2(r, r_expect);
if (idx_will_change) { if (idx_will_change) {
...@@ -704,10 +704,10 @@ void test_find_dir(int dir, void* extra, int (*h)(OMTVALUE, void*), ...@@ -704,10 +704,10 @@ void test_find_dir(int dir, void* extra, int (*h)(OMTVALUE, void*),
TESTVALUE tmp; TESTVALUE tmp;
assert(idx_will_change); assert(idx_will_change);
omt_val_curs = NULL; omt_val_curs = NULL;
r = toku_omt_cursor_current(c, (OMTVALUE) &omt_val_curs); r = toku_omt_cursor_current(c, &omt_val_curs);
CKERR(r); CKERR(r);
assert(toku_omt_cursor_is_valid(c)); assert(toku_omt_cursor_is_valid(c));
r = toku_omt_fetch(omt, idx, (OMTVALUE) &tmp, NULL); r = toku_omt_fetch(omt, idx, &tmp, NULL);
CKERR(r); CKERR(r);
if (found) assert(tmp==omt_val); if (found) assert(tmp==omt_val);
assert(omt_val_curs != NULL); assert(omt_val_curs != NULL);
...@@ -742,10 +742,10 @@ void test_find_dir(int dir, void* extra, int (*h)(OMTVALUE, void*), ...@@ -742,10 +742,10 @@ void test_find_dir(int dir, void* extra, int (*h)(OMTVALUE, void*),
omt_val = NULL; omt_val = NULL;
idx = old_idx; idx = old_idx;
if (dir == 0) { if (dir == 0) {
r = toku_omt_find_zero(omt, h, extra, (OMTVALUE) &omt_val, 0, NULL); r = toku_omt_find_zero(omt, h, extra, &omt_val, 0, NULL);
} }
else { else {
r = toku_omt_find( omt, h, extra, dir, (OMTVALUE) &omt_val, 0, NULL); r = toku_omt_find( omt, h, extra, dir, &omt_val, 0, NULL);
} }
CKERR2(r, r_expect); CKERR2(r, r_expect);
assert(idx == old_idx); assert(idx == old_idx);
......
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include "brt.h"
#include "key.h"
static const char fname[]= __FILE__ ".brt";
static TOKUTXN const null_txn = 0;
CACHETABLE ct;
BRT brt;
BRT_CURSOR cursor;
static int test_brt_cursor_keycompare(DB *db __attribute__((unused)), const DBT *a, const DBT *b) {
return toku_keycompare(a->data, a->size, b->data, b->size);
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
int r;
DB a_db;
DB *db = &a_db;
DBT key,val;
unlink(fname);
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
r = toku_open_brt(fname, 0, 1, &brt, 1<<12, ct, null_txn, test_brt_cursor_keycompare, db); assert(r==0);
r = toku_brt_cursor(brt, &cursor, 0); assert(r==0);
int i;
for (i=0; i<1000; i++) {
char string[100];
snprintf(string, sizeof(string), "%04d", i);
r = toku_brt_insert(brt, toku_fill_dbt(&key, string, 5), toku_fill_dbt(&val, string, 5), 0); assert(r==0);
}
r = toku_brt_cursor_get(cursor, &key, &val, DB_NEXT, null_txn); assert(r==0);
assert(strcmp(key.data, "0000")==0);
assert(strcmp(val.data, "0000")==0);
r = toku_brt_cursor_get(cursor, &key, &val, DB_NEXT, null_txn); assert(r==0);
assert(strcmp(key.data, "0001")==0);
assert(strcmp(val.data, "0001")==0);
// This will invalidate due to the root counter bumping, but the OMT itself will still be valid.
r = toku_brt_insert(brt, toku_fill_dbt(&key, "d", 2), toku_fill_dbt(&val, "w", 2), 0); assert(r==0);
r = toku_brt_cursor_get(cursor, &key, &val, DB_NEXT, null_txn); assert(r==0);
assert(strcmp(key.data, "0002")==0);
assert(strcmp(val.data, "0002")==0);
r = toku_brt_cursor_close(cursor); assert(r==0);
r = toku_close_brt(brt, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment