Commit a11783d2 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Merge the streamlining changes from #911. Fixes #911.

git-svn-id: file:///svn/tokudb@4500 c7de825b-a66e-492c-adef-691d508d4ae1
parent a26e3379
...@@ -189,8 +189,8 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE, struct kv_pair *); // Given th ...@@ -189,8 +189,8 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE, struct kv_pair *); // Given th
struct brt_cursor { struct brt_cursor {
struct list cursors_link; struct list cursors_link;
BRT brt; BRT brt;
DBT key; DBT key, val; // The key-value pair that the cursor currently points to
DBT val; DBT prevkey, prevval; // The key-value pair that the cursor pointed to previously. (E.g., when we do a DB_NEXT)
int is_temporary_cursor; // If it is a temporary cursor then use the following skey and sval to return tokudb-managed values in dbts. Otherwise use the brt's skey and skval. int is_temporary_cursor; // If it is a temporary cursor then use the following skey and sval to return tokudb-managed values in dbts. Otherwise use the brt's skey and skval.
void *skey, *sval; void *skey, *sval;
OMTCURSOR omtcursor; OMTCURSOR omtcursor;
......
...@@ -2838,16 +2838,12 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOK ...@@ -2838,16 +2838,12 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOK
} }
static inline void dbt_cleanup(DBT *dbt) { static inline void dbt_cleanup(DBT *dbt) {
if (dbt->data && (dbt->flags & DB_DBT_MALLOC)) { if (dbt->data && ( (dbt->flags & DB_DBT_REALLOC)
|| (dbt->flags & DB_DBT_MALLOC))) {
toku_free_n(dbt->data, dbt->size); dbt->data = 0; toku_free_n(dbt->data, dbt->size); dbt->data = 0;
} }
} }
static inline void brt_cursor_cleanup(BRT_CURSOR cursor) {
dbt_cleanup(&cursor->key);
dbt_cleanup(&cursor->val);
}
static inline int brt_cursor_not_set(BRT_CURSOR cursor) { static inline int brt_cursor_not_set(BRT_CURSOR cursor) {
return cursor->key.data == 0 || cursor->val.data == 0; return cursor->key.data == 0 || cursor->val.data == 0;
} }
...@@ -2856,24 +2852,15 @@ BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) { ...@@ -2856,24 +2852,15 @@ BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c) {
return brt_cursor_not_set(c); return brt_cursor_not_set(c);
} }
static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *newval) {
brt_cursor_cleanup(cursor);
cursor->key = *newkey; memset(newkey, 0, sizeof *newkey);
cursor->val = *newval; memset(newval, 0, sizeof *newval);
}
/* Used to restore the state of a cursor. */
void brt_cursor_set_key_val_manually(BRT_CURSOR cursor, DBT* key, DBT* val) {
brt_cursor_set_key_val(cursor, key, val);
}
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) { int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor); BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0) if (cursor == 0)
return ENOMEM; return ENOMEM;
cursor->brt = brt; cursor->brt = brt;
toku_init_dbt(&cursor->key); toku_init_dbt(&cursor->key); cursor->key.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->val); toku_init_dbt(&cursor->val); cursor->val.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevkey); cursor->prevkey.flags = DB_DBT_REALLOC;
toku_init_dbt(&cursor->prevval); cursor->prevval.flags = DB_DBT_REALLOC;
list_push(&brt->cursors, &cursor->cursors_link); list_push(&brt->cursors, &cursor->cursors_link);
cursor->is_temporary_cursor=is_temporary_cursor; cursor->is_temporary_cursor=is_temporary_cursor;
cursor->skey = cursor->sval = 0; cursor->skey = cursor->sval = 0;
...@@ -2885,7 +2872,10 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) { ...@@ -2885,7 +2872,10 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
} }
int toku_brt_cursor_close(BRT_CURSOR cursor) { int toku_brt_cursor_close(BRT_CURSOR cursor) {
brt_cursor_cleanup(cursor); dbt_cleanup(&cursor->key);
dbt_cleanup(&cursor->val);
dbt_cleanup(&cursor->prevkey);
dbt_cleanup(&cursor->prevval);
if (cursor->skey) toku_free(cursor->skey); if (cursor->skey) toku_free(cursor->skey);
if (cursor->sval) toku_free(cursor->sval); if (cursor->sval) toku_free(cursor->sval);
list_remove(&cursor->cursors_link); list_remove(&cursor->cursors_link);
...@@ -2894,6 +2884,34 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) { ...@@ -2894,6 +2884,34 @@ int toku_brt_cursor_close(BRT_CURSOR cursor) {
return 0; return 0;
} }
DBT *brt_cursor_peek_prev_key(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the previous key.
// Requires: The caller may not modify that DBT or the memory at which it points.
{
return &cursor->prevkey;
}
DBT *brt_cursor_peek_prev_val(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the previous val
// Requires: The caller may not modify that DBT or the memory at which it points.
{
return &cursor->prevval;
}
DBT *brt_cursor_peek_current_key(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the current key.
// Requires: The caller may not modify that DBT or the memory at which it points.
{
return &cursor->key;
}
DBT *brt_cursor_peek_current_val(BRT_CURSOR cursor)
// Effect: Return a pointer to a DBT for the current val
// Requires: The caller may not modify that DBT or the memory at which it points.
{
return &cursor->val;
}
static inline int compare_k_x(BRT brt, DBT *k, DBT *x) { static inline int compare_k_x(BRT brt, DBT *k, DBT *x) {
return brt->compare_fun(brt->db, k, x); return brt->compare_fun(brt->db, k, x);
} }
...@@ -2960,26 +2978,6 @@ int toku_brt_cursor_dbts_set_with_dat(BRT_CURSOR cursor, BRT pdb, ...@@ -2960,26 +2978,6 @@ int toku_brt_cursor_dbts_set_with_dat(BRT_CURSOR cursor, BRT pdb,
return r; return r;
} }
/* Used to save the state of a cursor. */
int brt_cursor_save_key_val(BRT_CURSOR cursor, DBT* key, DBT* val) {
if (brt_cursor_not_set(cursor)) {
if (key) { *key = cursor->key; }
if (val) { *val = cursor->val; }
return 0;
}
else {
assert(!key || key->flags == DB_DBT_MALLOC);
assert(!val || val->flags == DB_DBT_MALLOC);
int r;
if ((r = brt_cursor_copyout(cursor, key, val))) { return r; }
/* An initialized cursor cannot have NULL key->data or
* val->data. */
assert(key==NULL || key->data!=NULL);
assert(val==NULL || val->data!=NULL);
return 0;
}
}
static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) { static int brt_cursor_compare_set(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; BRT brt = search->context;
return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */ return compare_kv_xy(brt, search->k, search->v, x, y) <= 0; /* return min xy: kv <= xy */
...@@ -2990,8 +2988,8 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva ...@@ -2990,8 +2988,8 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva
return EINVAL; return EINVAL;
if (op == DB_CURRENT) { if (op == DB_CURRENT) {
int r = ENOSYS; int r = ENOSYS;
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC; DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_REALLOC;
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_REALLOC;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt); brt_search_t search; brt_search_init(&search, brt_cursor_compare_set, BRT_SEARCH_LEFT, &cursor->key, &cursor->val, cursor->brt);
r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter); r = toku_brt_search(cursor->brt, &search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter);
...@@ -3004,54 +3002,64 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva ...@@ -3004,54 +3002,64 @@ static int brt_cursor_current(BRT_CURSOR cursor, int op, DBT *outkey, DBT *outva
return brt_cursor_copyout(cursor, outkey, outval); return brt_cursor_copyout(cursor, outkey, outval);
} }
static void swap_dbts (DBT *a, DBT *b) {
DBT tmp=*a;
*a=*b;
*b=tmp;
}
static void swap_cursor_dbts (BRT_CURSOR cursor) {
swap_dbts(&cursor->prevkey, &cursor->key);
swap_dbts(&cursor->prevval, &cursor->val);
}
void brt_cursor_restore_state_from_prev(BRT_CURSOR cursor) {
toku_omt_cursor_invalidate(cursor->omtcursor);
swap_cursor_dbts(cursor);
}
/* search for the first kv pair that matches the search object */ /* search for the first kv pair that matches the search object */
static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) { static int brt_cursor_search(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC; assert(cursor->prevkey.flags == DB_DBT_REALLOC);
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; assert(cursor->prevval.flags == DB_DBT_REALLOC);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval); swap_cursor_dbts(cursor);
r = brt_cursor_copyout(cursor, outkey, outval); r = brt_cursor_copyout(cursor, outkey, outval);
} }
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r; return r;
} }
/* search for the kv pair that matches the search object and is equal to kv */ /* search for the kv pair that matches the search object and is equal to kv */
static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) { static int brt_cursor_search_eq_kv_xy(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC; assert(cursor->prevkey.flags == DB_DBT_REALLOC);
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; assert(cursor->prevval.flags == DB_DBT_REALLOC);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
if (compare_kv_xy(cursor->brt, search->k, search->v, &newkey, &newval) == 0) { if (compare_kv_xy(cursor->brt, search->k, search->v, &cursor->prevkey, &cursor->prevval) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval); swap_cursor_dbts(cursor);
r = brt_cursor_copyout(cursor, outkey, outval); r = brt_cursor_copyout(cursor, outkey, outval);
} else } else {
r = DB_NOTFOUND; r = DB_NOTFOUND;
}
} }
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r; return r;
} }
/* search for the kv pair that matches the search object and is equal to k */ /* search for the kv pair that matches the search object and is equal to k */
static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) { static int brt_cursor_search_eq_k_x(BRT_CURSOR cursor, brt_search_t *search, DBT *outkey, DBT *outval, TOKULOGGER logger) {
DBT newkey; toku_init_dbt(&newkey); newkey.flags = DB_DBT_MALLOC; assert(cursor->prevkey.flags == DB_DBT_REALLOC);
DBT newval; toku_init_dbt(&newval); newval.flags = DB_DBT_MALLOC; assert(cursor->prevval.flags == DB_DBT_REALLOC);
int r = toku_brt_search(cursor->brt, search, &newkey, &newval, logger, cursor->omtcursor, &cursor->root_put_counter); int r = toku_brt_search(cursor->brt, search, &cursor->prevkey, &cursor->prevval, logger, cursor->omtcursor, &cursor->root_put_counter);
if (r == 0) { if (r == 0) {
if (compare_k_x(cursor->brt, search->k, &newkey) == 0) { if (compare_k_x(cursor->brt, search->k, &cursor->prevkey) == 0) {
brt_cursor_set_key_val(cursor, &newkey, &newval); swap_cursor_dbts(cursor);
r = brt_cursor_copyout(cursor, outkey, outval); r = brt_cursor_copyout(cursor, outkey, outval);
} else } else
r = DB_NOTFOUND; r = DB_NOTFOUND;
} }
dbt_cleanup(&newkey);
dbt_cleanup(&newval);
return r; return r;
} }
...@@ -3094,15 +3102,18 @@ get_next:; ...@@ -3094,15 +3102,18 @@ get_next:;
int r = toku_omt_cursor_next(cursor->omtcursor, &le); int r = toku_omt_cursor_next(cursor->omtcursor, &le);
if (r==0) { if (r==0) {
if (le_is_provdel(le)) goto get_next; if (le_is_provdel(le)) goto get_next;
DBT key,val;
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC; assert(cursor->prevkey.flags == DB_DBT_REALLOC);
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC; assert(cursor->prevval.flags == DB_DBT_REALLOC);
bytevec keyb = le_latest_key(le); bytevec keyb = le_latest_key(le);
bytevec valb = le_latest_val(le); bytevec valb = le_latest_val(le);
r = toku_dbt_set_two_values(&key, &keyb, le_latest_keylen(le), NULL, FALSE, r = toku_dbt_set_two_values(&cursor->prevkey, &keyb, le_latest_keylen(le), NULL, FALSE,
&val, &valb, le_latest_vallen(le), NULL, FALSE); &cursor->prevval, &valb, le_latest_vallen(le), NULL, FALSE);
assert(r==0); assert(r==0);
brt_cursor_set_key_val(cursor, &key, &val);
swap_cursor_dbts(cursor);
return brt_cursor_copyout(cursor, outkey, outval); return brt_cursor_copyout(cursor, outkey, outval);
} }
} }
...@@ -3182,15 +3193,18 @@ get_prev:; ...@@ -3182,15 +3193,18 @@ get_prev:;
int r = toku_omt_cursor_prev(cursor->omtcursor, &le); int r = toku_omt_cursor_prev(cursor->omtcursor, &le);
if (r==0) { if (r==0) {
if (le_is_provdel(le)) goto get_prev; if (le_is_provdel(le)) goto get_prev;
DBT key,val;
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC; assert(cursor->prevkey.flags == DB_DBT_REALLOC);
toku_init_dbt(&val); val.flags = DB_DBT_MALLOC; assert(cursor->prevval.flags == DB_DBT_REALLOC);
bytevec keyb = le_latest_key(le); bytevec keyb = le_latest_key(le);
bytevec valb = le_latest_val(le); bytevec valb = le_latest_val(le);
r = toku_dbt_set_two_values(&key, &keyb, le_latest_keylen(le), NULL, FALSE, r = toku_dbt_set_two_values(&cursor->prevkey, &keyb, le_latest_keylen(le), NULL, FALSE,
&val, &valb, le_latest_vallen(le), NULL, FALSE); &cursor->prevval, &valb, le_latest_vallen(le), NULL, FALSE);
assert(r==0); assert(r==0);
brt_cursor_set_key_val(cursor, &key, &val);
swap_cursor_dbts(cursor);
return brt_cursor_copyout(cursor, outkey, outval); return brt_cursor_copyout(cursor, outkey, outval);
} }
} }
......
...@@ -53,6 +53,12 @@ int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN); ...@@ -53,6 +53,12 @@ int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN);
int toku_brt_cursor_close (BRT_CURSOR curs); int toku_brt_cursor_close (BRT_CURSOR curs);
BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c); BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c);
DBT *brt_cursor_peek_prev_key(BRT_CURSOR cursor);
DBT *brt_cursor_peek_prev_val(BRT_CURSOR cursor);
DBT *brt_cursor_peek_current_key(BRT_CURSOR cursor);
DBT *brt_cursor_peek_current_val(BRT_CURSOR cursor);
void brt_cursor_restore_state_from_prev(BRT_CURSOR cursor);
typedef struct brtenv *BRTENV; typedef struct brtenv *BRTENV;
int brtenv_checkpoint (BRTENV env); int brtenv_checkpoint (BRTENV env);
......
...@@ -1101,8 +1101,6 @@ typedef struct { ...@@ -1101,8 +1101,6 @@ typedef struct {
DB* db; // db the cursor is iterating over DB* db; // db the cursor is iterating over
DB_TXN* txn_anc; // The (root) ancestor of the transaction DB_TXN* txn_anc; // The (root) ancestor of the transaction
TXNID id_anc; TXNID id_anc;
DBT cursor_key; // Original position of cursor (key portion)
DBT cursor_val; // Original position of cursor (val portion)
DBT tmp_key; // Temporary key to protect out param DBT tmp_key; // Temporary key to protect out param
DBT tmp_val; // Temporary val to protect out param DBT tmp_val; // Temporary val to protect out param
DBT tmp_dat; // Temporary data val to protect out param DBT tmp_dat; // Temporary data val to protect out param
...@@ -1110,7 +1108,6 @@ typedef struct { ...@@ -1110,7 +1108,6 @@ typedef struct {
u_int32_t op; // The operation portion of the c_get flag u_int32_t op; // The operation portion of the c_get flag
u_int32_t lock_flags; // The prelock flags. u_int32_t lock_flags; // The prelock flags.
BOOL cursor_is_write; // Whether op can change position of cursor BOOL cursor_is_write; // Whether op can change position of cursor
BOOL cursor_was_saved; // Whether we saved the cursor yet.
BOOL key_is_read; BOOL key_is_read;
BOOL key_is_write; BOOL key_is_write;
BOOL val_is_read; BOOL val_is_read;
...@@ -1118,7 +1115,6 @@ typedef struct { ...@@ -1118,7 +1115,6 @@ typedef struct {
BOOL dat_is_read; BOOL dat_is_read;
BOOL dat_is_write; BOOL dat_is_write;
BOOL duplicates; BOOL duplicates;
BOOL cursor_malloced;
BOOL tmp_key_malloced; BOOL tmp_key_malloced;
BOOL tmp_val_malloced; BOOL tmp_val_malloced;
BOOL tmp_dat_malloced; BOOL tmp_dat_malloced;
...@@ -1299,7 +1295,7 @@ static int toku_c_get_save_inputs(C_GET_VARS* g, DBT* key, DBT* val) { ...@@ -1299,7 +1295,7 @@ static int toku_c_get_save_inputs(C_GET_VARS* g, DBT* key, DBT* val) {
return r; return r;
} }
static int toku_c_get_post_lock(C_GET_VARS* g, BOOL found, DBT* orig_key, DBT* orig_val) { static int toku_c_get_post_lock(C_GET_VARS* g, BOOL found, DBT* orig_key, DBT* orig_val, DBT *prev_key, DBT *prev_val) {
int r = ENOSYS; int r = ENOSYS;
toku_lock_tree* lt = g->db->i->lt; toku_lock_tree* lt = g->db->i->lt;
if (!lt) { r = 0; goto cleanup; } if (!lt) { r = 0; goto cleanup; }
...@@ -1353,8 +1349,8 @@ static int toku_c_get_post_lock(C_GET_VARS* g, BOOL found, DBT* orig_key, DBT* o ...@@ -1353,8 +1349,8 @@ static int toku_c_get_post_lock(C_GET_VARS* g, BOOL found, DBT* orig_key, DBT* o
case (DB_NEXT): case (DB_NEXT):
case (DB_NEXT_NODUP): case (DB_NEXT_NODUP):
assert(!toku_c_uninitialized(g->c)); assert(!toku_c_uninitialized(g->c));
key_l = &g->cursor_key; key_l = prev_key;
val_l = &g->cursor_val; val_l = prev_val;
key_r = found ? &g->tmp_key : toku_lt_infinity; key_r = found ? &g->tmp_key : toku_lt_infinity;
val_r = found ? &g->tmp_val : toku_lt_infinity; val_r = found ? &g->tmp_val : toku_lt_infinity;
break; break;
...@@ -1363,21 +1359,21 @@ static int toku_c_get_post_lock(C_GET_VARS* g, BOOL found, DBT* orig_key, DBT* o ...@@ -1363,21 +1359,21 @@ static int toku_c_get_post_lock(C_GET_VARS* g, BOOL found, DBT* orig_key, DBT* o
assert(!toku_c_uninitialized(g->c)); assert(!toku_c_uninitialized(g->c));
key_l = found ? &g->tmp_key : toku_lt_neg_infinity; key_l = found ? &g->tmp_key : toku_lt_neg_infinity;
val_l = found ? &g->tmp_val : toku_lt_neg_infinity; val_l = found ? &g->tmp_val : toku_lt_neg_infinity;
key_r = &g->cursor_key; key_r = prev_key;
val_r = &g->cursor_val; val_r = prev_val;
break; break;
case (DB_NEXT_DUP): case (DB_NEXT_DUP):
assert(!toku_c_uninitialized(g->c)); assert(!toku_c_uninitialized(g->c));
key_l = key_r = &g->cursor_key; key_l = key_r = prev_key;
val_l = &g->cursor_val; val_l = prev_val;
val_r = found ? &g->tmp_val : toku_lt_infinity; val_r = found ? &g->tmp_val : toku_lt_infinity;
break; break;
#ifdef DB_PREV_DUP #ifdef DB_PREV_DUP
case (DB_PREV_DUP): case (DB_PREV_DUP):
assert(!toku_c_uninitialized(g->c)); assert(!toku_c_uninitialized(g->c));
key_l = key_r = &g->cursor_key; key_l = key_r = prev_key;
val_l = found ? &g->tmp_val : toku_lt_neg_infinity; val_l = found ? &g->tmp_val : toku_lt_neg_infinity;
val_r = &g->cursor_val; val_r = prev_val;
break; break;
#endif #endif
default: default:
...@@ -1396,31 +1392,6 @@ cleanup: ...@@ -1396,31 +1392,6 @@ cleanup:
return r; return r;
} }
/* Used to save the state of a cursor. */
int brt_cursor_save_key_val(BRT_CURSOR cursor, DBT* key, DBT* val);
/* Used to restore the state of a cursor. */
void brt_cursor_set_key_val_manually(BRT_CURSOR cursor, DBT* key, DBT* val);
static int toku_c_get_save_cursor(C_GET_VARS* g) {
int r = ENOSYS;
if (!g->cursor_is_write) { r = 0; goto cleanup; }
if (!toku_c_uninitialized(g->c)) {
g->cursor_key.flags = DB_DBT_MALLOC;
g->cursor_val.flags = DB_DBT_MALLOC;
}
if ((r = brt_cursor_save_key_val(g->c->i->c, &g->cursor_key, &g->cursor_val))) goto cleanup;
if (!toku_c_uninitialized(g->c)) g->cursor_malloced = TRUE;
g->cursor_was_saved = TRUE;
r = 0;
cleanup:
return r;
}
static int toku_c_pget_save_cursor(C_GET_VARS* g) {
return toku_c_get_save_cursor(g);
}
static int toku_c_pget_assign_outputs(C_GET_VARS* g, DBT* key, DBT* val, DBT* dat) { static int toku_c_pget_assign_outputs(C_GET_VARS* g, DBT* key, DBT* val, DBT* dat) {
int r = ENOSYS; int r = ENOSYS;
DBT* write_key = g->key_is_write ? key : NULL; DBT* write_key = g->key_is_write ? key : NULL;
...@@ -1484,8 +1455,6 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * val, u_int32_t flag) ...@@ -1484,8 +1455,6 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * val, u_int32_t flag)
/* Determine whether the key and val parameters are read, write, /* Determine whether the key and val parameters are read, write,
* or both. */ * or both. */
if ((r = toku_c_get_describe_inputs(&g))) goto cleanup; if ((r = toku_c_get_describe_inputs(&g))) goto cleanup;
/* Save the cursor position if the op can modify the cursor position. */
if ((r = toku_c_get_save_cursor(&g))) goto cleanup;
/* Save key and value to temporary local versions. */ /* Save key and value to temporary local versions. */
if ((r = toku_c_get_save_inputs(&g, key, val))) goto cleanup; if ((r = toku_c_get_save_inputs(&g, key, val))) goto cleanup;
/* Run the cursor operation on the brt. */ /* Run the cursor operation on the brt. */
...@@ -1498,27 +1467,17 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * val, u_int32_t flag) ...@@ -1498,27 +1467,17 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * val, u_int32_t flag)
if (r!=0 && r!=DB_NOTFOUND) goto cleanup; if (r!=0 && r!=DB_NOTFOUND) goto cleanup;
/* If we have not yet locked, lock now. */ /* If we have not yet locked, lock now. */
BOOL found = r_cursor_op==0; BOOL found = r_cursor_op==0;
r = toku_c_get_post_lock(&g, found, key, val); r = toku_c_get_post_lock(&g, found, key, val,
if (r!=0) goto cleanup; found ? brt_cursor_peek_prev_key(c->i->c) : brt_cursor_peek_current_key(c->i->c),
found ? brt_cursor_peek_prev_val(c->i->c) : brt_cursor_peek_current_key(c->i->c));
if (r!=0) {
if (g.cursor_is_write && r_cursor_op==0) brt_cursor_restore_state_from_prev(c->i->c);
goto cleanup;
}
/* if found, write the outputs to the output parameters. */ /* if found, write the outputs to the output parameters. */
if (found && (r = toku_c_get_assign_outputs(&g, key, val))) goto cleanup; if (found && (r = toku_c_get_assign_outputs(&g, key, val))) goto cleanup;
r = r_cursor_op; r = r_cursor_op;
cleanup: cleanup:
if (g.cursor_was_saved && g.cursor_malloced) {
/* We saved the cursor. We either need to restore it, or free
* the saved version. */
if (r!=0 && r!=DB_NOTFOUND) {
/* Failure since 0 and DB_NOTFOUND are 'successes';
* Restore the cursor. */
brt_cursor_set_key_val_manually(c->i->c, &g.cursor_key, &g.cursor_val);
/* cursor_key/val will be zeroed out. */
}
else {
/* Delete the saved cursor. */
if (g.cursor_key.data) toku_free(g.cursor_key.data);
if (g.cursor_val.data) toku_free(g.cursor_val.data);
}
}
/* Cleanup temporary keys. */ /* Cleanup temporary keys. */
if (g.tmp_key.data && g.tmp_key_malloced) toku_free(g.tmp_key.data); if (g.tmp_key.data && g.tmp_key_malloced) toku_free(g.tmp_key.data);
if (g.tmp_val.data && g.tmp_val_malloced) toku_free(g.tmp_val.data); if (g.tmp_val.data && g.tmp_val_malloced) toku_free(g.tmp_val.data);
...@@ -1573,23 +1532,15 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag) ...@@ -1573,23 +1532,15 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag)
/* Initialize variables. */ /* Initialize variables. */
g.c = c; g.c = c;
g.db = c->dbp; g.db = c->dbp;
g.flag = flag;
unsigned int brtflags; unsigned int brtflags;
toku_brt_get_flags(g.db->i->brt, &brtflags); toku_brt_get_flags(g.db->i->brt, &brtflags);
g.duplicates = (brtflags & TOKU_DB_DUPSORT) != 0; g.duplicates = (brtflags & TOKU_DB_DUPSORT) != 0;
/* Standardize the op flag. */
toku_c_pget_fix_flags(&g);
/* Determine whether the key, val, and data, parameters are read, write,
* or both. */
if ((r = toku_c_pget_describe_inputs(&g))) goto cleanup;
/* The 'key' from C_GET_VARS is the secondary key, and the 'val' /* The 'key' from C_GET_VARS is the secondary key, and the 'val'
* from C_GET_VARS is the primary key. The 'data' parameter here * from C_GET_VARS is the primary key. The 'data' parameter here
* is ALWAYS write-only */ * is ALWAYS write-only */
/* Save the cursor position if the op can modify the cursor position. */ int r_cursor_op;
if ((r = toku_c_pget_save_cursor(&g))) goto cleanup;;
if (0) { if (0) {
delete_silently_and_retry: delete_silently_and_retry:
...@@ -1606,39 +1557,38 @@ delete_silently_and_retry: ...@@ -1606,39 +1557,38 @@ delete_silently_and_retry:
memset(&g.tmp_dat, 0, sizeof(g.tmp_dat)); memset(&g.tmp_dat, 0, sizeof(g.tmp_dat));
g.tmp_dat_malloced = FALSE; g.tmp_dat_malloced = FALSE;
/* Silently delete and re-run. */ /* Silently delete and re-run. */
if ((r = toku_c_del_noassociate(c, 0))) goto cleanup; if ((r = toku_c_del_noassociate(c, 0))) goto cleanup_after_actual_get;
if (g.cursor_is_write && r_cursor_op==0) brt_cursor_restore_state_from_prev(c->i->c);
} }
g.flag = flag;
/* Standardize the op flag. */
toku_c_pget_fix_flags(&g);
/* Determine whether the key, val, and data, parameters are read, write,
* or both. */
if ((r = toku_c_pget_describe_inputs(&g))) goto cleanup;
/* Save the inputs. */ /* Save the inputs. */
if ((r = toku_c_pget_save_inputs(&g, key, pkey, data))) goto cleanup; if ((r = toku_c_pget_save_inputs(&g, key, pkey, data))) goto cleanup;
if ((r = toku_c_get_noassociate(c, &g.tmp_key, &g.tmp_val, g.flag))) goto cleanup; if ((r_cursor_op = r = toku_c_get_noassociate(c, &g.tmp_key, &g.tmp_val, g.flag))) goto cleanup;
r = toku_db_get(pdb, c->i->txn, &g.tmp_val, &g.tmp_dat, 0); r = toku_db_get(pdb, c->i->txn, &g.tmp_val, &g.tmp_dat, 0);
if (r==DB_NOTFOUND) goto delete_silently_and_retry; if (r==DB_NOTFOUND) goto delete_silently_and_retry;
if (r!=0) goto cleanup; if (r!=0) {
cleanup_after_actual_get:
if (g.cursor_is_write && r_cursor_op==0) brt_cursor_restore_state_from_prev(c->i->c);
goto cleanup;
}
r = verify_secondary_key(g.db, &g.tmp_val, &g.tmp_dat, &g.tmp_key); r = verify_secondary_key(g.db, &g.tmp_val, &g.tmp_dat, &g.tmp_key);
if (r==DB_SECONDARY_BAD) goto delete_silently_and_retry; if (r==DB_SECONDARY_BAD) goto delete_silently_and_retry;
if (r!=0) goto cleanup; if (r!=0) goto cleanup_after_actual_get;
/* Atomically assign all 3 outputs. */ /* Atomically assign all 3 outputs. */
if ((r = toku_c_pget_assign_outputs(&g, key, pkey, data))) goto cleanup; if ((r = toku_c_pget_assign_outputs(&g, key, pkey, data))) goto cleanup_after_actual_get;
r = 0; r = 0;
cleanup: cleanup:
if (g.cursor_was_saved && g.cursor_malloced) {
/* We saved the cursor. We either need to restore it, or free
* the saved version. */
if (r!=0) {
/* Restore the cursor. */
brt_cursor_set_key_val_manually(c->i->c, &g.cursor_key, &g.cursor_val);
/* cursor_key/val will be zeroed out. */
}
else {
/* Delete the saved cursor. */
if (g.cursor_key.data) toku_free(g.cursor_key.data);
if (g.cursor_val.data) toku_free(g.cursor_val.data);
}
}
/* Cleanup temporary keys. */ /* Cleanup temporary keys. */
if (g.tmp_key.data && g.tmp_key_malloced) toku_free(g.tmp_key.data); if (g.tmp_key.data && g.tmp_key_malloced) toku_free(g.tmp_key.data);
if (g.tmp_val.data && g.tmp_val_malloced) toku_free(g.tmp_val.data); if (g.tmp_val.data && g.tmp_val_malloced) toku_free(g.tmp_val.data);
...@@ -1698,19 +1648,11 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT ...@@ -1698,19 +1648,11 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT
DBT key,val; DBT key,val;
memset(&key, 0, sizeof(key)); memset(&key, 0, sizeof(key));
memset(&val, 0, sizeof(val)); memset(&val, 0, sizeof(val));
DBT prevkey,prevval;
memset(&prevkey, 0, sizeof(key)); prevkey.flags = DB_DBT_MALLOC;
memset(&prevval, 0, sizeof(val)); prevval.flags = DB_DBT_MALLOC;
int r; int r;
DB *db=c->dbp; DB *db=c->dbp;
toku_lock_tree* lt = db->i->lt; toku_lock_tree* lt = db->i->lt;
BOOL do_locking = lt!=NULL && !lock_flags; BOOL do_locking = lt!=NULL && !lock_flags;
if (do_locking) {
r = brt_cursor_save_key_val(c->i->c, &prevkey, &prevval);
if (r!=0) goto cleanup;
}
unsigned int brtflags; unsigned int brtflags;
toku_brt_get_flags(db->i->brt, &brtflags); toku_brt_get_flags(db->i->brt, &brtflags);
...@@ -1721,11 +1663,15 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT ...@@ -1721,11 +1663,15 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT
if (c_get_result!=0 && c_get_result!=DB_NOTFOUND) { r = c_get_result; goto cleanup; } if (c_get_result!=0 && c_get_result!=DB_NOTFOUND) { r = c_get_result; goto cleanup; }
int found = c_get_result==0; int found = c_get_result==0;
if (do_locking) { if (do_locking) {
DBT *prevkey = found ? brt_cursor_peek_prev_key(c->i->c) : brt_cursor_peek_current_key(c->i->c);
DBT *prevval = found ? brt_cursor_peek_prev_val(c->i->c) : brt_cursor_peek_current_key(c->i->c);
DB_TXN *txn_anc = toku_txn_ancestor(c->i->txn); DB_TXN *txn_anc = toku_txn_ancestor(c->i->txn);
r = toku_txn_add_lt(txn_anc, lt); r = toku_txn_add_lt(txn_anc, lt);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
r = toku_lt_acquire_range_read_lock(lt, db, toku_txn_get_txnid(txn_anc->i->tokutxn), r = toku_lt_acquire_range_read_lock(lt, db, toku_txn_get_txnid(txn_anc->i->tokutxn),
&prevkey, &prevval, prevkey, prevval,
found ? &key : toku_lt_infinity, found ? &key : toku_lt_infinity,
found ? &val : toku_lt_infinity); found ? &val : toku_lt_infinity);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
...@@ -1735,8 +1681,6 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT ...@@ -1735,8 +1681,6 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT
} }
r = c_get_result; r = c_get_result;
cleanup: cleanup:
if (prevkey.data) toku_free(prevkey.data);
if (prevval.data) toku_free(prevval.data);
return r; return r;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment