Commit c7f4f9c5 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #2249 [t:2249] Merge DB_ENV->put_multiple branch back onto main.

Fixed some makefile issues, ported recovery tests to use toku_hard_crash_on_purpose(), fixed db-benchmark-test to use default name
for 0th db

git-svn-id: file:///svn/toku/tokudb@16936 c7de825b-a66e-492c-adef-691d508d4ae1
parent 735be2b7
......@@ -196,7 +196,14 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
void* __toku_dummy0[24];
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void* __toku_dummy0[21];
char __toku_dummy1[64];
void *api1_internal; /* 32-bit offset=212 size=4, 64=bit offset=360 size=8 */
void* __toku_dummy2[7];
......
......@@ -198,7 +198,14 @@ struct __toku_db_env {
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
void* __toku_dummy0[24];
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void* __toku_dummy0[21];
char __toku_dummy1[96];
void *api1_internal; /* 32-bit offset=244 size=4, 64=bit offset=392 size=8 */
void* __toku_dummy2[7];
......
......@@ -199,7 +199,14 @@ struct __toku_db_env {
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
void* __toku_dummy0[39];
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void* __toku_dummy0[36];
char __toku_dummy1[128];
void *api1_internal; /* 32-bit offset=336 size=4, 64=bit offset=544 size=8 */
void* __toku_dummy2[7];
......
......@@ -198,59 +198,65 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
void* __toku_dummy0[1];
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
void* __toku_dummy1[38];
char __toku_dummy2[128];
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void* __toku_dummy0[36];
char __toku_dummy1[128];
void *api1_internal; /* 32-bit offset=336 size=4, 64=bit offset=544 size=8 */
void* __toku_dummy3[8];
void* __toku_dummy2[8];
int (*close) (DB_ENV *, u_int32_t); /* 32-bit offset=372 size=4, 64=bit offset=616 size=8 */
int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t); /* 32-bit offset=376 size=4, 64=bit offset=624 size=8 */
int (*dbrename) (DB_ENV *, DB_TXN *, const char *, const char *, const char *, u_int32_t); /* 32-bit offset=380 size=4, 64=bit offset=632 size=8 */
void (*err) (const DB_ENV *, int, const char *, ...); /* 32-bit offset=384 size=4, 64=bit offset=640 size=8 */
void* __toku_dummy4[3];
void* __toku_dummy3[3];
int (*get_cachesize) (DB_ENV *, u_int32_t *, u_int32_t *, int *); /* 32-bit offset=400 size=4, 64=bit offset=672 size=8 */
void* __toku_dummy5[4];
void* __toku_dummy4[4];
int (*get_flags) (DB_ENV *, u_int32_t *); /* 32-bit offset=420 size=4, 64=bit offset=712 size=8 */
void* __toku_dummy6[4];
void* __toku_dummy5[4];
int (*get_lg_max) (DB_ENV *, u_int32_t*); /* 32-bit offset=440 size=4, 64=bit offset=752 size=8 */
void* __toku_dummy7[4];
void* __toku_dummy6[4];
int (*get_lk_max_locks) (DB_ENV *, u_int32_t *); /* 32-bit offset=460 size=4, 64=bit offset=792 size=8 */
void* __toku_dummy8[21];
void* __toku_dummy7[21];
int (*log_archive) (DB_ENV *, char **[], u_int32_t); /* 32-bit offset=548 size=4, 64=bit offset=968 size=8 */
void* __toku_dummy9[2];
void* __toku_dummy8[2];
int (*log_flush) (DB_ENV *, const DB_LSN *); /* 32-bit offset=560 size=4, 64=bit offset=992 size=8 */
void* __toku_dummy10[25];
void* __toku_dummy9[25];
int (*open) (DB_ENV *, const char *, u_int32_t, int); /* 32-bit offset=664 size=4, 64=bit offset=1200 size=8 */
void* __toku_dummy11[27];
void* __toku_dummy10[27];
int (*set_cachesize) (DB_ENV *, u_int32_t, u_int32_t, int); /* 32-bit offset=776 size=4, 64=bit offset=1424 size=8 */
int (*set_data_dir) (DB_ENV *, const char *); /* 32-bit offset=780 size=4, 64=bit offset=1432 size=8 */
void* __toku_dummy12[1];
void* __toku_dummy11[1];
void (*set_errcall) (DB_ENV *, void (*)(const DB_ENV *, const char *, const char *)); /* 32-bit offset=788 size=4, 64=bit offset=1448 size=8 */
void (*set_errfile) (DB_ENV *, FILE*); /* 32-bit offset=792 size=4, 64=bit offset=1456 size=8 */
void (*set_errpfx) (DB_ENV *, const char *); /* 32-bit offset=796 size=4, 64=bit offset=1464 size=8 */
void* __toku_dummy13[2];
void* __toku_dummy12[2];
int (*set_flags) (DB_ENV *, u_int32_t, int); /* 32-bit offset=808 size=4, 64=bit offset=1488 size=8 */
void* __toku_dummy14[2];
void* __toku_dummy13[2];
int (*set_lg_bsize) (DB_ENV *, u_int32_t); /* 32-bit offset=820 size=4, 64=bit offset=1512 size=8 */
int (*set_lg_dir) (DB_ENV *, const char *); /* 32-bit offset=824 size=4, 64=bit offset=1520 size=8 */
void* __toku_dummy15[1];
void* __toku_dummy14[1];
int (*set_lg_max) (DB_ENV *, u_int32_t); /* 32-bit offset=832 size=4, 64=bit offset=1536 size=8 */
void* __toku_dummy16[2];
void* __toku_dummy15[2];
int (*set_lk_detect) (DB_ENV *, u_int32_t); /* 32-bit offset=844 size=4, 64=bit offset=1560 size=8 */
void* __toku_dummy17[1];
void* __toku_dummy16[1];
int (*set_lk_max_locks) (DB_ENV *, u_int32_t); /* 32-bit offset=852 size=4, 64=bit offset=1576 size=8 */
void* __toku_dummy18[14];
void* __toku_dummy17[14];
int (*set_tmp_dir) (DB_ENV *, const char *); /* 32-bit offset=912 size=4, 64=bit offset=1696 size=8 */
void* __toku_dummy19[2];
void* __toku_dummy18[2];
int (*set_verbose) (DB_ENV *, u_int32_t, int); /* 32-bit offset=924 size=4, 64=bit offset=1720 size=8 */
void* __toku_dummy20[1];
void* __toku_dummy19[1];
int (*txn_begin) (DB_ENV *, DB_TXN *, DB_TXN **, u_int32_t); /* 32-bit offset=932 size=4, 64=bit offset=1736 size=8 */
int (*txn_checkpoint) (DB_ENV *, u_int32_t, u_int32_t, u_int32_t); /* 32-bit offset=936 size=4, 64=bit offset=1744 size=8 */
void* __toku_dummy21[1];
void* __toku_dummy20[1];
int (*txn_stat) (DB_ENV *, DB_TXN_STAT **, u_int32_t); /* 32-bit offset=944 size=4, 64=bit offset=1760 size=8 */
void* __toku_dummy22[2]; /* Padding at the end */
char __toku_dummy23[16]; /* Padding at the end */
void* __toku_dummy21[2]; /* Padding at the end */
char __toku_dummy22[16]; /* Padding at the end */
};
struct __toku_db_key_range {
double less; /* 32-bit offset=0 size=8, 64=bit offset=0 size=8 */
......
......@@ -200,60 +200,66 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
void* __toku_dummy0[1];
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
void* __toku_dummy1[39];
char __toku_dummy2[144];
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void* __toku_dummy0[37];
char __toku_dummy1[144];
void *api1_internal; /* 32-bit offset=356 size=4, 64=bit offset=568 size=8 */
void* __toku_dummy3[8];
void* __toku_dummy2[8];
int (*close) (DB_ENV *, u_int32_t); /* 32-bit offset=392 size=4, 64=bit offset=640 size=8 */
int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t); /* 32-bit offset=396 size=4, 64=bit offset=648 size=8 */
int (*dbrename) (DB_ENV *, DB_TXN *, const char *, const char *, const char *, u_int32_t); /* 32-bit offset=400 size=4, 64=bit offset=656 size=8 */
void (*err) (const DB_ENV *, int, const char *, ...); /* 32-bit offset=404 size=4, 64=bit offset=664 size=8 */
void* __toku_dummy4[3];
void* __toku_dummy3[3];
int (*get_cachesize) (DB_ENV *, u_int32_t *, u_int32_t *, int *); /* 32-bit offset=420 size=4, 64=bit offset=696 size=8 */
void* __toku_dummy5[5];
void* __toku_dummy4[5];
int (*get_flags) (DB_ENV *, u_int32_t *); /* 32-bit offset=444 size=4, 64=bit offset=744 size=8 */
void* __toku_dummy6[4];
void* __toku_dummy5[4];
int (*get_lg_max) (DB_ENV *, u_int32_t*); /* 32-bit offset=464 size=4, 64=bit offset=784 size=8 */
void* __toku_dummy7[4];
void* __toku_dummy6[4];
int (*get_lk_max_locks) (DB_ENV *, u_int32_t *); /* 32-bit offset=484 size=4, 64=bit offset=824 size=8 */
void* __toku_dummy8[22];
void* __toku_dummy7[22];
int (*log_archive) (DB_ENV *, char **[], u_int32_t); /* 32-bit offset=576 size=4, 64=bit offset=1008 size=8 */
void* __toku_dummy9[2];
void* __toku_dummy8[2];
int (*log_flush) (DB_ENV *, const DB_LSN *); /* 32-bit offset=588 size=4, 64=bit offset=1032 size=8 */
void* __toku_dummy10[25];
void* __toku_dummy9[25];
int (*open) (DB_ENV *, const char *, u_int32_t, int); /* 32-bit offset=692 size=4, 64=bit offset=1240 size=8 */
void* __toku_dummy11[30];
void* __toku_dummy10[30];
int (*set_cachesize) (DB_ENV *, u_int32_t, u_int32_t, int); /* 32-bit offset=816 size=4, 64=bit offset=1488 size=8 */
void* __toku_dummy12[1];
void* __toku_dummy11[1];
int (*set_data_dir) (DB_ENV *, const char *); /* 32-bit offset=824 size=4, 64=bit offset=1504 size=8 */
void* __toku_dummy13[1];
void* __toku_dummy12[1];
void (*set_errcall) (DB_ENV *, void (*)(const DB_ENV *, const char *, const char *)); /* 32-bit offset=832 size=4, 64=bit offset=1520 size=8 */
void (*set_errfile) (DB_ENV *, FILE*); /* 32-bit offset=836 size=4, 64=bit offset=1528 size=8 */
void (*set_errpfx) (DB_ENV *, const char *); /* 32-bit offset=840 size=4, 64=bit offset=1536 size=8 */
void* __toku_dummy14[2];
void* __toku_dummy13[2];
int (*set_flags) (DB_ENV *, u_int32_t, int); /* 32-bit offset=852 size=4, 64=bit offset=1560 size=8 */
void* __toku_dummy15[2];
void* __toku_dummy14[2];
int (*set_lg_bsize) (DB_ENV *, u_int32_t); /* 32-bit offset=864 size=4, 64=bit offset=1584 size=8 */
int (*set_lg_dir) (DB_ENV *, const char *); /* 32-bit offset=868 size=4, 64=bit offset=1592 size=8 */
void* __toku_dummy16[1];
void* __toku_dummy15[1];
int (*set_lg_max) (DB_ENV *, u_int32_t); /* 32-bit offset=876 size=4, 64=bit offset=1608 size=8 */
void* __toku_dummy17[2];
void* __toku_dummy16[2];
int (*set_lk_detect) (DB_ENV *, u_int32_t); /* 32-bit offset=888 size=4, 64=bit offset=1632 size=8 */
void* __toku_dummy18[1];
void* __toku_dummy17[1];
int (*set_lk_max_locks) (DB_ENV *, u_int32_t); /* 32-bit offset=896 size=4, 64=bit offset=1648 size=8 */
void* __toku_dummy19[14];
void* __toku_dummy18[14];
int (*set_tmp_dir) (DB_ENV *, const char *); /* 32-bit offset=956 size=4, 64=bit offset=1768 size=8 */
void* __toku_dummy20[2];
void* __toku_dummy19[2];
int (*set_verbose) (DB_ENV *, u_int32_t, int); /* 32-bit offset=968 size=4, 64=bit offset=1792 size=8 */
void* __toku_dummy21[1];
void* __toku_dummy20[1];
int (*txn_begin) (DB_ENV *, DB_TXN *, DB_TXN **, u_int32_t); /* 32-bit offset=976 size=4, 64=bit offset=1808 size=8 */
int (*txn_checkpoint) (DB_ENV *, u_int32_t, u_int32_t, u_int32_t); /* 32-bit offset=980 size=4, 64=bit offset=1816 size=8 */
void* __toku_dummy22[1];
void* __toku_dummy21[1];
int (*txn_stat) (DB_ENV *, DB_TXN_STAT **, u_int32_t); /* 32-bit offset=988 size=4, 64=bit offset=1832 size=8 */
void* __toku_dummy23[2]; /* Padding at the end */
char __toku_dummy24[16]; /* Padding at the end */
void* __toku_dummy22[2]; /* Padding at the end */
char __toku_dummy23[16]; /* Padding at the end */
};
struct __toku_db_key_range {
double less; /* 32-bit offset=0 size=8, 64=bit offset=0 size=8 */
......
......@@ -447,6 +447,13 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
"int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */",
"int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */",
"int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */",
"int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */",
"int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */",
"int (*set_multiple_callbacks) (DB_ENV *env,\n"
" int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),\n"
" int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),\n"
" int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),\n"
" int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */",
NULL};
print_struct("db_env", 1, db_env_fields32, db_env_fields64, sizeof(db_env_fields32)/sizeof(db_env_fields32[0]), extra);
}
......
......@@ -200,7 +200,14 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
void *app_private;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void *api1_internal;
int (*close) (DB_ENV *, u_int32_t);
int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t);
......
......@@ -23,6 +23,7 @@
#endif
int verbose=1;
int which;
enum { SERIAL_SPACING = 1<<6 };
enum { DEFAULT_ITEMS_TO_INSERT_PER_ITERATION = 1<<20 };
......@@ -39,6 +40,8 @@ int pagesize = 0;
long long cachesize = 128*1024*1024;
int do_1514_point_query = 0;
int dupflags = 0;
int insert_multiple = 0;
int num_dbs = 1;
int noserial = 0; // Don't do the serial stuff
int norandom = 0; // Don't do the random stuff
int prelock = 0;
......@@ -118,10 +121,43 @@ char *dbfilename = "bench.db";
char *dbname;
DB_ENV *dbenv;
DB *db;
enum {MAX_DBS=128};
DB *dbs[MAX_DBS];
uint32_t put_flagss[MAX_DBS];
DB_TXN *parenttid=0;
DB_TXN *tid=0;
#if defined(TOKUDB)
static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **dbs_in, DBT *keys, DBT *vals, void *extra) {
assert((int)num_dbs_in == num_dbs);
assert(extra == &put_flags); //Verifying extra gets set right.
assert(row->size >= 4);
int32_t row_keysize = *(int32_t*)row->data;
assert(row_keysize == keysize);
assert((int)row->size >= 4+keysize);
int32_t row_valsize = row->size - 4 - keysize;
assert(row_valsize == valsize);
void *key = row->data+4;
void *val = row->data+4 + keysize;
for (which = 0; which < num_dbs; which++) {
assert(dbs_in[which] == dbs[which]);
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
}
static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) {
assert(extra == &put_flags); //Verifying extra gets set right.
return 0;
}
#endif
static void benchmark_setup (void) {
int r;
......@@ -162,6 +198,14 @@ static void benchmark_setup (void) {
r = dbenv->set_lg_dir(dbenv, log_dir);
assert(r == 0);
}
#if defined(TOKUDB)
if (insert_multiple) {
r = dbenv->set_multiple_callbacks(dbenv,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r);
}
#endif
r = dbenv->open(dbenv, dbdir, env_open_flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
assert(r == 0);
......@@ -176,12 +220,16 @@ static void benchmark_setup (void) {
}
#endif
r = db_create(&db, dbenv, 0);
for (which = 0; which < num_dbs; which++) {
r = db_create(&dbs[which], dbenv, 0);
assert(r == 0);
}
if (do_transactions) {
r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r);
}
for (which = 0; which < num_dbs; which++) {
DB *db = dbs[which];
if (pagesize && db->set_pagesize) {
r = db->set_pagesize(db, pagesize);
assert(r == 0);
......@@ -190,9 +238,15 @@ static void benchmark_setup (void) {
r = db->set_flags(db, dupflags);
assert(r == 0);
}
r = db->open(db, tid, dbfilename, NULL, DB_BTREE, DB_CREATE, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
char name[strlen(dbfilename)+10];
if (which==0)
sprintf(name, "%s", dbfilename);
else
sprintf(name, "%s_%d", dbfilename, which);
r = db->open(db, tid, name, NULL, DB_BTREE, DB_CREATE, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
if (r!=0) fprintf(stderr, "errno=%d, %s\n", errno, strerror(errno));
assert(r == 0);
}
if (insert1first) {
if (do_transactions) {
r=tid->commit(tid, 0);
......@@ -215,8 +269,12 @@ static void benchmark_setup (void) {
r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r);
}
if (do_transactions) {
if (singlex)
if (singlex) {
for (which = 0; which < num_dbs; which++) {
DB *db = dbs[which];
do_prelock(db, tid);
}
}
else {
r=tid->commit(tid, 0);
assert(r==0);
......@@ -272,8 +330,11 @@ static void benchmark_shutdown (void) {
assert(!tid);
assert(!parenttid);
for (which = 0; which < num_dbs; which++) {
DB *db = dbs[which];
r = db->close(db, 0);
assert(r == 0);
}
r = dbenv->close(dbenv, 0);
assert(r == 0);
}
......@@ -307,21 +368,42 @@ static void fill_array (unsigned char *data, int size) {
}
static void insert (long long v) {
unsigned char kc[keysize], vc[valsize];
int r;
unsigned char data[keysize+valsize+4];
unsigned char *kc = data+4, *vc = data+keysize+4;
DBT kt, vt;
fill_array(kc, sizeof kc);
fill_array(kc, keysize);
long_long_to_array(kc, keysize, v); // Fill in the array first, then write the long long in.
fill_array(vc, sizeof vc);
fill_array(vc, valsize);
long_long_to_array(vc, valsize, v);
int r = db->put(db, tid, fill_dbt(&kt, kc, keysize), fill_dbt(&vt, vc, valsize), put_flags);
*(uint32_t*)(data) = keysize;
if (insert_multiple) {
DBT row;
fill_dbt(&row, data, sizeof(data));
#if defined(TOKUDB)
r = dbenv->put_multiple(dbenv, tid, &row, num_dbs, dbs, put_flagss, &put_flags); //Extra used just to verify its passed right
#else
r = EINVAL;
#endif
CKERR(r);
}
else {
for (which = 0; which < num_dbs; which++) {
DB *db = dbs[which];
r = db->put(db, tid, fill_dbt(&kt, kc, keysize), fill_dbt(&vt, vc, valsize), put_flags);
CKERR(r);
}
}
if (do_transactions) {
if (n_insertions_since_txn_began>=items_per_transaction && !singlex) {
n_insertions_since_txn_began=0;
r = tid->commit(tid, commitflags); assert(r==0);
tid = NULL;
r=dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
for (which = 0; which < num_dbs; which++) {
DB *db = dbs[which];
do_prelock(db, tid);
}
n_insertions_since_txn_began=0;
}
n_insertions_since_txn_began++;
......@@ -332,8 +414,11 @@ static void serial_insert_from (long long from) {
long long i;
if (do_transactions && !singlex) {
int r = dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
for (which = 0; which < num_dbs; which++) {
DB *db = dbs[which];
do_prelock(db, tid);
}
}
for (i=0; i<items_per_iteration; i++) {
insert((from+i)*SERIAL_SPACING);
}
......@@ -351,8 +436,11 @@ static void random_insert_below (long long below) {
long long i;
if (do_transactions && !singlex) {
int r = dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
for (which = 0; which < num_dbs; which++) {
DB *db = dbs[which];
do_prelock(db, tid);
}
}
for (i=0; i<items_per_iteration; i++) {
insert(llrandom()%below);
}
......@@ -421,6 +509,8 @@ static int print_usage (const char *argv0) {
fprintf(stderr, " --append append to an existing file\n");
fprintf(stderr, " --userandom use random()\n");
fprintf(stderr, " --checkpoint-period %"PRIu32" checkpoint period\n", checkpoint_period);
fprintf(stderr, " --numdbs N Insert same items into N dbs (1 to %d)\n", MAX_DBS);
fprintf(stderr, " --insertmultiple Use DB_ENV->put_multiple api. Requires transactions.\n");
fprintf(stderr, " n_iterations how many iterations (default %lld)\n", default_n_items/DEFAULT_ITEMS_TO_INSERT_PER_ITERATION);
return 1;
......@@ -448,12 +538,15 @@ test1514(void) {
struct timeval t1,t2;
for (which = 0; which < num_dbs; which++) {
DB *db = dbs[which];
r = db->cursor(db, tid, &c, 0); CKERR(r);
gettimeofday(&t1,0);
r = c->c_getf_set(c, 0, fill_dbt(&kt, kc, keysize), nothing, NULL);
gettimeofday(&t2,0);
CKERR2(r, DB_NOTFOUND);
r = c->c_close(c); CKERR(r);
}
if (verbose) printf("(#1514) Single Point Query %9.6fs\n", toku_tdiff(&t2, &t1));
}
......@@ -476,6 +569,14 @@ int main (int argc, const char *argv[]) {
noserial=1;
} else if (strcmp(arg, "--norandom") == 0) {
norandom=1;
} else if (strcmp(arg, "--insertmultiple") == 0) {
insert_multiple=1;
} else if (strcmp(arg, "--numdbs") == 0) {
num_dbs = atoi(argv[++i]);
if (num_dbs <= 0 || num_dbs > MAX_DBS) {
fprintf(stderr, "--numdbs needs between 1 and %d\n", MAX_DBS);
return print_usage(argv[0]);
}
} else if (strcmp(arg, "--compressibility") == 0) {
compressibility = atof(argv[++i]);
init_random_c(); (void) get_random_c();
......@@ -600,15 +701,28 @@ int main (int argc, const char *argv[]) {
printf("insertions of %d per batch%s\n", items_per_iteration, do_transactions ? " (with transactions)" : "");
}
#if !defined TOKUDB
if (insert_multiple) {
fprintf(stderr, "--insert_multiple only works on the TokuDB (not BDB)\n");
return print_usage(argv[0]);
}
if (check_small_rolltmp) {
fprintf(stderr, "--check_small_rolltmp only works on the TokuDB (not BDB)\n");
return print_usage(argv[0]);
}
#endif
if (insert_multiple) {
for (which = 0; which < num_dbs; which++) {
put_flagss[i] = put_flags;
}
}
if (check_small_rolltmp && !singlex) {
fprintf(stderr, "--check_small_rolltmp requires --singlex\n");
return print_usage(argv[0]);
}
if (!do_transactions && insert_multiple) {
fprintf(stderr, "--insert_multiple requires transactions\n");
return print_usage(argv[0]);
}
benchmark_setup();
gettimeofday(&t1,0);
biginsert(total_n_items, &t1);
......
......@@ -200,7 +200,14 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
void *app_private;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void *api1_internal;
int (*close) (DB_ENV *, u_int32_t);
int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t);
......
......@@ -2629,7 +2629,7 @@ toku_brt_broadcast_commit_all (BRT brt)
// Effect: Insert the key-val pair into brt.
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
return toku_brt_maybe_insert(brt, key, val, txn, FALSE, ZERO_LSN);
return toku_brt_maybe_insert(brt, key, val, txn, FALSE, ZERO_LSN, TRUE);
}
static void
......@@ -2638,7 +2638,27 @@ txn_note_doing_work(TOKUTXN txn) {
txn->has_done_work = 1;
}
int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn) {
int
toku_brt_log_put_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) {
int r = 0;
assert(txn);
assert(num_brts > 0);
TOKULOGGER logger = toku_txn_logger(txn);
if (logger) {
FILENUM fnums[num_brts];
FILENUMS filenums = {.num = num_brts, .filenums = fnums};
int i;
for (i = 0; i < num_brts; i++) {
fnums[i] = toku_cachefile_filenum(brts[i]->cf);
}
BYTESTRING rowbs = {.len=row->size, .data=row->data};
TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_enq_insert_multiple(logger, (LSN*)0, 0, filenums, xid, rowbs);
}
return r;
}
int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging) {
int r = 0;
XIDS message_xids;
TXNID xid = toku_txn_get_txnid(txn);
......@@ -2663,7 +2683,7 @@ int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_
message_xids = xids_get_root_xids();
}
TOKULOGGER logger = toku_txn_logger(txn);
if (logger) {
if (do_logging && logger) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
r = toku_log_enq_insert(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs, valbs);
......@@ -2681,10 +2701,30 @@ int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, BOOL oplsn_
}
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
return toku_brt_maybe_delete(brt, key, txn, FALSE, ZERO_LSN);
return toku_brt_maybe_delete(brt, key, txn, FALSE, ZERO_LSN, TRUE);
}
int toku_brt_maybe_delete(BRT brt, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn) {
int
toku_brt_log_del_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) {
int r = 0;
assert(txn);
assert(num_brts > 0);
TOKULOGGER logger = toku_txn_logger(txn);
if (logger) {
FILENUM fnums[num_brts];
FILENUMS filenums = {.num = num_brts, .filenums = fnums};
int i;
for (i = 0; i < num_brts; i++) {
fnums[i] = toku_cachefile_filenum(brts[i]->cf);
}
BYTESTRING rowbs = {.len=row->size, .data=row->data};
TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_enq_delete_multiple(logger, (LSN*)0, 0, filenums, xid, rowbs);
}
return r;
}
int toku_brt_maybe_delete(BRT brt, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging) {
int r;
XIDS message_xids;
TXNID xid = toku_txn_get_txnid(txn);
......@@ -2703,7 +2743,7 @@ int toku_brt_maybe_delete(BRT brt, DBT *key, TOKUTXN txn, BOOL oplsn_valid, LSN
message_xids = xids_get_root_xids();
}
TOKULOGGER logger = toku_txn_logger(txn);
if (logger) {
if (do_logging && logger) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs);
if (r!=0) return r;
......
......@@ -58,7 +58,9 @@ int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successful
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn);
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging);
int toku_brt_log_put_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row);
int toku_brt_log_del_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row);
// Effect: Delete a key from a brt
// Returns 0 if successful
......@@ -66,7 +68,7 @@ int toku_brt_delete (BRT brt, DBT *k, TOKUTXN txn);
// Effect: Delete a key from a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successful
int toku_brt_maybe_delete (BRT brt, DBT *k, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn);
int toku_brt_maybe_delete (BRT brt, DBT *k, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging);
// Effect: Delete a pair only if both k and v are equal according to the comparison function.
// Returns 0 if successful
......
......@@ -48,6 +48,11 @@ typedef struct __toku_lsn { u_int64_t lsn; } LSN;
/* Make the FILEID a struct for the same reason. */
typedef struct __toku_fileid { u_int32_t fileid; } FILENUM;
typedef struct {
u_int32_t num;
FILENUM *filenums;
} FILENUMS;
#if !TOKU_WINDOWS && !defined(BOOL_DEFINED)
#define BOOL_DEFINED
typedef enum __toku_bool { FALSE=0, TRUE=1} BOOL;
......@@ -101,6 +106,11 @@ typedef struct brt_msg BRT_MSG_S, *BRT_MSG;
typedef int (*brt_compare_func)(DB *, const DBT *, const DBT *);
typedef int (*generate_keys_vals_for_put_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra);
typedef int (*cleanup_keys_vals_for_put_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra);
typedef int (*generate_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra);
typedef int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra);
#define UU(x) x __attribute__((__unused__))
#endif
......
......@@ -149,6 +149,10 @@ static inline int toku_logsizeof_TXNID (TXNID txnid __attribute__((__unused__)))
return 8;
}
static inline int toku_logsizeof_FILENUMS (FILENUMS fs) {
return 4 + fs.num * toku_logsizeof_FILENUM(fs.filenums[0]);
}
static inline int toku_logsizeof_BYTESTRING (BYTESTRING bs) {
return 4+bs.len;
}
......
......@@ -30,6 +30,7 @@ static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) {
return 0;
}
static inline void toku_free_BYTESTRING(BYTESTRING val) { toku_free(val.data); }
static inline void toku_free_FILENUMS(FILENUMS val) { toku_free(val.filenums); }
void toku_set_lsn_increment (uint64_t incr) __attribute__((__visibility__("default")));
......
......@@ -150,6 +150,14 @@ const struct logtype logtypes[] = {
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
NULLFIELD}},
{"enq_insert_multiple", 'm', FA{{"FILENUMS", "filenums", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "row", 0},
NULLFIELD}},
{"enq_delete_multiple", 'M', FA{{"FILENUMS", "filenums", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "row", 0},
NULLFIELD}},
{"comment", 'T', FA{{"u_int64_t", "timestamp", 0},
{"BYTESTRING", "comment", 0},
NULLFIELD}},
......@@ -426,6 +434,10 @@ generate_log_reader (void) {
fprintf(cf, " toku_free_BYTESTRING(data->%s);\n", ft->name);
free_count++;
}
else if ( strcmp(ft->type, "FILENUMS") == 0 ) {
fprintf(cf, " toku_free_FILENUMS(data->%s);\n", ft->name);
free_count++;
}
});
if ( free_count == 0 ) fprintf(cf, " struct logtype_%s *dummy __attribute__ ((unused)) = data;\n", lt->name);
fprintf(cf, "}\n\n");
......
......@@ -679,6 +679,24 @@ int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, struct x1764 *checksum, u_in
return 0;
}
// fills in the fs with malloced data.
int toku_fread_FILENUMS (FILE *f, FILENUMS *fs, struct x1764 *checksum, u_int32_t *len) {
int r=toku_fread_u_int32_t(f, (u_int32_t*)&fs->num, checksum, len);
if (r!=0) return r;
fs->filenums = toku_malloc(fs->num * sizeof(FILENUM));
u_int32_t i;
for (i=0; i<fs->num; i++) {
r=toku_fread_FILENUM (f, &fs->filenums[i], checksum, len);
if (r!=0) {
toku_free(fs->filenums);
fs->filenums=0;
return r;
}
}
return 0;
}
int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__))) {
LSN v;
int r = toku_fread_LSN(inf, &v, checksum, len);
......@@ -752,11 +770,35 @@ int toku_logprint_BYTESTRING (FILE *outf, FILE *inf, const char *fieldname, stru
toku_free(bs.data);
return 0;
}
int toku_logprint_FILENUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format) {
return toku_logprint_u_int32_t(outf, inf, fieldname, checksum, len, format);
}
static void
toku_print_FILENUMS (FILE *outf, u_int32_t num, FILENUM *filenums) {
fprintf(outf, "{num=%u filenums=\"", num);
u_int32_t i;
for (i=0; i<num; i++) {
if (i>0)
fprintf(outf, ",");
fprintf(outf, "0x%"PRIx32, filenums[i].fileid);
}
fprintf(outf, "\"}");
}
int toku_logprint_FILENUMS (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__))) {
FILENUMS bs;
int r = toku_fread_FILENUMS(inf, &bs, checksum, len);
if (r!=0) return r;
fprintf(outf, " %s=", fieldname);
toku_print_FILENUMS(outf, bs.num, bs.filenums);
toku_free(bs.filenums);
return 0;
}
int toku_read_and_print_logmagic (FILE *f, u_int32_t *versionp) {
{
char magic[8];
......
......@@ -54,6 +54,7 @@ int toku_fread_LSN (FILE *f, LSN *lsn, struct x1764 *checksum, u_int32_t *le
int toku_fread_FILENUM (FILE *f, FILENUM *filenum, struct x1764 *checksum, u_int32_t *len);
int toku_fread_TXNID (FILE *f, TXNID *txnid, struct x1764 *checksum, u_int32_t *len);
int toku_fread_BYTESTRING (FILE *f, BYTESTRING *bs, struct x1764 *checksum, u_int32_t *len);
int toku_fread_FILENUMS (FILE *f, FILENUMS *fs, struct x1764 *checksum, u_int32_t *len);
int toku_logprint_LSN (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__)));
int toku_logprint_TXNID (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__)));
......@@ -63,6 +64,7 @@ int toku_logprint_u_int64_t (FILE *outf, FILE *inf, const char *fieldname, struc
void toku_print_BYTESTRING (FILE *outf, u_int32_t len, char *data);
int toku_logprint_BYTESTRING (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format __attribute__((__unused__)));
int toku_logprint_FILENUM (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format);
int toku_logprint_FILENUMS (FILE *outf, FILE *inf, const char *fieldname, struct x1764 *checksum, u_int32_t *len, const char *format);
int toku_read_and_print_logmagic (FILE *f, u_int32_t *versionp);
int toku_read_logmagic (FILE *f, u_int32_t *versionp);
......
......@@ -166,13 +166,22 @@ struct recover_env {
TOKULOGGER logger;
brt_compare_func bt_compare;
brt_compare_func dup_compare;
generate_keys_vals_for_put_func generate_keys_vals_for_put;
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put;
generate_keys_for_del_func generate_keys_for_del;
cleanup_keys_for_del_func cleanup_keys_for_del;
struct scan_state ss;
struct file_map fmap;
BOOL goforward;
};
typedef struct recover_env *RECOVER_ENV;
static int recover_env_init (RECOVER_ENV renv, brt_compare_func bt_compare, brt_compare_func dup_compare, size_t cachetable_size) {
static int recover_env_init (RECOVER_ENV renv, brt_compare_func bt_compare, brt_compare_func dup_compare,
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del,
size_t cachetable_size) {
int r;
r = toku_create_cachetable(&renv->ct, cachetable_size ? cachetable_size : 1<<25, (LSN){0}, 0);
......@@ -183,6 +192,10 @@ static int recover_env_init (RECOVER_ENV renv, brt_compare_func bt_compare, brt_
toku_logger_set_cachetable(renv->logger, renv->ct);
renv->bt_compare = bt_compare;
renv->dup_compare = dup_compare;
renv->generate_keys_vals_for_put = generate_keys_vals_for_put;
renv->cleanup_keys_vals_for_put = cleanup_keys_vals_for_put;
renv->generate_keys_for_del = generate_keys_for_del;
renv->cleanup_keys_for_del = cleanup_keys_for_del;
file_map_init(&renv->fmap);
renv->goforward = FALSE;
......@@ -463,7 +476,7 @@ static int toku_recover_enq_insert (struct logtype_enq_insert *l, RECOVER_ENV re
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len);
toku_fill_dbt(&valdbt, l->value.data, l->value.len);
r = toku_brt_maybe_insert(tuple->brt, &keydbt, &valdbt, txn, TRUE, l->lsn);
r = toku_brt_maybe_insert(tuple->brt, &keydbt, &valdbt, txn, TRUE, l->lsn, FALSE);
assert(r == 0);
return 0;
......@@ -474,6 +487,136 @@ static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), R
return 0;
}
static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple *l, RECOVER_ENV renv) {
int r;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0;
}
#if 0
int (*generate_keys_vals_for_put) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], DBT vals[num_dbs], void *extra),
int (*cleanup_keys_vals_for_put) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], DBT vals[num_dbs], void *extra),
int (*generate_keys_for_del) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], void *extra),
int (*cleanup_keys_for_del) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], void *extra));
#endif
DB* dbs[l->filenums.num];
memset(dbs, 0, sizeof(dbs));
uint32_t file;
uint32_t num_dbs = 0;
for (file = 0; file < l->filenums.num; file++) {
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything for this file.
continue;
}
dbs[num_dbs++] = tuple->brt->db;
}
if (num_dbs == 0) //All files are closed/deleted. We're done.
return 0;
DBT keydbts[num_dbs], valdbts[num_dbs], rowdbt;
memset(keydbts, 0, sizeof(keydbts));
memset(valdbts, 0, sizeof(valdbts));
//Generate all the DBTs
toku_fill_dbt(&rowdbt, l->row.data, l->row.len);
r = renv->generate_keys_vals_for_put(&rowdbt, num_dbs, dbs, keydbts, valdbts, NULL);
assert(r==0);
uint32_t which_db = 0;
for (file = 0; file < l->filenums.num; file++) {
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything for this file.
continue;
}
assert(tuple->brt->db == dbs[which_db]);
r = toku_brt_maybe_insert(tuple->brt, &keydbts[which_db], &valdbts[which_db], txn, TRUE, l->lsn, FALSE);
assert(r == 0);
which_db++;
}
assert(which_db == num_dbs);
//Do cleanup of all dbts.
r = renv->cleanup_keys_vals_for_put(&rowdbt, num_dbs, dbs, keydbts, valdbts, NULL);
assert(r==0);
return 0;
}
static int toku_recover_backward_enq_insert_multiple (struct logtype_enq_insert_multiple *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple *l, RECOVER_ENV renv) {
int r;
TOKUTXN txn = NULL;
r = toku_txnid2txn(renv->logger, l->xid, &txn);
assert(r == 0);
if (txn == NULL) {
//This is a straddle txn.
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0;
}
DB* dbs[l->filenums.num];
memset(dbs, 0, sizeof(dbs));
uint32_t file;
uint32_t num_dbs = 0;
for (file = 0; file < l->filenums.num; file++) {
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything for this file.
continue;
}
dbs[num_dbs++] = tuple->brt->db;
}
if (num_dbs == 0) //All files are closed/deleted. We're done.
return 0;
DBT keydbts[num_dbs], rowdbt;
memset(keydbts, 0, sizeof(keydbts));
//Generate all the DBTs
toku_fill_dbt(&rowdbt, l->row.data, l->row.len);
r = renv->generate_keys_for_del(&rowdbt, num_dbs, dbs, keydbts, NULL);
assert(r==0);
uint32_t which_db = 0;
for (file = 0; file < l->filenums.num; file++) {
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything for this file.
continue;
}
assert(tuple->brt->db == dbs[which_db]);
r = toku_brt_maybe_delete(tuple->brt, &keydbts[which_db], txn, TRUE, l->lsn, FALSE);
assert(r == 0);
which_db++;
}
assert(which_db == num_dbs);
//Do cleanup of all dbts.
r = renv->cleanup_keys_for_del(&rowdbt, num_dbs, dbs, keydbts, NULL);
assert(r==0);
return 0;
}
static int toku_recover_backward_enq_delete_multiple (struct logtype_enq_delete_multiple *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_enq_delete_both (struct logtype_enq_delete_both *l, RECOVER_ENV renv) {
int r;
TOKUTXN txn = NULL;
......@@ -522,7 +665,7 @@ static int toku_recover_enq_delete_any (struct logtype_enq_delete_any *l, RECOVE
}
DBT keydbt;
toku_fill_dbt(&keydbt, l->key.data, l->key.len);
r = toku_brt_maybe_delete(tuple->brt, &keydbt, txn, TRUE, l->lsn);
r = toku_brt_maybe_delete(tuple->brt, &keydbt, txn, TRUE, l->lsn, FALSE);
assert(r == 0);
return 0;
......@@ -1185,7 +1328,14 @@ int tokudb_recover_delete_rolltmp_files(const char *UU(data_dir), const char *lo
return r;
}
int tokudb_recover(const char *env_dir, const char *log_dir, brt_compare_func bt_compare, brt_compare_func dup_compare, size_t cachetable_size) {
int tokudb_recover(const char *env_dir, const char *log_dir,
brt_compare_func bt_compare,
brt_compare_func dup_compare,
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del,
size_t cachetable_size) {
int r;
int lockfd = -1;
......@@ -1202,7 +1352,12 @@ int tokudb_recover(const char *env_dir, const char *log_dir, brt_compare_func bt
int rr = 0;
if (tokudb_needs_recovery(log_dir, FALSE)) {
struct recover_env renv;
r = recover_env_init(&renv, bt_compare, dup_compare, cachetable_size);
r = recover_env_init(&renv, bt_compare, dup_compare,
generate_keys_vals_for_put,
cleanup_keys_vals_for_put,
generate_keys_for_del,
cleanup_keys_for_del,
cachetable_size);
assert(r == 0);
rr = do_recovery(&renv, env_dir, log_dir);
......
......@@ -16,7 +16,14 @@
// Run tokudb recovery from the log
// Returns 0 if success
int tokudb_recover(const char *envdir, const char *logdir, brt_compare_func bt_compare, brt_compare_func dup_compare, size_t cachetable_size);
int tokudb_recover (const char *env_dir, const char *log_dir,
brt_compare_func bt_compare,
brt_compare_func dup_compare,
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del,
size_t cachetable_size);
// Effect: Check the tokudb logs to determine whether or not we need to run recovery.
// If the log is empty or if there is a clean shutdown at the end of the log, then we
......
......@@ -35,7 +35,7 @@ int recovery_main (int argc, const char *argv[]) {
return(1);
}
int r = tokudb_recover(data_dir, log_dir, 0, 0, 0);
int r = tokudb_recover(data_dir, log_dir, NULL, NULL, NULL, NULL, NULL, NULL, 0);
if (r!=0) {
fprintf(stderr, "Recovery failed\n");
return(1);
......
......@@ -40,7 +40,7 @@ run_test(void) {
r = close(devnul); assert(r==0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
assert(r == 0);
return 0;
......
......@@ -25,7 +25,7 @@ run_test(void) {
r = toku_logger_close(&logger); assert(r == 0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
assert(r == 0);
return 0;
}
......
......@@ -27,7 +27,7 @@ run_test(void) {
r = dup2(devnul, fileno(stderr)); assert(r==fileno(stderr));
r = close(devnul); assert(r==0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
assert(r == 0);
return 0;
}
......
......@@ -20,7 +20,7 @@ run_test(void) {
r = toku_logger_close(&logger); assert(r == 0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
assert(r == DB_RUNRECOVERY);
return 0;
......
......@@ -34,7 +34,7 @@ run_test(void) {
r = close(devnul); assert(r==0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
assert(r == 0);
return 0;
}
......
......@@ -34,7 +34,7 @@ run_test(void) {
r = close(devnul); assert(r==0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
assert(r == 0);
return 0;
}
......
......@@ -28,7 +28,7 @@ run_test(void) {
r = close(devnul); assert(r==0);
// run recovery
r = tokudb_recover("/junk", TESTDIR, 0, 0, 0);
r = tokudb_recover("/junk", TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
assert(r != 0);
return 0;
}
......
......@@ -20,7 +20,7 @@ run_test(void) {
r = close(devnul); assert(r==0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
assert(r != 0);
return 0;
}
......
......@@ -14,7 +14,7 @@ run_test(void) {
r = toku_os_mkdir(TESTDIR, S_IRWXU); assert(r == 0);
// run recovery
r = tokudb_recover(NULL, NULL, 0, 0, 0);
r = tokudb_recover(NULL, NULL, 0, 0, NULL, NULL, NULL, NULL, 0);
assert(r != 0);
return 0;
}
......
......@@ -193,4 +193,21 @@ static inline void wbuf_FILENUM (struct wbuf *w, FILENUM fileid) {
wbuf_uint(w, fileid.fileid);
}
static inline void wbuf_nocrc_FILENUMS (struct wbuf *w, FILENUMS v) {
wbuf_nocrc_uint(w, v.num);
uint32_t i;
for (i = 0; i < v.num; i++) {
wbuf_nocrc_FILENUM(w, v.filenums[i]);
}
}
static inline void wbuf_FILENUMS (struct wbuf *w, FILENUMS v) {
wbuf_uint(w, v.num);
uint32_t i;
for (i = 0; i < v.num; i++) {
wbuf_FILENUM(w, v.filenums[i]);
}
}
#endif
......@@ -2,6 +2,7 @@
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
// Define BDB if you want to compile this to use Berkeley DB
#include <inttypes.h>
#ifdef BDB
#include <db.h>
#define DIRSUF bdb
......@@ -10,7 +11,6 @@
#define DIRSUF tokudb
#endif
#include <inttypes.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
......
......@@ -125,6 +125,10 @@ BDB_DONTRUN_TESTS = \
recover-delboth-after-checkpoint \
filesize \
isolation \
env-put-multiple \
recover-lsn-filter-multiple \
recover-put-multiple-fdelete-all \
recover-put-multiple-fdelete-some \
#\ ends prev line
# checkpoint tests depend on this header file,
......
// this test makes sure the LSN filtering is used during recovery of put_multiple
#include <sys/stat.h>
#include <fcntl.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
enum {MAX_DBS = 64, MAX_KEY = 8, MAX_VAL = 8};
DB *dbs_multiple[MAX_DBS];
DB *dbs_single[MAX_DBS];
char names_single[MAX_DBS][sizeof("dbs_0xFFF")];
char names_multiple[MAX_DBS][sizeof("dbm_0xFFF")];
uint32_t num_dbs;
uint32_t flags[MAX_DBS];
uint32_t kbuf[MAX_DBS][MAX_KEY/4];
uint32_t vbuf[MAX_DBS][MAX_VAL/4];
#define CKERRIFNOT0(r) do { if (num_dbs>0) { CKERR(r); } else { CKERR2(r, EINVAL); } } while (0)
#define CKERR2IFNOT0(r, rexpect) do { if (num_dbs>0) { CKERR2(r, rexpect); } else { CKERR2(r, EINVAL); } } while (0)
static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) {
assert(num_dbs_in > 0);
assert(num_dbs_in == num_dbs);
assert(extra==&num_dbs); //Verifying extra gets set right.
assert(row->size == 4);
uint32_t which;
for (which = 0; which < num_dbs_in; which++) {
kbuf[which][0] = *(uint32_t*)row->data;
kbuf[which][1] = which;
vbuf[which][0] = which;
vbuf[which][1] = *(uint32_t*)row->data;
keys[which].data = kbuf[which];
keys[which].size = sizeof(kbuf[which]);
vals[which].data = vbuf[which];
vals[which].size = sizeof(vbuf[which]);
}
return 0;
}
static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) {
assert(extra==&num_dbs); //Verifying extra gets set right.
return 0;
}
static void run_test (void) {
int r;
if (verbose)
printf("env-put-multiple num_dbs[%u]\n", num_dbs);
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
uint32_t which;
{
//Create dbs.
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
DB *db;
for (which = 0; which < num_dbs; which++) {
r = db_create(&dbs_multiple[which], env, 0);
CKERR(r);
db = dbs_multiple[which];
r = db->open(db, txn, names_multiple[which], NULL, DB_BTREE, DB_CREATE, 0666);
CKERR(r);
r = db_create(&dbs_single[which], env, 0);
CKERR(r);
db = dbs_single[which];
r = db->open(db, txn, names_single[which], NULL, DB_BTREE, DB_CREATE, 0666);
CKERR(r);
}
r = txn->commit(txn, 0);
}
uint32_t magic = 0xDEADBEEF;
// txn_begin; insert magic number
{
for (which = 0; which < num_dbs; which++) {
flags[which] = 0;
}
memset(flags, 0, sizeof(flags)); //reset
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
DBT val={.data = vbuf[which], .size = sizeof(vbuf[which])};
DB *db = dbs_single[which];
r = db->put(db, txn, &key, &val, flags[which]);
CKERR(r);
}
r = txn->commit(txn, 0);
}
{
//Insert again with DB_YESOVERWRITE, expect it to work.
for (which = 0; which < num_dbs; which++) {
flags[which] = DB_YESOVERWRITE;
}
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
DBT val={.data = vbuf[which], .size = sizeof(vbuf[which])};
DB *db = dbs_single[which];
r = db->put(db, txn, &key, &val, flags[which]);
CKERR(r);
}
r = txn->commit(txn, 0);
}
{
//Insert again with DB_NOOVERWRITE, expect it to fail (unless 0 dbs).
for (which = 0; which < num_dbs; which++) {
flags[which] = DB_NOOVERWRITE;
}
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
CKERR2IFNOT0(r, DB_KEYEXIST);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
DBT val={.data = vbuf[which], .size = sizeof(vbuf[which])};
DB *db = dbs_single[which];
r = db->put(db, txn, &key, &val, flags[which]);
CKERR2(r, DB_KEYEXIST);
}
r = txn->commit(txn, 0);
}
{
//Different number
magic = 0xFEEDADAD;
//Insert again with DB_YESOVERWRITE, using 2 transactions, expect it to fail (unless 0 dbs).
for (which = 0; which < num_dbs; which++) {
flags[which] = DB_YESOVERWRITE;
}
DB_TXN *txna;
r = env->txn_begin(env, NULL, &txna, 0);
CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)};
r = env->put_multiple(env, txna, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
DBT val={.data = vbuf[which], .size = sizeof(vbuf[which])};
DB *db = dbs_single[which];
r = db->put(db, txna, &key, &val, flags[which]);
CKERR(r);
}
DB_TXN *txnb;
r = env->txn_begin(env, NULL, &txnb, 0);
CKERR(r);
//Lock should fail
r = env->put_multiple(env, txnb, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
CKERR2IFNOT0(r, DB_LOCK_NOTGRANTED);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
DBT val={.data = vbuf[which], .size = sizeof(vbuf[which])};
DB *db = dbs_single[which];
r = db->put(db, txnb, &key, &val, flags[which]);
CKERR2(r, DB_LOCK_NOTGRANTED);
}
r = txna->commit(txna, 0);
//Should succeed this time.
r = env->put_multiple(env, txnb, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
DBT val={.data = vbuf[which], .size = sizeof(vbuf[which])};
DB *db = dbs_single[which];
r = db->put(db, txnb, &key, &val, flags[which]);
CKERR(r);
}
r = txnb->commit(txnb, 0);
}
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
DBC *c_single;
DBC *c_multiple;
DBT k_single, v_single, k_multiple, v_multiple;
memset(&k_single, 0, sizeof(k_single));
memset(&v_single, 0, sizeof(v_single));
memset(&k_multiple, 0, sizeof(k_multiple));
memset(&v_multiple, 0, sizeof(v_multiple));
for (which = 0; which < num_dbs; which++) {
r = dbs_multiple[which]->cursor(dbs_multiple[which], txn, &c_multiple, 0);
CKERR(r);
r = dbs_single[which]->cursor(dbs_single[which], txn, &c_single, 0);
CKERR(r);
int r1 = 0;
int r2;
while (r1 == 0) {
r1 = c_single->c_get(c_single, &k_single, &v_single, DB_NEXT);
r2 = c_multiple->c_get(c_multiple, &k_multiple, &v_multiple, DB_NEXT);
assert(r1==r2);
CKERR2s(r1, 0, DB_NOTFOUND);
if (r1 == 0) {
assert(k_single.size == k_multiple.size);
assert(v_single.size == v_multiple.size);
assert(memcmp(k_single.data, k_multiple.data, k_single.size) == 0);
assert(memcmp(v_single.data, v_multiple.data, v_single.size) == 0);
}
}
r = c_single->c_close(c_single);
CKERR(r);
r = c_multiple->c_close(c_multiple);
CKERR(r);
}
r = txn->commit(txn, 0);
}
{
for (which = 0; which < num_dbs; which++) {
r = dbs_single[which]->close(dbs_single[which], 0);
CKERR(r);
r = dbs_multiple[which]->close(dbs_multiple[which], 0);
CKERR(r);
}
}
r = env->close(env, 0);
CKERR(r);
}
int test_main (int argc, char *argv[]) {
parse_args(argc, argv);
uint32_t which;
for (which = 0; which < MAX_DBS; which++) {
sprintf(names_multiple[which], "dbm_0x%02X", which);
sprintf(names_single[which], "dbs_0x%02X", which);
}
for (num_dbs = 0; num_dbs < 4; num_dbs++) {
run_test();
}
for (num_dbs = 4; num_dbs <= MAX_DBS; num_dbs *= 2) {
run_test();
}
return 0;
}
// this test makes sure the LSN filtering is used during recovery of put_multiple
#include <sys/stat.h>
#include <fcntl.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
char *namea="a.db";
char *nameb="b.db";
enum {num_dbs = 2};
BOOL do_test=FALSE, do_recover=FALSE;
static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) {
assert((int)num_dbs_in == num_dbs);
if (do_recover)
assert(extra==NULL);
else
assert(extra==&namea); //Verifying extra gets set right.
assert(row->size >= 4);
int32_t keysize = *(int32_t*)row->data;
assert((int)row->size >= 4+keysize);
int32_t valsize = row->size - 4 - keysize;
void *key = row->data+4;
void *val = row->data+4 + keysize;
int which;
for (which = 0; which < num_dbs; which++) {
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
}
static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) {
if (do_recover)
assert(extra==NULL);
else
assert(extra==&namea); //Verifying extra gets set right.
return 0;
}
static void run_test (void) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
// create a txn that never closes, forcing recovery to run from the beginning of the log
{
DB_TXN *oldest_living_txn;
r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r);
}
DB *dba;
DB *dbb;
r = db_create(&dba, env, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
DB *dbs[num_dbs] = {dba, dbb};
uint32_t flags[num_dbs] = {DB_YESOVERWRITE, DB_YESOVERWRITE};
// txn_begin; insert <a,a>; txn_abort
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = txn->abort(txn); CKERR(r);
}
r = dbb->close(dbb, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
dbs[1] = dbb;
// txn_begin; insert <a,b>;
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="b", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
}
// checkpoint
r = env->txn_checkpoint(env, 0, 0, 0); CKERR(r);
// abort the process
toku_hard_crash_on_purpose();
}
static void run_recover (void) {
DB_ENV *env;
int r;
// Recovery starts from oldest_living_txn, which is older than any inserts done in run_test,
// so recovery always runs over the entire log.
// run recovery
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
// verify the data
{
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBC *cursor;
r = db->cursor(db, txn, &cursor, 0); CKERR(r);
DBT k, v;
r = cursor->c_get(cursor, dbt_init_malloc(&k), dbt_init_malloc(&v), DB_FIRST);
assert(r == DB_NOTFOUND);
r = cursor->c_close(cursor); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
r = db->close(db, 0); CKERR(r);
}
{
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBC *cursor;
r = db->cursor(db, txn, &cursor, 0); CKERR(r);
DBT k, v;
r = cursor->c_get(cursor, dbt_init_malloc(&k), dbt_init_malloc(&v), DB_FIRST);
assert(r == DB_NOTFOUND);
r = cursor->c_close(cursor); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
r = db->close(db, 0); CKERR(r);
}
r = env->close(env, 0); CKERR(r);
exit(0);
}
const char *cmd;
static void test_parse_args (int argc, char *argv[]) {
int resultcode;
cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v") == 0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "--test")==0) {
do_test=TRUE;
} else if (strcmp(argv[0], "--recover") == 0) {
do_recover=TRUE;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--test | --recover } \n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
int test_main (int argc, char *argv[]) {
test_parse_args(argc, argv);
if (do_test) {
run_test();
} else if (do_recover) {
run_recover();
}
return 0;
}
// this test makes sure the LSN filtering is used during recovery of put_multiple
#include <sys/stat.h>
#include <fcntl.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
char *namea="a.db";
char *nameb="b.db";
enum {num_dbs = 2};
BOOL do_test=FALSE, do_recover=FALSE;
static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) {
assert(num_dbs_in > 0);
if (do_recover)
assert(extra==NULL);
else
assert(extra==&namea); //Verifying extra gets set right.
assert(row->size >= 4);
int32_t keysize = *(int32_t*)row->data;
assert((int)row->size >= 4+keysize);
int32_t valsize = row->size - 4 - keysize;
void *key = row->data+4;
void *val = row->data+4 + keysize;
uint32_t which;
for (which = 0; which < num_dbs_in; which++) {
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
}
static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) {
if (do_recover)
assert(extra==NULL);
else
assert(extra==&namea); //Verifying extra gets set right.
return 0;
}
static void run_test (void) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
// create a txn that never closes, forcing recovery to run from the beginning of the log
{
DB_TXN *oldest_living_txn;
r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r);
}
DB *dba;
DB *dbb;
r = db_create(&dba, env, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
DB *dbs[num_dbs] = {dba, dbb};
uint32_t flags[num_dbs] = {DB_YESOVERWRITE, DB_YESOVERWRITE};
// txn_begin; insert <a,a>; txn_abort
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = txn->abort(txn); CKERR(r);
}
r = dbb->close(dbb, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
dbs[1] = dbb;
// txn_begin; insert <a,b>;
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="b", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = dba->close(dba, 0); CKERR(r);
r = env->dbremove(env, txn, namea, NULL, 0); CKERR(r);
r = dba->close(dbb, 0); CKERR(r);
r = env->dbremove(env, txn, nameb, NULL, 0); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
r = env->log_flush(env, NULL); CKERR(r);
// abort the process
toku_hard_crash_on_purpose();
}
static void run_recover (void) {
DB_ENV *env;
int r;
// Recovery starts from oldest_living_txn, which is older than any inserts done in run_test,
// so recovery always runs over the entire log.
// run recovery
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
// verify the data
{
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR2(r, ENOENT);
r = db->close(db, 0); CKERR(r);
}
{
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR2(r, ENOENT);
r = db->close(db, 0); CKERR(r);
}
r = env->close(env, 0); CKERR(r);
exit(0);
}
const char *cmd;
static void test_parse_args (int argc, char *argv[]) {
int resultcode;
cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v") == 0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "--test")==0) {
do_test=TRUE;
} else if (strcmp(argv[0], "--recover") == 0) {
do_recover=TRUE;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--test | --recover } \n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
int test_main (int argc, char *argv[]) {
test_parse_args(argc, argv);
if (do_test) {
run_test();
} else if (do_recover) {
run_recover();
}
return 0;
}
// this test makes sure the LSN filtering is used during recovery of put_multiple
#include <sys/stat.h>
#include <fcntl.h>
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
char *namea="a.db";
char *nameb="b.db";
enum {num_dbs = 2};
BOOL do_test=FALSE, do_recover=FALSE;
static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) {
assert(num_dbs_in > 0);
if (do_recover)
assert(extra==NULL);
else
assert(extra==&namea); //Verifying extra gets set right.
assert(row->size >= 4);
int32_t keysize = *(int32_t*)row->data;
assert((int)row->size >= 4+keysize);
int32_t valsize = row->size - 4 - keysize;
void *key = row->data+4;
void *val = row->data+4 + keysize;
uint32_t which;
for (which = 0; which < num_dbs_in; which++) {
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
}
static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) {
if (do_recover)
assert(extra==NULL);
else
assert(extra==&namea); //Verifying extra gets set right.
return 0;
}
static void run_test (void) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
// create a txn that never closes, forcing recovery to run from the beginning of the log
{
DB_TXN *oldest_living_txn;
r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r);
}
DB *dba;
DB *dbb;
r = db_create(&dba, env, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
DB *dbs[num_dbs] = {dba, dbb};
uint32_t flags[num_dbs] = {DB_YESOVERWRITE, DB_YESOVERWRITE};
// txn_begin; insert <a,a>; txn_abort
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = txn->abort(txn); CKERR(r);
}
r = dbb->close(dbb, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
dbs[1] = dbb;
// txn_begin; insert <a,b>;
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="b", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = dba->close(dba, 0); CKERR(r);
r = env->dbremove(env, txn, namea, NULL, 0); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
r = env->log_flush(env, NULL); CKERR(r);
// abort the process
toku_hard_crash_on_purpose();
}
static void run_recover (void) {
DB_ENV *env;
int r;
// Recovery starts from oldest_living_txn, which is older than any inserts done in run_test,
// so recovery always runs over the entire log.
// run recovery
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
// verify the data
{
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR2(r, ENOENT);
r = db->close(db, 0); CKERR(r);
}
{
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBC *cursor;
r = db->cursor(db, txn, &cursor, 0); CKERR(r);
DBT k, v;
r = cursor->c_get(cursor, dbt_init_malloc(&k), dbt_init_malloc(&v), DB_FIRST);
CKERR(r);
assert(k.size == 2);
assert(v.size == 2);
assert(memcmp(k.data, "a", 2) == 0);
assert(memcmp(v.data, "b", 2) == 0);
r = cursor->c_close(cursor); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
r = db->close(db, 0); CKERR(r);
}
r = env->close(env, 0); CKERR(r);
exit(0);
}
const char *cmd;
static void test_parse_args (int argc, char *argv[]) {
int resultcode;
cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-v") == 0) {
verbose++;
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "--test")==0) {
do_test=TRUE;
} else if (strcmp(argv[0], "--recover") == 0) {
do_recover=TRUE;
} else if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s [-v|-q]* [-h] {--test | --recover } \n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
int test_main (int argc, char *argv[]) {
test_parse_args(argc, argv);
if (do_test) {
run_test();
} else if (do_recover) {
run_recover();
}
return 0;
}
......@@ -55,6 +55,10 @@ struct __toku_db_env_internal {
char *data_dir;
int (*bt_compare) (DB *, const DBT *, const DBT *);
int (*dup_compare) (DB *, const DBT *, const DBT *);
generate_keys_vals_for_put_func generate_keys_vals_for_put;
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put;
generate_keys_for_del_func generate_keys_for_del;
cleanup_keys_for_del_func cleanup_keys_for_del;
//void (*noticecall)(DB_ENV *, db_notices);
unsigned long cachetable_size;
CACHETABLE cachetable;
......
......@@ -337,7 +337,10 @@ ydb_do_recovery (DB_ENV *env) {
logdir = toku_strdup(env->i->dir);
}
toku_ydb_unlock();
int r = tokudb_recover(envdir, logdir, env->i->bt_compare, env->i->dup_compare, env->i->cachetable_size);
int r = tokudb_recover(envdir, logdir, env->i->bt_compare, env->i->dup_compare,
env->i->generate_keys_vals_for_put, env->i->cleanup_keys_vals_for_put,
env->i->generate_keys_for_del, env->i->cleanup_keys_for_del,
env->i->cachetable_size);
toku_ydb_lock();
toku_free(logdir);
return r;
......@@ -1222,6 +1225,60 @@ locked_env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const D
return r;
}
static int
env_set_multiple_callbacks(DB_ENV *env,
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del) {
HANDLE_PANICKED_ENV(env);
int r = 0;
if (env_opened(env)) r = EINVAL;
else {
env->i->generate_keys_vals_for_put = generate_keys_vals_for_put;
env->i->cleanup_keys_vals_for_put = cleanup_keys_vals_for_put;
env->i->generate_keys_for_del = generate_keys_for_del;
env->i->cleanup_keys_for_del = cleanup_keys_for_del;
}
return r;
}
static int
locked_env_set_multiple_callbacks(DB_ENV *env,
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del) {
toku_ydb_lock();
int r = env_set_multiple_callbacks(env,
generate_keys_vals_for_put,
cleanup_keys_vals_for_put,
generate_keys_for_del,
cleanup_keys_for_del);
toku_ydb_unlock();
return r;
}
static int env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra);
static int env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra);
static int
locked_env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) {
toku_ydb_lock();
int r = env_put_multiple(env, txn, row, num_dbs, dbs, flags, extra);
toku_ydb_unlock();
return r;
}
static int
locked_env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) {
toku_ydb_lock();
int r = env_del_multiple(env, txn, row, num_dbs, dbs, flags, extra);
toku_ydb_unlock();
return r;
}
static void
format_time(const time_t *timer, char *buf) {
ctime_r(timer, buf);
......@@ -1413,6 +1470,9 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
SENV(dbrename);
SENV(set_default_bt_compare);
SENV(set_default_dup_compare);
SENV(set_multiple_callbacks);
SENV(put_multiple);
SENV(del_multiple);
SENV(checkpointing_set_period);
SENV(checkpointing_get_period);
result->checkpointing_postpone = env_checkpointing_postpone;
......@@ -3576,6 +3636,79 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
return r;
}
static int
env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) {
int r;
int generated_dbts = 0;
DBT keydbts[num_dbs];
uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs];
BRT brts[num_dbs];
if (!txn || !num_dbs) {
r = EINVAL;
goto cleanup;
}
if (!env->i->generate_keys_for_del || !env->i->cleanup_keys_for_del) {
r = EINVAL;
goto cleanup;
}
memset(keydbts, 0, sizeof(keydbts));
//Generate all the DBTs
r = env->i->generate_keys_for_del(row, num_dbs, dbs, keydbts, extra);
if (r!=0) goto cleanup;
generated_dbts = 1;
uint32_t which_db;
for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db];
lock_flags[which_db] = get_prelocked_flags(flags[which_db], txn, db);
remaining_flags[which_db] = flags[which_db] & ~lock_flags[which_db];
if (remaining_flags[which_db] & ~DB_DELETE_ANY) {
r = EINVAL;
goto cleanup;
}
BOOL error_if_missing = (BOOL)(!(remaining_flags[which_db]&DB_DELETE_ANY));
if (error_if_missing) {
//Check if the key exists in the db.
r = db_getf_set(db, txn, lock_flags[which_db], &keydbts[which_db], ydb_getf_do_nothing, NULL);
if (r!=0) goto cleanup;
}
//Check overwrite constraints
if (r!=0) goto cleanup;
//Do locking if necessary.
if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
//Needs locking
RANGE_LOCK_REQUEST_S request;
//Left end of range == right end of range (point lock)
write_lock_request_init(&request, txn, db,
&keydbts[which_db], toku_lt_neg_infinity,
&keydbts[which_db], toku_lt_infinity);
r = grab_range_lock(&request);
if (r!=0) goto cleanup;
}
brts[which_db] = db->i->brt;
}
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
r = toku_brt_log_del_multiple(ttxn, brts, num_dbs, row);
if (r!=0) goto cleanup;
for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db];
r = toku_brt_maybe_delete(db->i->brt, &keydbts[which_db], ttxn, FALSE, ZERO_LSN, FALSE);
if (r!=0) goto cleanup;
}
cleanup:
if (generated_dbts) {
int r2 = env->i->cleanup_keys_for_del(row, num_dbs, dbs, keydbts, extra);
if (r==0) r = r2;
}
return r;
}
static int locked_c_get(DBC * c, DBT * key, DBT * data, u_int32_t flag) {
//{ unsigned int i; printf("cget flags=%d keylen=%d key={", flag, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", data->size); for(i=0; i<data->size; i++) printf("%d,", ((char*)data->data)[i]); printf("}\n"); }
toku_ydb_lock(); int r = toku_c_get(c, key, data, flag); toku_ydb_unlock();
......@@ -4157,6 +4290,71 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) {
return r;
}
static int
env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) {
int r;
int generated_dbts = 0;
DBT keydbts[num_dbs], valdbts[num_dbs];
uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs];
BRT brts[num_dbs];
if (!txn || !num_dbs) {
r = EINVAL;
goto cleanup;
}
if (!env->i->generate_keys_vals_for_put || !env->i->cleanup_keys_vals_for_put) {
r = EINVAL;
goto cleanup;
}
memset(keydbts, 0, sizeof(keydbts));
memset(valdbts, 0, sizeof(valdbts));
//Generate all the DBTs
r = env->i->generate_keys_vals_for_put(row, num_dbs, dbs, keydbts, valdbts, extra);
if (r!=0) goto cleanup;
generated_dbts = 1;
uint32_t which_db;
for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db];
lock_flags[which_db] = get_prelocked_flags(flags[which_db], txn, db);
remaining_flags[which_db] = flags[which_db] & ~lock_flags[which_db];
//Check overwrite constraints
r = db_put_check_overwrite_constraint(db, txn,
&keydbts[which_db], &valdbts[which_db],
lock_flags[which_db], remaining_flags[which_db]);
if (r!=0) goto cleanup;
//Do locking if necessary.
if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
//Needs locking
RANGE_LOCK_REQUEST_S request;
//Left end of range == right end of range (point lock)
write_lock_request_init(&request, txn, db,
&keydbts[which_db], &valdbts[which_db],
&keydbts[which_db], &valdbts[which_db]);
r = grab_range_lock(&request);
if (r!=0) goto cleanup;
}
brts[which_db] = db->i->brt;
}
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
r = toku_brt_log_put_multiple(ttxn, brts, num_dbs, row);
if (r!=0) goto cleanup;
for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db];
r = toku_brt_maybe_insert(db->i->brt, &keydbts[which_db], &valdbts[which_db], ttxn, FALSE, ZERO_LSN, FALSE);
if (r!=0) goto cleanup;
}
cleanup:
if (generated_dbts) {
int r2 = env->i->cleanup_keys_vals_for_put(row, num_dbs, dbs, keydbts, valdbts, extra);
if (r==0) r = r2;
}
return r;
}
static int toku_db_remove(DB * db, const char *fname, const char *dbname, u_int32_t flags);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment