Commit 2bf831f3 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1112 refs[t:1112] Implement 'flatten' function at brt layer

git-svn-id: file:///svn/toku/tokudb@14168 c7de825b-a66e-492c-adef-691d508d4ae1
parent d57d3fcc
......@@ -219,7 +219,8 @@ struct __toku_db {
int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
int (*getf_set)(DB*, DB_TXN*, u_int32_t, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_set without a persistent cursor) */;
int (*getf_get_both)(DB*, DB_TXN*, u_int32_t, DBT*, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_get_both without a persistent cursor) */;
void* __toku_dummy0[20];
int (*flatten)(DB*, DB_TXN*) /* Flatten a dictionary, similar to (but faster than) a table scan */;
void* __toku_dummy0[19];
char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=236 size=4, 64=bit offset=376 size=8 */
void* __toku_dummy2[5];
......
......@@ -230,7 +230,8 @@ struct __toku_db {
int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
int (*getf_set)(DB*, DB_TXN*, u_int32_t, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_set without a persistent cursor) */;
int (*getf_get_both)(DB*, DB_TXN*, u_int32_t, DBT*, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_get_both without a persistent cursor) */;
void* __toku_dummy0[23];
int (*flatten)(DB*, DB_TXN*) /* Flatten a dictionary, similar to (but faster than) a table scan */;
void* __toku_dummy0[22];
char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=248 size=4, 64=bit offset=400 size=8 */
void* __toku_dummy2[5];
......
......@@ -233,7 +233,8 @@ struct __toku_db {
int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
int (*getf_set)(DB*, DB_TXN*, u_int32_t, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_set without a persistent cursor) */;
int (*getf_get_both)(DB*, DB_TXN*, u_int32_t, DBT*, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_get_both without a persistent cursor) */;
void* __toku_dummy0[25];
int (*flatten)(DB*, DB_TXN*) /* Flatten a dictionary, similar to (but faster than) a table scan */;
void* __toku_dummy0[24];
char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=256 size=4, 64=bit offset=416 size=8 */
void* __toku_dummy2[5];
......
......@@ -233,7 +233,8 @@ struct __toku_db {
int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
int (*getf_set)(DB*, DB_TXN*, u_int32_t, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_set without a persistent cursor) */;
int (*getf_get_both)(DB*, DB_TXN*, u_int32_t, DBT*, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_get_both without a persistent cursor) */;
void* __toku_dummy0[28];
int (*flatten)(DB*, DB_TXN*) /* Flatten a dictionary, similar to (but faster than) a table scan */;
void* __toku_dummy0[27];
char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=268 size=4, 64=bit offset=440 size=8 */
void* __toku_dummy2[5];
......
......@@ -237,7 +237,8 @@ struct __toku_db {
int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
int (*getf_set)(DB*, DB_TXN*, u_int32_t, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_set without a persistent cursor) */;
int (*getf_get_both)(DB*, DB_TXN*, u_int32_t, DBT*, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_get_both without a persistent cursor) */;
void* __toku_dummy1[32];
int (*flatten)(DB*, DB_TXN*) /* Flatten a dictionary, similar to (but faster than) a table scan */;
void* __toku_dummy1[31];
char __toku_dummy2[80];
void *api_internal; /* 32-bit offset=276 size=4, 64=bit offset=464 size=8 */
void* __toku_dummy3[5];
......
......@@ -393,6 +393,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
"int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */",
"int (*getf_set)(DB*, DB_TXN*, u_int32_t, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_set without a persistent cursor) */",
"int (*getf_get_both)(DB*, DB_TXN*, u_int32_t, DBT*, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_get_both without a persistent cursor) */",
"int (*flatten)(DB*, DB_TXN*) /* Flatten a dictionary, similar to (but faster than) a table scan */",
NULL};
print_struct("db", 1, db_fields32, db_fields64, sizeof(db_fields32)/sizeof(db_fields32[0]), extra);
}
......
......@@ -204,6 +204,7 @@ struct __toku_db {
int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
int (*getf_set)(DB*, DB_TXN*, u_int32_t, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_set without a persistent cursor) */;
int (*getf_get_both)(DB*, DB_TXN*, u_int32_t, DBT*, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_get_both without a persistent cursor) */;
int (*flatten)(DB*, DB_TXN*) /* Flatten a dictionary, similar to (but faster than) a table scan */;
void *api_internal;
int (*close) (DB*, u_int32_t);
int (*cursor) (DB *, DB_TXN *, DBC **, u_int32_t);
......
......@@ -16,7 +16,7 @@
#endif
const char *pname;
enum run_mode { RUN_HWC, RUN_LWC, RUN_VERIFY, RUN_HEAVI, RUN_RANGE} run_mode = RUN_HWC;
enum run_mode { RUN_HWC, RUN_LWC, RUN_VERIFY, RUN_HEAVI, RUN_RANGE, RUN_FLATTEN} run_mode = RUN_HWC;
int do_txns=1, prelock=0, prelockflag=0;
u_int32_t lock_flag = 0;
long limitcount=-1;
......@@ -30,6 +30,7 @@ static int print_usage (const char *argv0) {
fprintf(stderr, "Usage:\n%s [--verify-lwc | --lwc | --nohwc] [--prelock] [--prelockflag] [--prelockwriteflag] [--env DIR]\n", argv0);
fprintf(stderr, " --hwc run heavy weight cursors (this is the default)\n");
fprintf(stderr, " --verify-lwc means to run the light weight cursor and the heavyweight cursor to verify that they get the same answer.\n");
fprintf(stderr, " --flatten Flatten only using special flatten function\n");
fprintf(stderr, " --lwc run light weight cursors instead of heavy weight cursors\n");
fprintf(stderr, " --prelock acquire a read lock on the entire table before running\n");
fprintf(stderr, " --prelockflag pass DB_PRELOCKED to the the cursor get operation whenever the locks have been acquired\n");
......@@ -70,6 +71,9 @@ static void parse_args (int argc, const char *argv[]) {
} else if (strcmp(*argv,"--verify-lwc")==0) {
if (specified_run_mode && run_mode!=RUN_VERIFY) { two_modes: fprintf(stderr, "You specified two run modes\n"); exit(1); }
run_mode = RUN_VERIFY;
} else if (strcmp(*argv, "--flatten")==0) {
if (specified_run_mode && run_mode!=RUN_FLATTEN) goto two_modes;
run_mode = RUN_FLATTEN;
} else if (strcmp(*argv, "--lwc")==0) {
if (specified_run_mode && run_mode!=RUN_LWC) goto two_modes;
run_mode = RUN_LWC;
......@@ -268,6 +272,19 @@ static void scanscan_lwc (void) {
printf("LWC Scan %lld bytes (%d rows) in %9.6fs at %9fMB/s\n", e.totalbytes, e.rowcounter, tdiff, 1e-6*e.totalbytes/tdiff);
}
}
static void scanscan_flatten (void) {
int r;
int counter=0;
for (counter=0; counter<n_experiments; counter++) {
double prevtime = gettime();
r = db->flatten(db, tid);
assert(r==0);
double thistime = gettime();
double tdiff = thistime-prevtime;
printf("Flatten Scan in %9.6fs\n", tdiff);
}
}
#endif
static void scanscan_range (void) {
......@@ -472,6 +489,7 @@ int main (int argc, const char *argv[]) {
case RUN_HWC: scanscan_hwc(); break;
#ifdef TOKUDB
case RUN_LWC: scanscan_lwc(); break;
case RUN_FLATTEN: scanscan_flatten(); break;
case RUN_VERIFY: scanscan_verify(); break;
case RUN_HEAVI: scanscan_heaviside(); break;
#endif
......
......@@ -204,6 +204,7 @@ struct __toku_db {
int (*set_descriptor) (DB*, u_int32_t version, const DBT* descriptor, toku_dbt_upgradef dbt_userformat_upgrade) /* set row/dictionary descriptor for a db. Available only while db is open */;
int (*getf_set)(DB*, DB_TXN*, u_int32_t, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_set without a persistent cursor) */;
int (*getf_get_both)(DB*, DB_TXN*, u_int32_t, DBT*, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_get_both without a persistent cursor) */;
int (*flatten)(DB*, DB_TXN*) /* Flatten a dictionary, similar to (but faster than) a table scan */;
void *api_internal;
int (*close) (DB*, u_int32_t);
int (*cursor) (DB *, DB_TXN *, DBC **, u_int32_t);
......
......@@ -3850,6 +3850,30 @@ toku_brt_cursor_current(BRT_CURSOR cursor, int op, BRT_GET_CALLBACK_FUNCTION get
return getf(cursor->key.size, cursor->key.data, cursor->val.size, cursor->val.data, getf_v); // brt_cursor_copyout(cursor, outkey, outval);
}
static int
brt_flatten_getf(ITEMLEN UU(keylen), bytevec UU(key),
ITEMLEN UU(vallen), bytevec UU(val),
void *UU(v)) {
return DB_NOTFOUND;
}
int
toku_brt_flatten(BRT brt, TOKULOGGER logger)
{
BRT_CURSOR tmp_cursor;
int r = toku_brt_cursor(brt, &tmp_cursor, logger);
if (r!=0) return r;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, tmp_cursor->brt);
r = brt_cursor_search(tmp_cursor, &search, brt_flatten_getf, NULL, logger);
if (r==DB_NOTFOUND) r = 0;
{
//Cleanup temporary cursor
int r2 = toku_brt_cursor_close(tmp_cursor);
if (r==0) r = r2;
}
return r;
}
int
toku_brt_cursor_first(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, TOKULOGGER logger)
{
......
......@@ -89,6 +89,7 @@ int toku_brt_cursor (BRT, BRT_CURSOR*, TOKULOGGER);
// get is deprecated in favor of the individual functions below
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, int get_flags, TOKUTXN txn);
int toku_brt_flatten(BRT, TOKULOGGER logger);
int toku_brt_cursor_first(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, TOKULOGGER logger);
int toku_brt_cursor_last(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, TOKULOGGER logger);
int toku_brt_cursor_next(BRT_CURSOR cursor, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, TOKULOGGER logger);
......
......@@ -3882,6 +3882,27 @@ static int locked_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32
return r;
}
static int
toku_db_flatten(DB *db, DB_TXN *txn) {
HANDLE_PANICKED_DB(db);
TOKULOGGER logger = toku_txn_logger(txn ? db_txn_struct_i(txn)->tokutxn : NULL);
int r = toku_brt_flatten(db->i->brt, logger);
return r;
}
static inline int autotxn_db_flatten(DB* db, DB_TXN* txn) {
BOOL changed; int r;
r = toku_db_construct_autotxn(db, &txn, &changed, FALSE);
if (r!=0) return r;
r = toku_db_flatten(db, txn);
return toku_db_destruct_autotxn(txn, r, changed);
}
static int locked_db_flatten(DB *db, DB_TXN *txn) {
toku_ydb_lock(); int r = autotxn_db_flatten(db, txn); toku_ydb_unlock(); return r;
}
static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
int r;
......@@ -3939,6 +3960,7 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
SDB(row_size_supported);
SDB(getf_set);
SDB(getf_get_both);
SDB(flatten);
#undef SDB
result->dbt_pos_infty = toku_db_dbt_pos_infty;
result->dbt_neg_infty = toku_db_dbt_neg_infty;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment