Commit 3e3a8f5d authored by Yoni Fogel's avatar Yoni Fogel

Addresses #993

Merge branch 993 back into main.

git-svn-id: file:///svn/tokudb@5141 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6e4e5048
......@@ -251,7 +251,8 @@ struct __toku_dbc {
int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
void* __toku_dummy0[10];
int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction);
void* __toku_dummy0[9];
char __toku_dummy1[104];
int (*c_close) (DBC *); /* 32-bit offset=188 size=4, 64=bit offset=272 size=8 */
int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=192 size=4, 64=bit offset=280 size=8 */
......
......@@ -267,7 +267,8 @@ struct __toku_dbc {
int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
void* __toku_dummy0[8];
int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction);
void* __toku_dummy0[7];
char __toku_dummy1[112];
int (*c_close) (DBC *); /* 32-bit offset=188 size=4, 64=bit offset=264 size=8 */
int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=192 size=4, 64=bit offset=272 size=8 */
......
......@@ -272,7 +272,8 @@ struct __toku_dbc {
int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
void* __toku_dummy0[10];
int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction);
void* __toku_dummy0[9];
char __toku_dummy1[104];
int (*c_close) (DBC *); /* 32-bit offset=188 size=4, 64=bit offset=272 size=8 */
int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=192 size=4, 64=bit offset=280 size=8 */
......
......@@ -271,7 +271,8 @@ struct __toku_dbc {
int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
void* __toku_dummy0[14];
int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction);
void* __toku_dummy0[13];
char __toku_dummy1[104];
int (*c_close) (DBC *); /* 32-bit offset=204 size=4, 64=bit offset=304 size=8 */
int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=208 size=4, 64=bit offset=312 size=8 */
......
......@@ -11,9 +11,9 @@ extern "C" {
#define TOKUDB 1
#define DB_VERSION_MAJOR 4
#define DB_VERSION_MINOR 6
#define DB_VERSION_PATCH 21
#define DB_VERSION_PATCH 19
#ifndef _TOKUDB_WRAP_H
#define DB_VERSION_STRING "Tokutek: TokuDB 4.6.21"
#define DB_VERSION_STRING "Tokutek: TokuDB 4.6.19"
#else
#define DB_VERSION_STRING_ydb "Tokutek: TokuDB (wrapped bdb)"
#endif
......@@ -276,7 +276,8 @@ struct __toku_dbc {
int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
void* __toku_dummy0[24];
int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction);
void* __toku_dummy0[23];
char __toku_dummy1[104];
int (*c_close) (DBC *); /* 32-bit offset=244 size=4, 64=bit offset=384 size=8 */
int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=248 size=4, 64=bit offset=392 size=8 */
......
......@@ -330,6 +330,9 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
"int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *)",
"int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *)",
"int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *)",
"int (*c_getf_heavi)(DBC *, u_int32_t, "
"void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, "
"int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction)",
NULL};
assert(sizeof(dbc_fields32)==sizeof(dbc_fields64));
print_struct("dbc", 1, dbc_fields32, dbc_fields64, sizeof(dbc_fields32)/sizeof(dbc_fields32[0]), extra);
......
......@@ -272,7 +272,8 @@ struct __toku_dbc {
int (*c_getf_current)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_first)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
int (*c_getf_last)(DBC *, u_int32_t, void(*)(DBT const *, DBT const *, void *), void *);
void* __toku_dummy0[10];
int (*c_getf_heavi)(DBC *, u_int32_t, void(*f)(DBT const *, DBT const *, void *, int), void *extra_f, int (*h)(const DBT *key, const DBT *value, void *extra_h), void *extra_h, int direction);
void* __toku_dummy0[9];
char __toku_dummy1[104];
int (*c_close) (DBC *); /* 32-bit offset=188 size=4, 64=bit offset=272 size=8 */
int (*c_count) (DBC *, db_recno_t *, u_int32_t); /* 32-bit offset=192 size=4, 64=bit offset=280 size=8 */
......
......@@ -3322,6 +3322,64 @@ get_next:;
return -1;
}
int toku_brt_cursor_peek_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
int rr = toku_read_and_pin_brt_header(cursor->brt->cf, &cursor->brt->h);
if (rr!=0) return rr;
uint64_t h_counter = cursor->brt->h->root_put_counter;
rr = toku_unpin_brt_header(cursor->brt);
assert(rr==0);
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
u_int32_t index = 0;
int r = toku_omt_cursor_current_index(cursor->omtcursor, &index);
assert(r==0);
OMT omt = toku_omt_cursor_get_omt(cursor->omtcursor);
get_prev:;
if (index>0) {
r = toku_omt_fetch(omt, --index, &le, NULL);
if (r==0) {
if (le_is_provdel(le)) goto get_prev;
toku_fill_dbt(outkey, le_latest_key(le), le_latest_keylen(le));
toku_fill_dbt(outval, le_latest_val(le), le_latest_vallen(le));
return 0;
}
}
}
return -1;
}
int toku_brt_cursor_peek_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval) {
if (toku_omt_cursor_is_valid(cursor->omtcursor)) {
{
int rr = toku_read_and_pin_brt_header(cursor->brt->cf, &cursor->brt->h);
if (rr!=0) return rr;
uint64_t h_counter = cursor->brt->h->root_put_counter;
rr = toku_unpin_brt_header(cursor->brt);
assert(rr==0);
if (h_counter != cursor->root_put_counter) return -1;
}
OMTVALUE le;
u_int32_t index = UINT32_MAX;
int r = toku_omt_cursor_current_index(cursor->omtcursor, &index);
assert(r==0);
OMT omt = toku_omt_cursor_get_omt(cursor->omtcursor);
get_next:;
if (++index<toku_omt_size(omt)) {
r = toku_omt_fetch(omt, index, &le, NULL);
if (r==0) {
if (le_is_provdel(le)) goto get_next;
toku_fill_dbt(outkey, le_latest_key(le), le_latest_keylen(le));
toku_fill_dbt(outval, le_latest_val(le), le_latest_vallen(le));
return 0;
}
}
}
return -1;
}
static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0!=(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_next_shortcut(cursor, outkey, outval)==0)
......@@ -3330,6 +3388,12 @@ static int brt_cursor_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGG
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
int toku_brt_cursor_after(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_next, BRT_SEARCH_LEFT, key, val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_compare_next_nodup(brt_search_t *search, DBT *x, DBT *y) {
BRT brt = search->context; y = y;
return compare_k_x(brt, search->k, x) < 0; /* return min x: k < x */
......@@ -3413,6 +3477,12 @@ get_prev:;
return -1;
}
int toku_brt_cursor_before(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_prev, BRT_SEARCH_RIGHT, key, val, cursor->brt);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static int brt_cursor_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKULOGGER logger) {
if (0!=(cursor->brt->flags & TOKU_DB_DUP) &&
brt_cursor_prev_shortcut(cursor, outkey, outval)==0)
......@@ -3543,6 +3613,34 @@ int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, int get_flags, T
return r;
}
static int brt_cursor_compare_heavi(brt_search_t *search, DBT *x, DBT *y) {
HEAVI_WRAPPER wrapper = search->context;
int r = wrapper->h(x, y, wrapper->extra_h);
// wrapper->r_h must have the same signus as the final chosen element.
// it is initialized to -1 or 1. 0's are closer to the min (max) that we
// want so once we hit 0 we keep it.
if (r==0) wrapper->r_h = 0;
return (search->direction&BRT_SEARCH_LEFT) ? r>=0 : r<=0;
}
//We pass in toku_dbt_fake to the search functions, since it will not pass the
//key(or val) to the heaviside function if key(or val) is NULL.
//It is not used for anything else,
//the actual 'extra' information for the heaviside function is inside the
//wrapper.
static const DBT __toku_dbt_fake;
static const DBT* const toku_dbt_fake = &__toku_dbt_fake;
int toku_brt_cursor_get_heavi (BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn, int direction, HEAVI_WRAPPER wrapper) {
TOKULOGGER logger = toku_txn_logger(txn);
brt_search_t search; brt_search_init(&search, brt_cursor_compare_heavi,
direction < 0 ? BRT_SEARCH_RIGHT : BRT_SEARCH_LEFT,
(DBT*)toku_dbt_fake,
cursor->brt->flags & TOKU_DB_DUPSORT ? (DBT*)toku_dbt_fake : NULL,
wrapper);
return brt_cursor_search(cursor, &search, outkey, outval, logger);
}
static void toku_brt_keyrange_internal (BRT brt, CACHEKEY nodename, u_int32_t fullhash, DBT *key, u_int64_t *less, u_int64_t *equal, u_int64_t *greater) {
BRTNODE node;
{
......
......@@ -49,6 +49,17 @@ int toku_verify_brt (BRT brt);
typedef struct brt_cursor *BRT_CURSOR;
int toku_brt_cursor (BRT, BRT_CURSOR*, int is_temporary_cursor);
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int brtc_flags, TOKUTXN);
struct heavi_wrapper {
int (*h)(const DBT *key, const DBT *value, void *extra_h);
void *extra_h;
int r_h;
};
typedef struct heavi_wrapper *HEAVI_WRAPPER;
int toku_brt_cursor_get_heavi (BRT_CURSOR cursor, DBT *outkey, DBT *outval, TOKUTXN txn, int direction, HEAVI_WRAPPER wrapper);
int toku_brt_cursor_peek_prev(BRT_CURSOR cursor, DBT *outkey, DBT *outval);
int toku_brt_cursor_peek_next(BRT_CURSOR cursor, DBT *outkey, DBT *outval);
int toku_brt_cursor_before(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn);
int toku_brt_cursor_after(BRT_CURSOR cursor, DBT *key, DBT *val, DBT *outkey, DBT *outval, TOKUTXN txn);
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN);
int toku_brt_cursor_close (BRT_CURSOR curs);
BOOL toku_brt_cursor_uninitialized(BRT_CURSOR c);
......
......@@ -80,6 +80,10 @@ int toku_omt_cursor_create (OMTCURSOR *omtcp) {
return 0;
}
OMT toku_omt_cursor_get_omt(OMTCURSOR c) {
return c->omt;
}
void toku_omt_cursor_invalidate (OMTCURSOR c) {
if (c==NULL || c->omt==NULL) return;
if (c->next == c) {
......@@ -625,6 +629,12 @@ int toku_omt_cursor_current (OMTCURSOR c, OMTVALUE *v) {
return r;
}
int toku_omt_cursor_current_index(OMTCURSOR c, u_int32_t *index) {
if (c->omt == NULL) return EINVAL;
*index = c->index;
return 0;
}
//TODO: Put all omt API functions here.
int toku_omt_create (OMT *omtp) {
return omt_create_internal(omtp, 2);
......
......@@ -417,6 +417,19 @@ int toku_omt_cursor_next (OMTCURSOR c, OMTVALUE *v);
// Performance: time=O(log N) worst case, expected time=O(1) for a randomly
// chosen initial position.
int toku_omt_cursor_current_index(OMTCURSOR c, u_int32_t *index);
// Effect: Stores c's offset in *index.
// Requires: index != NULL
// Returns
// 0 success
// EINVAL c is invalid
// On nonzero return, *index is unchanged and c is unchanged.
// Performance: time=O(1)
OMT toku_omt_cursor_get_omt(OMTCURSOR c);
// Effect: returns the associated omt or NULL if not associated.
// Performance: time=O(1)
int toku_omt_cursor_current (OMTCURSOR c, OMTVALUE *v);
// Effect: Store in v the value pointed by c's abstract offset
// Requires: v != NULL
......
......@@ -9,6 +9,37 @@
#include "test.h"
struct heavi_extra {
DBT key;
DBT val;
DB* db;
};
int heavi_after(const DBT *key, const DBT *val, void *extra) {
//Assumes cmp is int_dbt_cmp
struct heavi_extra *info = extra;
int cmp = int_dbt_cmp(info->db, key, &info->key);
if (cmp!=0) return cmp;
if (!val) return -1;
cmp = int_dbt_cmp(info->db, val, &info->val);
return cmp<=0 ? -1 : 0;
//Returns <0 for too small/equal
//Returns 0 for greater, but with the same key
//Returns >0 for greater with different key
}
int heavi_before(const DBT *key, const DBT *val, void *extra) {
struct heavi_extra *info = extra;
int cmp = int_dbt_cmp(info->db, key, &info->key);
if (cmp!=0) return cmp;
if (!val) return +1;
cmp = int_dbt_cmp(info->db, val, &info->val);
return cmp>=0 ? 1 : 0;
//Returns >0 for too large/equal
//Returns 0 for smaller with same key
//returns -1 for smaller with different key
}
// ENVDIR is defined in the Makefile
int dbtcmp(DBT *dbt1, DBT *dbt2) {
......@@ -593,6 +624,162 @@ void test_current(u_int32_t dup_flags) {
close_dbs();
}
struct dbt_pair {
DBT key;
DBT val;
};
struct int_pair {
int key;
int val;
};
int got_r_h;
void f_heavi(DBT const *key, DBT const *val, void *extra_f, int r_h) {
struct int_pair *info = extra_f;
if (r_h==0) got_r_h = 0;
assert(key->size == 4);
assert(val->size == 4);
info->key = *(int*)key->data;
info->val = *(int*)val->data;
}
void cget_heavi(BOOL success, BOOL find, char txn, int _key, int _val,
int _key_expect, int _val_expect, int direction,
int r_h_expect,
int (*h)(const DBT*,const DBT*,void*)) {
#if defined(USE_BDB)
return;
#else
assert(txns[(int)txn] && cursors[(int)txn]);
int r;
struct heavi_extra input;
struct int_pair output;
dbt_init(&input.key, &_key, sizeof(int));
dbt_init(&input.val, &_val, sizeof(int));
input.db = db;
output.key = 0;
output.val = 0;
got_r_h = direction;
r = cursors[(int)txn]->c_getf_heavi(cursors[(int)txn], 0, //No prelocking
f_heavi, &output,
h, &input, direction);
if (!success) {
CKERR2s(r, DB_LOCK_DEADLOCK, DB_LOCK_NOTGRANTED);
return;
}
if (!find) {
CKERR2s(r, DB_NOTFOUND, DB_KEYEMPTY);
return;
}
CKERR(r);
assert(got_r_h == r_h_expect);
assert(output.key == _key_expect);
assert(output.val == _val_expect);
#endif
}
void test_heavi(u_int32_t dup_flags) {
/* ********************************************************************** */
setup_dbs(dup_flags);
cget_heavi(TRUE, FALSE, 'a', 0, 0, 0, 0, 1, 0, heavi_after);
cget_heavi(TRUE, FALSE, 'a', 0, 0, 0, 0, -1, 0, heavi_before);
close_dbs();
/* ********************************************************************** */
//Not found locks left to right (with empty db == entire db)
setup_dbs(dup_flags);
cget_heavi(TRUE, FALSE, 'a', 0, 0, 0, 0, 1, 0, heavi_after);
put(FALSE, 'b', 7, 6);
put(FALSE, 'b', -1, -1);
put(TRUE, 'a', 4, 4);
early_commit('a');
put(TRUE, 'b', 7, 6);
put(TRUE, 'b', -1, -1);
close_dbs();
/* ********************************************************************** */
//Not found locks left to right (with empty db == entire db)
setup_dbs(dup_flags);
cget_heavi(TRUE, FALSE, 'a', 0, 0, 0, 0, -1, 0, heavi_before);
put(FALSE, 'b', 7, 6);
put(FALSE, 'b', -1, -1);
put(TRUE, 'a', 4, 4);
early_commit('a');
put(TRUE, 'b', 7, 6);
put(TRUE, 'b', -1, -1);
close_dbs();
/* ********************************************************************** */
//Duplicate mode behaves differently.
setup_dbs(dup_flags);
int k,v;
for (k = 10; k <= 100; k+= 10) {
v = k+5;
put(TRUE, 'a', k, v);
}
if (dup_flags) {
cget_heavi(TRUE, TRUE, 'a', 100, 0, 100, 105, 1, 0, heavi_after);
}
else {
cget_heavi(TRUE, FALSE, 'a', 100, 0, 0, 0, 1, 0, heavi_after);
}
close_dbs();
/* ********************************************************************** */
//Locks stop at actual elements in the DB.
setup_dbs(dup_flags);
//int k,v;
for (k = 10; k <= 100; k+= 10) {
v = k+5;
put(TRUE, 'a', k, v);
}
cget_heavi(TRUE, FALSE, 'a', 105, 1, 0, 0, 1, 0, heavi_after);
put(FALSE, 'b', 104, 1);
put(FALSE, 'b', 105, 0);
put(FALSE, 'b', 105, 1);
put(FALSE, 'b', 105, 2);
put(FALSE, 'b', 106, 0);
put(TRUE, 'b', 99, 0);
put(dup_flags!=0, 'b', 100, 104);
close_dbs();
/* ********************************************************************** */
// Test behavior of heavi_after
setup_dbs(dup_flags);
//int k,v;
for (k = 10; k <= 100; k+= 10) {
v = k+5;
put(TRUE, 'a', k, v);
}
for (k = 5; k <= 95; k+= 10) {
v = k+5;
cget_heavi(TRUE, TRUE, 'a', k, v, k+5, v+5, 1, 1, heavi_after);
}
put(FALSE, 'b', -1, -2);
put(TRUE, 'b', 200, 201);
cget_heavi(FALSE, FALSE, 'a', 105, 105, 0, 0, 1, 0, heavi_after);
close_dbs();
/* ********************************************************************** */
// Test behavior of heavi_before
setup_dbs(dup_flags);
//int k,v;
for (k = 10; k <= 100; k+= 10) {
v = k+5;
put(TRUE, 'a', k, v);
}
for (k = 105; k >= 15; k-= 10) {
v = k+5;
cget_heavi(TRUE, TRUE, 'a', k, v, k-5, v-5, -1, -1, heavi_before);
}
put(FALSE, 'b', 200, 201);
put(TRUE, 'b', -1, -2);
cget_heavi(FALSE, FALSE, 'a', -5, -5, 0, 0, -1, 0, heavi_after);
close_dbs();
}
void test(u_int32_t dup_flags) {
/* ********************************************************************** */
setup_dbs(dup_flags);
......@@ -637,6 +824,9 @@ void test(u_int32_t dup_flags) {
test_dbdel(dup_flags);
/* ********************************************************************** */
test_current(dup_flags);
/* ********************************************************************** */
test_heavi(dup_flags);
/* ********************************************************************** */
}
......
......@@ -92,6 +92,146 @@ static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is
/* lightweight cursor methods. */
static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra);
static int toku_c_getf_heavi(DBC *c, u_int32_t flags,
void(*f)(DBT const *key, DBT const *value, void *extra_f, int r_h),
void *extra_f,
int (*h)(const DBT *key, const DBT *value, void *extra_h),
void *extra_h, int direction);
// There is a total order on all key/value pairs in the database.
// In a DB_DUPSORT db, let V_i = (Key,Value) refer to the ith element (0 based indexing).
// In a NODUP db, let V_i = (Key) refer to the ith element (0 based indexing).
// We define V_{-1} = -\infty and
// V_{|V|} = \infty and
// h(-\infty,extra_h) = -1 by definition and
// h( \infty,extra_h) = 1 by definition
// Requires: Direction != 0
// Effect:
// if direction >0 then find the smallest i such that h(V_i,extra_h)>=0.
// if direction <0 then find the largest i such that h(V_i,extra_h)<=0.
// Let signus(r_h) = signus(h(V_i, extra_h))
// If flags&(DB_PRELOCKED|DB_PRELOCKED_WRITE) then skip locking
// That is, we already own the locks
// else
// if direction >0 then readlock [V_{i-1}, V_i]
// if direction <0 then readlock [V_i, V_{i+1}]
// That is, If we search from the right, lock the element we found, up to the
// next element to the right.
// If locking fails, return the locking error code
//
// If (0<=i<|V|) then
// call f(V_i.Key, V_i.Value, extra_f, r_h)
// Note: The lifetime of V_i.Key and V_i.Value is limited: they may only
// be referenced until f returns
// and return 0
// else
// return DB_NOTFOUND
// Rationale: Locking
// If we approach from the left (direction<0) we need to prevent anyone
// from inserting anything to our right that could change our answer,
// so we lock the range from the element found, to the next element to the right.
// The inverse argument applies for approaching from the right.
// Rationale: passing r_h to f
// We want to save the performance hit of requiring f to call h again to
// find out what h's return value was.
// Rationale: separate extra_f, extra_h parameters
// If the same extra parameter is sent to both f and h, then you need a
// special struct for each tuple (f_i, h_i) you use instead of a struct for each
// f_i and each h_i.
// Requires: The signum of h is monotically increasing.
// Requires: f does not create references to key, value, or data within once f
// exits
// Returns
// 0 success
// DB_NOTFOUND i is not in [0,|V|)
// DB_LOCK_NOTGRANTED Failed to obtain a lock.
// On nonzero return, what c points to becomes undefined, That is, c becomes uninitialized
// Performance: ... TODO
// Implementation Notes:
// How do we get the extra locking information efficiently?
// After finding the target, we can copy the cursor, do a DB_NEXT,
// or do a DB_NEXT+DB_PREV (vice versa for direction<0).
// Can we have the BRT provide two key/value pairs instead of one?
// That is, brt_cursor_c_getf_heavi_and_next for direction >0
// and brt_cursor_c_getf_heavi_and_prev for direction <0
// Current suggestion is to make a copy of the cursor, and use the
// copy to find the next(prev) element by using DB_NEXT(DB_PREV).
// This has the overhead of needing to make a copy of the cursor,
// which probably has a memcpy involved.
// The argument against returning two key/value pairs is that
// we should not have to pay to retreive both when we're doing something
// simple like DB_NEXT.
// This could be mitigated by having two BRT functions (or one with a
// BOOL parameter) such that it only returns two values when necessary.
// Parameters
// c The cursor
// flags Additional bool parameters. The current allowed flags are
// DB_PRELOCKED and DB_PRELOCKED_WRITE (bitwise or'd to use both)
// h A heaviside function that, along with direction, defines the query.
// extra_h is passed to h
// For additional information on heaviside functions, see omt.h
// NOTE: In a DB_DUPSORT database, both key and value will be
// passed to h. In a NODUP database, only key will be passed to h.
// f A callback function (i.e. smart dbts) to provide the result of the
// query. key and value are the key/value pair found, extra_f is
// passed to f, r_h is the return value for h for the key and value returned.
// This is used as output. That is, we call f with the outputs of the
// function.
// direction Which direction to search in on the heaviside function. >0
// means from the right, <0 means from the left.
// extra_f Any extra information required for f
// extra_h Any extra information required for h
//
// Example:
// Find the smallest V_i = (key_i,val_i) such that key_i > key_x, assume
// key.data and val.data are c strings, and print them out.
// Create a struct to hold key_x, that is extra_h
// Direction = 1 (We approach from the right, and want the smallest such
// element).
// Construct a heaviside function that returns >=0 if the
// given key > key_x, and -1 otherwise
// That is, call the comparison function on (key, key_x)
// Create a struct to hold key_x, that is extra_f
// construct f to call printf on key_x.data, key_i.data, val_i.data.
// Find the least upper bound (greatest lower bound)
// In this case, h can just return the comparison function's answer.
// direction >0 means upper bound, direction <0 means lower bound.
// (If you want upper/lower bound of the keyvalue pair, you need
// to call the comparison function on the values if the key comparison
// returns 0).
// Handlerton implications:
// The handlerton needs at most one heaviside function per special query type (where a
// special query is one that is not directly supported by the bdb api excluding
// this function).
// It is possible that more than query type can use the same heaviside function
// if the extra_h parameter can be used to change its behavior sufficiently.
//
// That is, part of extra_h can be a boolean strictly_greater
// You can construct a single heaviside function that converts 0 to -1
// (strictly greater) from the comparison function, or one that just returns
// the results of the comparison function (greater or equal).
//
// Implementation Notes:
// The BRT search function supports the following searches:
// SEARCH_LEFT(h(V_i))
// Given a step function b, that goes from 0 to 1
// find the greatest i such that h_b(V_i) == 1
// If it does not exist, return not found
// SEARCH_RIGHT(h(V_i))
// Given a step function b, that goes from 1 to 0
// find the smallest i such that h_b(V_i) == 1
// If it does not exist, return not found
// We can implement c_getf_heavi using these BRT search functions.
// A query of direction<0:
// Create wrapper function B
// return h(V_i) <=0 ? 1 : 0;
// SEARCH_RIGHT(B)
// A query of direction>0:
// Create wrapper function B
// return h(V_i) >=0 ? 1 : 0;
// SEARCH_LEFT(B)
// Effect: Lightweight cursor get
/* cursor methods */
static int toku_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag);
static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag);
......@@ -1639,7 +1779,6 @@ static int toku_c_getf_next_old(DBC *c, u_int32_t flag, void(*f)(DBT const *key,
static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) {
HANDLE_PANICKED_DB(c->dbp);
//int prelocked = flag & DB_PRELOCKED;
if (toku_c_uninitialized(c)) return toku_c_getf_next_old(c, flag, f, extra); //return toku_c_getf_first(c, flag, f, extra);
u_int32_t lock_flags = get_prelocked_flags(flag);
flag &= ~lock_flags;
......@@ -1684,6 +1823,130 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT
return r;
}
static int locked_c_getf_heavi(DBC *c, u_int32_t flags,
void(*f)(DBT const *key, DBT const *value, void *extra_f, int r_h),
void *extra_f,
int (*h)(const DBT *key, const DBT *value, void *extra_h),
void *extra_h, int direction) {
toku_ydb_lock(); int r = toku_c_getf_heavi(c, flags, f, extra_f, h, extra_h, direction); toku_ydb_unlock(); return r;
}
static int toku_c_getf_heavi(DBC *c, u_int32_t flags,
void(*f)(DBT const *key, DBT const *value, void *extra_f, int r_h),
void *extra_f,
int (*h)(const DBT *key, const DBT *value, void *extra_h),
void *extra_h, int direction) {
if (direction==0) return EINVAL;
DBC *tmp_c = NULL;
int r;
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
assert(flags==0);
struct heavi_wrapper wrapper;
wrapper.h = h;
wrapper.extra_h = extra_h;
wrapper.r_h = direction;
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
int c_get_result = toku_brt_cursor_get_heavi(c->i->c, NULL, NULL, txn, direction, &wrapper);
if (c_get_result!=0 && c_get_result!=DB_NOTFOUND) { r = c_get_result; goto cleanup; }
BOOL found = c_get_result==0;
DB *db=c->dbp;
toku_lock_tree* lt = db->i->lt;
if (lt!=NULL && !lock_flags) {
DBT tmp_key;
DBT tmp_val;
DBT *left_key, *left_val, *right_key, *right_val;
if (direction<0) {
if (!found) {
r = toku_brt_cursor_get(c->i->c, NULL, NULL, DB_FIRST, txn);
if (r!=0 && r!=DB_NOTFOUND) goto cleanup;
if (r==DB_NOTFOUND) right_key = right_val = (DBT*)toku_lt_infinity;
else {
//Restore cursor to the 'uninitialized' state it was just in.
brt_cursor_restore_state_from_prev(c->i->c);
right_key = brt_cursor_peek_prev_key(c->i->c);
right_val = brt_cursor_peek_prev_val(c->i->c);
}
left_key = left_val = (DBT*)toku_lt_neg_infinity;
}
else {
left_key = brt_cursor_peek_current_key(c->i->c);
left_val = brt_cursor_peek_current_val(c->i->c);
//Try to find right end the fast way.
r = toku_brt_cursor_peek_next(c->i->c, &tmp_key, &tmp_val);
if (r==0) {
right_key = &tmp_key;
right_val = &tmp_val;
}
else {
//Find the right end the slow way.
if ((r = toku_db_cursor(c->dbp, c->i->txn, &tmp_c, 0, 0))) goto cleanup;
r=toku_brt_cursor_after(tmp_c->i->c, left_key, left_val,
NULL, NULL, txn);
if (r!=0 && r!=DB_NOTFOUND) goto cleanup;
if (r==DB_NOTFOUND) right_key = right_val = (DBT*)toku_lt_infinity;
else {
right_key = brt_cursor_peek_current_key(tmp_c->i->c);
right_val = brt_cursor_peek_current_val(tmp_c->i->c);
}
}
}
}
else {
//direction>0
if (!found) {
r = toku_brt_cursor_get(c->i->c, NULL, NULL, DB_LAST, txn);
if (r!=0 && r!=DB_NOTFOUND) goto cleanup;
if (r==DB_NOTFOUND) left_key = left_val = (DBT*)toku_lt_neg_infinity;
else {
//Restore cursor to the 'uninitialized' state it was just in.
brt_cursor_restore_state_from_prev(c->i->c);
left_key = brt_cursor_peek_prev_key(c->i->c);
left_val = brt_cursor_peek_prev_val(c->i->c);
}
right_key = right_val = (DBT*)toku_lt_infinity;
}
else {
right_key = brt_cursor_peek_current_key(c->i->c);
right_val = brt_cursor_peek_current_val(c->i->c);
//Try to find left end the fast way.
r=toku_brt_cursor_peek_prev(c->i->c, &tmp_key, &tmp_val);
if (r==0) {
left_key = &tmp_key;
left_val = &tmp_val;
}
else {
//Find the left end the slow way.
if ((r = toku_db_cursor(c->dbp, c->i->txn, &tmp_c, 0, 0))) goto cleanup;
r=toku_brt_cursor_before(tmp_c->i->c, right_key, right_val,
NULL, NULL, txn);
if (r==DB_NOTFOUND) left_key = left_val = (DBT*)toku_lt_neg_infinity;
else {
left_key = brt_cursor_peek_current_key(tmp_c->i->c);
left_val = brt_cursor_peek_current_val(tmp_c->i->c);
}
}
}
}
DB_TXN* txn_anc = toku_txn_ancestor(c->i->txn);
TXNID id_anc = toku_txn_get_txnid(txn_anc->i->tokutxn);
if ((r = toku_txn_add_lt(txn_anc, lt))) goto cleanup;
r = toku_lt_acquire_range_read_lock(lt, db, id_anc,
left_key, left_val,
right_key, right_val);
if (r!=0) goto cleanup;
}
if (found) {
f(brt_cursor_peek_current_key(c->i->c), brt_cursor_peek_current_val(c->i->c), extra_f, wrapper.r_h);
}
r = c_get_result;
cleanup:;
int r2 = 0;
if (tmp_c) r2 = toku_c_close(tmp_c);
return r ? r : r2;
}
static int toku_c_close(DBC * c) {
int r = toku_brt_cursor_close(c->i->c);
toku_free(c->i);
......@@ -1987,13 +2250,16 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int
if (result == 0)
return ENOMEM;
memset(result, 0, sizeof *result);
result->c_get = locked_c_get;
result->c_pget = locked_c_pget;
result->c_put = locked_c_put;
result->c_close = locked_c_close;
result->c_del = locked_c_del;
result->c_count = locked_c_count;
result->c_getf_next = locked_c_getf_next;
#define SCRS(name) result->name = locked_ ## name
SCRS(c_get);
SCRS(c_pget);
SCRS(c_put);
SCRS(c_close);
SCRS(c_del);
SCRS(c_count);
SCRS(c_getf_next);
SCRS(c_getf_heavi);
#undef SCRS
MALLOC(result->i);
assert(result->i);
result->dbp = db;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment