Commit 56d2c6fd authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge 2216 onto main (from 16706-17293) as

{{{
svn merge -r16706:17293 https://svn.tokutek.com/tokudb/toku/tokudb.2216
}}}

Refs #2216. [t:2216]


git-svn-id: file:///svn/toku/tokudb@17299 c7de825b-a66e-492c-adef-691d508d4ae1
parent ae499b39
......@@ -31,7 +31,7 @@ runs_installed: sample_offsets
./sample_offsets
sample_offsets_mysql: CPPFLAGS+=$(MYSQL_H)
sample_offsets_mysql: sample_offsets.c
$(CC) $(CFLAGS) $(CPPFLAGS) $< -o $@
$(CC) $(CFLAGS) -I../toku_include $(CPPFLAGS) $< -o $@
make_db_h_4_1.o: make_db_h.c sample_offsets_32_4_1.h sample_offsets_64_4_1.h
$(CC) $(CFLAGS) $(CPPFLAGS) -I$(BDBDIR)/db-4.1.25/build_unix $< -c -o $@ -DUSE_MAJOR=4 -DUSE_MINOR=1 -DTDB_NATIVE=0
......@@ -78,7 +78,7 @@ make_tdb_h.o: make_db_h.c
tdb.h: make_tdb_h
./make_tdb_h > $@
sample_offsets:
sample_offsets: CPPFLAGS+=-I../toku_include
sample_offsets_local: ./db.h
sample_offsets_local: sample_offsets.c
$(CC) $(CFLAGS) $(CPPFLAGS) -DLOCAL $< -o $@
......
......@@ -46,6 +46,15 @@ typedef struct __toku_db_btree_stat64 {
u_int64_t bt_dsize; /* how big are the keys+values (not counting the lengths) (an estimate, unless flattened) */
u_int64_t bt_fsize; /* how big is the underlying file */
} DB_BTREE_STAT64;
typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_duplicate_callback)(DB_LOADER *loader, void (*duplicate)(DB *db, int i, DBT *key, DBT *val)); /* set the duplicate callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
};
typedef struct __toku_engine_status {
char now[26]; /* time of engine status query (i.e. now) */
u_int64_t ydb_lock_ctr; /* how many times has ydb lock been taken/released */
......@@ -196,6 +205,7 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t flags[/*N*/], uint32_t dbt_flags[/*N*/], void *extra);
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
......@@ -204,7 +214,7 @@ struct __toku_db_env {
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *extra));
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
......@@ -213,8 +223,8 @@ struct __toku_db_env {
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[20];
void *extra));
void* __toku_dummy0[19];
char __toku_dummy1[64];
void *api1_internal; /* 32-bit offset=212 size=4, 64=bit offset=360 size=8 */
void* __toku_dummy2[7];
......
......@@ -46,6 +46,15 @@ typedef struct __toku_db_btree_stat64 {
u_int64_t bt_dsize; /* how big are the keys+values (not counting the lengths) (an estimate, unless flattened) */
u_int64_t bt_fsize; /* how big is the underlying file */
} DB_BTREE_STAT64;
typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_duplicate_callback)(DB_LOADER *loader, void (*duplicate)(DB *db, int i, DBT *key, DBT *val)); /* set the duplicate callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
};
typedef struct __toku_engine_status {
char now[26]; /* time of engine status query (i.e. now) */
u_int64_t ydb_lock_ctr; /* how many times has ydb lock been taken/released */
......@@ -198,6 +207,7 @@ struct __toku_db_env {
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t flags[/*N*/], uint32_t dbt_flags[/*N*/], void *extra);
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
......@@ -206,7 +216,7 @@ struct __toku_db_env {
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *extra));
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
......@@ -215,8 +225,8 @@ struct __toku_db_env {
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[20];
void *extra));
void* __toku_dummy0[19];
char __toku_dummy1[96];
void *api1_internal; /* 32-bit offset=244 size=4, 64=bit offset=392 size=8 */
void* __toku_dummy2[7];
......
......@@ -46,6 +46,15 @@ typedef struct __toku_db_btree_stat64 {
u_int64_t bt_dsize; /* how big are the keys+values (not counting the lengths) (an estimate, unless flattened) */
u_int64_t bt_fsize; /* how big is the underlying file */
} DB_BTREE_STAT64;
typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_duplicate_callback)(DB_LOADER *loader, void (*duplicate)(DB *db, int i, DBT *key, DBT *val)); /* set the duplicate callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
};
typedef struct __toku_engine_status {
char now[26]; /* time of engine status query (i.e. now) */
u_int64_t ydb_lock_ctr; /* how many times has ydb lock been taken/released */
......@@ -199,6 +208,7 @@ struct __toku_db_env {
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t flags[/*N*/], uint32_t dbt_flags[/*N*/], void *extra);
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
......@@ -207,7 +217,7 @@ struct __toku_db_env {
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *extra));
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
......@@ -216,8 +226,8 @@ struct __toku_db_env {
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[35];
void *extra));
void* __toku_dummy0[34];
char __toku_dummy1[128];
void *api1_internal; /* 32-bit offset=336 size=4, 64=bit offset=544 size=8 */
void* __toku_dummy2[7];
......
......@@ -46,6 +46,15 @@ typedef struct __toku_db_btree_stat64 {
u_int64_t bt_dsize; /* how big are the keys+values (not counting the lengths) (an estimate, unless flattened) */
u_int64_t bt_fsize; /* how big is the underlying file */
} DB_BTREE_STAT64;
typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_duplicate_callback)(DB_LOADER *loader, void (*duplicate)(DB *db, int i, DBT *key, DBT *val)); /* set the duplicate callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
};
typedef struct __toku_engine_status {
char now[26]; /* time of engine status query (i.e. now) */
u_int64_t ydb_lock_ctr; /* how many times has ydb lock been taken/released */
......@@ -198,16 +207,17 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t flags[/*N*/], uint32_t dbt_flags[/*N*/], void *extra);
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *extra));
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
......@@ -216,8 +226,8 @@ struct __toku_db_env {
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[35];
void *extra));
void* __toku_dummy0[34];
char __toku_dummy1[128];
void *api1_internal; /* 32-bit offset=336 size=4, 64=bit offset=544 size=8 */
void* __toku_dummy2[8];
......
......@@ -46,6 +46,15 @@ typedef struct __toku_db_btree_stat64 {
u_int64_t bt_dsize; /* how big are the keys+values (not counting the lengths) (an estimate, unless flattened) */
u_int64_t bt_fsize; /* how big is the underlying file */
} DB_BTREE_STAT64;
typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_duplicate_callback)(DB_LOADER *loader, void (*duplicate)(DB *db, int i, DBT *key, DBT *val)); /* set the duplicate callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
};
typedef struct __toku_engine_status {
char now[26]; /* time of engine status query (i.e. now) */
u_int64_t ydb_lock_ctr; /* how many times has ydb lock been taken/released */
......@@ -200,16 +209,17 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t flags[/*N*/], uint32_t dbt_flags[/*N*/], void *extra);
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *extra));
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
......@@ -218,8 +228,8 @@ struct __toku_db_env {
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[36];
void *extra));
void* __toku_dummy0[35];
char __toku_dummy1[144];
void *api1_internal; /* 32-bit offset=356 size=4, 64=bit offset=568 size=8 */
void* __toku_dummy2[8];
......
......@@ -375,6 +375,17 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
printf(" u_int64_t bt_fsize; /* how big is the underlying file */\n");
printf("} DB_BTREE_STAT64;\n");
//bulk loader
printf("typedef struct __toku_loader DB_LOADER;\n");
printf("struct __toku_loader_internal;\n");
printf("struct __toku_loader {\n");
printf(" struct __toku_loader_internal *i;\n");
printf(" int (*set_duplicate_callback)(DB_LOADER *loader, void (*duplicate)(DB *db, int i, DBT *key, DBT *val)); /* set the duplicate callback */\n");
printf(" int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */\n");
printf(" int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */\n");
printf(" int (*close)(DB_LOADER *loader); /* finish loading, free memory */\n");
printf("};\n");
//engine status info
printf("typedef struct __toku_engine_status {\n");
printf(" char now[26]; /* time of engine status query (i.e. now) */ \n");
......@@ -450,6 +461,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
"int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */",
"int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */",
"int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */",
"int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t flags[/*N*/], uint32_t dbt_flags[/*N*/], void *extra)",
"int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
" const DBT *key, const DBT *val,\n"
" uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,\n"
......@@ -458,7 +470,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
" int (*generate_row_for_put)(DB *dest_db, DB *src_db,\n"
" DBT *dest_key, DBT *dest_val,\n"
" const DBT *src_key, const DBT *src_val,\n"
" void *extra));",
" void *extra))",
"int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
" const DBT *key, const DBT *val,\n"
" uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,\n"
......@@ -467,7 +479,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
" int (*generate_row_for_del)(DB *dest_db, DB *src_db,\n"
" DBT *dest_key,\n"
" const DBT *src_key, const DBT *src_val,\n"
" void *extra));",
" void *extra))",
NULL};
print_struct("db_env", 1, db_env_fields32, db_env_fields64, sizeof(db_env_fields32)/sizeof(db_env_fields32[0]), extra);
}
......
......@@ -46,6 +46,15 @@ typedef struct __toku_db_btree_stat64 {
u_int64_t bt_dsize; /* how big are the keys+values (not counting the lengths) (an estimate, unless flattened) */
u_int64_t bt_fsize; /* how big is the underlying file */
} DB_BTREE_STAT64;
typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_duplicate_callback)(DB_LOADER *loader, void (*duplicate)(DB *db, int i, DBT *key, DBT *val)); /* set the duplicate callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
};
typedef struct __toku_engine_status {
char now[26]; /* time of engine status query (i.e. now) */
u_int64_t ydb_lock_ctr; /* how many times has ydb lock been taken/released */
......@@ -200,16 +209,17 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t flags[/*N*/], uint32_t dbt_flags[/*N*/], void *extra);
void *app_private;
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private;
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *extra));
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
......@@ -218,7 +228,7 @@ struct __toku_db_env {
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *extra));
void *api1_internal;
int (*close) (DB_ENV *, u_int32_t);
int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t);
......
......@@ -46,6 +46,15 @@ typedef struct __toku_db_btree_stat64 {
u_int64_t bt_dsize; /* how big are the keys+values (not counting the lengths) (an estimate, unless flattened) */
u_int64_t bt_fsize; /* how big is the underlying file */
} DB_BTREE_STAT64;
typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_duplicate_callback)(DB_LOADER *loader, void (*duplicate)(DB *db, int i, DBT *key, DBT *val)); /* set the duplicate callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
};
typedef struct __toku_engine_status {
char now[26]; /* time of engine status query (i.e. now) */
u_int64_t ydb_lock_ctr; /* how many times has ydb lock been taken/released */
......@@ -200,16 +209,17 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t flags[/*N*/], uint32_t dbt_flags[/*N*/], void *extra);
void *app_private;
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private;
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *extra));
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
......@@ -218,7 +228,7 @@ struct __toku_db_env {
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *extra));
void *api1_internal;
int (*close) (DB_ENV *, u_int32_t);
int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t);
......
......@@ -58,6 +58,7 @@ BRT_SOURCES = \
logcursor \
memarena \
mempool \
merger \
minicron \
omt \
recover \
......
......@@ -2893,8 +2893,7 @@ static int brt_create_file(BRT brt, const char *fname, int *fdp) {
}
// open a file for use by the brt. if the file does not exist, error
static int brt_open_file(BRT brt, const char *fname, int *fdp) {
brt = brt;
static int brt_open_file(const char *fname, int *fdp) {
mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
int r;
int fd;
......@@ -3099,7 +3098,7 @@ int toku_brt_open_recovery(BRT t, const char *fname, const char *fname_in_env, i
{
int fd = -1;
BOOL did_create = FALSE;
r = brt_open_file(t, fname, &fd);
r = brt_open_file(fname, &fd);
FILENUM reserved_filenum = t->filenum;
if (r==ENOENT && is_create) {
toku_cachetable_reserve_filenum(cachetable, &reserved_filenum, t->did_set_filenum, t->filenum);
......@@ -3235,6 +3234,27 @@ toku_brt_open(BRT t, const char *fname, const char *fname_in_env, int is_create,
return toku_brt_open_recovery(t, fname, fname_in_env, is_create, only_create, cachetable, txn, db, FALSE);
}
int toku_redirect_brt (const char *fname_in_env, BRT brt, TOKUTXN txn __attribute__((__unused__)))
// Effect: A BRT points to a file. Change it to point to a different file.
// The original file remains unchanged, but the posix file descriptor is closed. A new file descriptor is opened.
// The different file must be a valid BRT file.
// The block size and flags in the file must match the existing BRT.
// The new file must already have its descriptor in it (and it must match the existing descriptor).
// The new file will have the same filenum.
// This function should record a log entry for the redirect. (And should probably fsync the log after writing that entry.)
// If the txn aborts, then this operation will be undone
// Requires: Sufficient locks are held (e.g., the YDB monolithic lock, and possibly a dictionary lock) to prevent race conditions.
{
int fd=0;
int r;
r = brt_open_file(fname_in_env, &fd);
assert(r==0);
r = toku_cachetable_redirect(brt->cf, fd, fname_in_env);
assert(r = 0);
return 0;
}
int toku_brt_get_fd(BRT brt, int *fdp) {
*fdp = toku_cachefile_fd(brt->cf);
return 0;
......@@ -5047,7 +5067,8 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, bytevec lo
for (i=0; i<node->u.n.n_children; i++) {
fprintf(file, "%*schild %d\n", depth, "", i);
if (i>0) {
fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, (unsigned)toku_dtoh32(*(int*)&node->u.n.childkeys[i-1]->key));
char *key = node->u.n.childkeys[i-1]->key;
fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->u.n.childkeys[i-1]->keylen, (unsigned)toku_dtoh32(*(int*)key));
}
toku_dump_brtnode(file, brt, BNC_BLOCKNUM(node, i), depth+4,
(i==0) ? lorange : node->u.n.childkeys[i-1]->key,
......
......@@ -30,6 +30,9 @@ typedef int(*BRT_GET_STRADDLE_CALLBACK_FUNCTION)(ITEMLEN, bytevec, ITEMLEN, byte
int toku_open_brt (const char *fname, int is_create, BRT *, int nodesize, CACHETABLE, TOKUTXN, int(*)(DB*,const DBT*,const DBT*), DB*);
int toku_redirect_brt (const char *fname_in_env, BRT brt, TOKUTXN txn);
// See the brt.c file for what this toku_redirect_brt does
u_int32_t toku_serialize_descriptor_size(struct descriptor *desc);
int toku_brt_create(BRT *);
int toku_brt_set_flags(BRT, unsigned int flags);
......
#include <stdio.h>
#include <memory.h>
#include <errno.h>
#include <toku_assert.h>
#include <db.h>
#include "brttypes.h"
typedef struct brtloader_s *BRTLOADER;
int toku_brt_loader_open (BRTLOADER *bl, generate_row_for_put_func g, DB *src_db, int N, DB *dbs[], const char *temp_file_template);
int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val);
int toku_brt_loader_close (BRTLOADER bl);
struct brtloader_s {
int panic;
generate_row_for_put_func generate_row_for_put;
DB *src_db;
int N;
DB **dbs;
const char *temp_file_template;
FILE *fprimary_rows; char *fprimary_rows_name;
FILE *fprimary_idx; char *fprimary_idx_name;
u_int64_t fprimary_offset;
};
static int open_temp_file (BRTLOADER bl, FILE **filep, char **fnamep) {
char *fname = toku_strdup(bl->temp_file_template);
int fd = mkstemp(fname);
if (fd<0) { int r = errno; toku_free(fname); return r; }
FILE *f = fdopen(fd, "r+");
if (f==NULL) { int r = errno; toku_free(fname); close(fd); return r; }
*filep = f;
*fnamep = fname;
return 0;
}
int toku_brt_loader_open (BRTLOADER *blp, generate_row_for_put_func g, DB *src_db, int N, DB*dbs[], const char *temp_file_template) {
BRTLOADER MALLOC(bl);
bl->panic = 0;
bl->generate_row_for_put = g;
bl->src_db = src_db;
bl->N = N;
MALLOC_N(N, bl->dbs);
for (int i=0; i<N; i++) bl->dbs[i]=dbs[i];
bl->temp_file_template = toku_strdup(temp_file_template);
bl->fprimary_rows = bl->fprimary_idx = NULL;
{ int r = open_temp_file(bl, &bl->fprimary_rows, &bl->fprimary_rows_name); if (r!=0) return r; }
{ int r = open_temp_file(bl, &bl->fprimary_idx, &bl->fprimary_idx_name); if (r!=0) return r; }
bl->fprimary_offset = 0;
*blp = bl;
return 0;
}
#define handle_ferror(ok, file) if (!(ok)) { bl->panic=1; bl->panic_errno = ferror(file); return bl->panic_errno; }
static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl) {
size_t r = fwrite(ptr, size, nmemb, stream);
if (r!=nmemb) {
int e = ferror(stream);
assert(e!=0);
bl->panic = 1;
return e;
}
return 0;
}
static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream, BRTLOADER bl) {
size_t r = fread(ptr, size, nmemb, stream);
if (r==0) {
if (feof(stream)) return EOF;
else {
bl->panic=1;
return ferror(stream);
}
} else if (r<nmemb) {
bl->panic=1;
return ferror(stream);
} else {
return 0;
}
}
static int loader_write_row(DBT *key, DBT *val, FILE *data, FILE *idx, u_int64_t *dataoff, BRTLOADER bl) {
int klen = key->size;
int vlen = val->size;
int r;
// we have a chance to handle the errors because when we close we can delete all the files.
if ((r=bl_fwrite(&klen, sizeof(klen), 1, data, bl))) return r;
if ((r=bl_fwrite(key->data, 1, klen, data, bl))) return r;
if ((r=bl_fwrite(&vlen, sizeof(vlen), 1, data, bl))) return r;
if ((r=bl_fwrite(val->data, 1, vlen, data, bl))) return r;
if ((r=bl_fwrite(dataoff, sizeof(*dataoff), 1, idx, bl))) return r;
int sum = klen+vlen+sizeof(klen)+sizeof(vlen);
if ((r=bl_fwrite(&sum, sizeof(sum), 1, idx, bl))) return r;
(*dataoff) += sum;
return 0;
}
int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val) {
if (bl->panic) return EINVAL; // previous panic
return loader_write_row(key, val, bl->fprimary_rows, bl->fprimary_idx, &bl->fprimary_offset, bl);
}
static int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl) {
int r;
int klen,vlen;
if ((r = bl_fread(&klen, sizeof(klen), 1, f, bl))) return r;
assert(klen>=0);
if ((int)key->ulen<klen) { key->ulen=klen; key->data=toku_xrealloc(key->data, klen); }
if ((r = bl_fread(key->data, 1, klen, f, bl))) return r;
if ((r = bl_fread(&vlen, sizeof(vlen), 1, f, bl))) return r;
assert(vlen>=0);
if ((int)val->ulen<vlen) { val->ulen=vlen; val->data=toku_xrealloc(val->data, vlen); }
if ((r = bl_fread(val->data, 1, vlen, f, bl))) return r;
return 0;
}
static int loader_do_i (BRTLOADER bl, DB *dest_db) {
int r = fseek(bl->fprimary_rows, 0, SEEK_SET);
assert(r==0);
DBT pkey={.data=0, .flags=DB_DBT_REALLOC, .size=0, .ulen=0};
DBT pval=pkey;
DBT skey=pkey;
DBT sval=pkey;
FILE *sfile, *sidx;
char *sfilename, *sidxname;
u_int64_t soffset=0;
r = open_temp_file(bl, &sfile, &sfilename); if (r!=0) return r;
r = open_temp_file(bl, &sidx, &sidxname); if (r!=0) return r;
while (0==(r=loader_read_row(bl->fprimary_rows, &pkey, &pval, bl))) {
r = bl->generate_row_for_put(bl->src_db, dest_db, &skey, &sval, &pkey, &pval, NULL);
assert(r==0);
r = loader_write_row(&skey, &sval, sfile, sidx, &soffset, bl);
if (r!=0) return r; // TODO: erase the files
}
return 0;
}
int toku_brt_loader_close (BRTLOADER bl) {
for (int i=0; i<bl->N; i++) {
int r = loader_do_i(bl, bl->dbs[i]);
if (r!=0) return r;
}
return 0;
}
......@@ -570,6 +570,22 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
return r;
}
static int cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf);
int toku_cachetable_redirect (CACHEFILE cf, int fd, const char *fname_in_env) {
CACHETABLE ct = cf->cachetable;
if (cf->fname_relative_to_env) toku_free(cf->fname_relative_to_env);
cf->fname_relative_to_env = toku_strdup(fname_in_env);
int r = cachetable_flush_cachefile(ct, cf);
assert(r==0);
assert(cf->closefd_waiting == 0); // cannot handle it if something else is waiting
r = close(cf->fd);
assert(r==0);
cf->fd = fd;
return 0;
}
//TEST_ONLY_FUNCTION
int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname, const char *fname_relative_to_env, int flags, mode_t mode) {
int fd = open(fname, flags+O_BINARY, mode);
......@@ -657,8 +673,6 @@ static void remove_cf_from_cachefiles_list (CACHEFILE cf) {
cachefiles_unlock(ct);
}
static int cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf);
int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid, LSN oplsn) {
CACHEFILE cf = *cfp;
......@@ -1865,8 +1879,8 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
{
LSN begin_lsn={.lsn=-1}; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed.
int r = toku_log_begin_checkpoint(logger, &begin_lsn, 0, 0);
ct->lsn_of_checkpoint_in_progress = begin_lsn;
assert(r==0);
ct->lsn_of_checkpoint_in_progress = begin_lsn;
}
// Log all the open transactions
{
......
......@@ -75,10 +75,14 @@ int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, const
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char *fname_relative_to_env /*(used for logging)*/);
int toku_cachetable_openfd_with_filenum (CACHEFILE *,CACHETABLE, int /*fd*/, const char *fname_relative_to_env, BOOL with_filenum, FILENUM filenum, BOOL reserved);
// Change the binding of which file is attached to a cachefile. Close the old fd. Use the new fd.
int toku_cachetable_redirect (CACHEFILE cf, int fd, const char *fname_in_env);
int toku_cachetable_reserve_filenum (CACHETABLE ct, FILENUM *reserved_filenum, BOOL with_filenum, FILENUM filenum);
void toku_cachetable_unreserve_filenum (CACHETABLE ct, FILENUM reserved_filenum);
// Get access to the asynchronous work queue
// Returns: a pointer to the work queue
WORKQUEUE toku_cachetable_get_workqueue (CACHETABLE);
......
......@@ -145,15 +145,16 @@ static int lc_create(TOKULOGCURSOR *lc, const char *log_dir) {
}
int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
int r = lc_create(lc, log_dir);
TOKULOGCURSOR cursor;
int r = lc_create(&cursor, log_dir);
if ( r!=0 )
return r;
TOKULOGCURSOR cursor = *lc;
r = toku_logger_find_logfiles(cursor->logdir, &(cursor->logfiles), &(cursor->n_logfiles));
if (r!=0) {
toku_logcursor_destroy(&cursor);
*lc = NULL;
} else {
*lc = cursor;
}
return r;
}
......
......@@ -76,6 +76,9 @@ const struct logtype rollbacks[] = {
NULLFIELD}},
{"tablelock_on_empty_table", 'L', FA{{"FILENUM", "filenum", 0},
NULLFIELD}},
{"load", 'l', FA{{"BYTESTRING", "old_iname", 0},
{"BYTESTRING", "new_iname", 0},
NULLFIELD}},
// {"fclose", 'c', FA{{"FILENUM", "filenum", 0},
// {"BYTESTRING", "fname", 0},
// NULLFIELD}},
......@@ -163,8 +166,12 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "src_val", 0},
NULLFIELD}},
{"comment", 'T', FA{{"u_int64_t", "timestamp", 0},
{"BYTESTRING", "comment", 0},
NULLFIELD}},
{"BYTESTRING", "comment", 0},
NULLFIELD}},
{"load", 'l', FA{{"u_int64_t", "timestamp", 0},
{"BYTESTRING", "old_iname", 0},
{"BYTESTRING", "new_iname", 0},
NULLFIELD}},
{0,0,FA{NULLFIELD}}
};
......
/* merger test.
* Test 0: A super-simple merge of one file with a few records.
* Test 1: A fairly simple merge of a few hundred records in a few files, the data in the files is interleaved randomly (that is the "pop" operation will tend to get data from a randomly chosen file.
* Test 2: A merge of a thousand files, (interleaved randomly), each file perhaps a megabyte. This test is intended to demonstrate performance improvements when we have a proper merge heap.
* Test 3: A merge of 10 files, (interleaved randomly) each file perhaps a gigabyte. This test is intended to demonstrate performance improvements when we pipeline the reads properly.
* Test 4: A merge of 100 files, presorted, each perhaps a 10MB. All the records in the first file are less than all the records in the second. This test is intended to show performance improvements when we prefer to refill the buffer that is the most empty.
*/
#include "toku_assert.h"
#include "merger.h"
#include <stdio.h>
#include <sys/types.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <arpa/inet.h>
static int default_compare (DB *db __attribute__((__unused__)), const DBT *keya, const DBT *keyb) {
if (keya->size < keyb->size) {
int c = memcmp(keya->data, keyb->data, keya->size);
if (c==0) return -1; // tie breaker is the length, a is shorter so it's first.
else return c;
} else if (keya->size > keyb->size) {
int c = memcmp(keya->data, keyb->data, keyb->size);
if (c==0) return +1; // tie breaker is the length, a is longer so it's second
else return c;
} else {
return memcmp(keya->data, keyb->data, keya->size);
}
}
static void write_ij (FILE *f, u_int32_t i, u_int32_t j) {
u_int32_t len=sizeof(i);
{ int r = fwrite(&len, sizeof(len), 1, f); assert(r==1); }
i = htonl(i);
{ int r = fwrite(&i, sizeof(i), 1, f); assert(r==1); }
{ int r = fwrite(&len, sizeof(len), 1, f); assert(r==1); }
j = htonl(j);
{ int r = fwrite(&j, sizeof(j), 1, f); assert(r==1); }
}
static void check_ij (MERGER m, u_int32_t i, u_int32_t j) {
DBT key,val;
memset(&key, 0, sizeof(key));
memset(&val, 0, sizeof(val));
int r = merger_pop(m, &key, &val);
assert(r==0);
assert(key.size = 4);
assert(val.size = 4);
u_int32_t goti = ntohl(*(u_int32_t*)key.data);
u_int32_t gotj = ntohl(*(u_int32_t*)val.data);
//printf("Got %d,%d (expect %d,%d)\n", goti, gotj, i, j);
assert(goti == i);
assert(gotj == j);
}
static void check_nothing_is_left (MERGER m) {
DBT key,val;
memset(&key, 0, sizeof(key));
memset(&val, 0, sizeof(val));
int r = merger_pop(m, &key, &val);
assert(r!=0); // should be nothing left.
}
static void test0 (void) {
char *fname = "merger-test0.data";
char *fnames[] = {fname};
FILE *f = fopen(fname, "w");
assert(f);
const u_int32_t N = 100;
for (u_int32_t i=0; i<N; i++) {
write_ij(f, i, N-i);
}
{ int r = fclose(f); assert(r==0); }
MERGER m = create_merger (1, fnames, NULL, default_compare, NULL);
for (u_int32_t i=0; i<N; i++) {
check_ij(m, i, N-i);
}
check_nothing_is_left(m);
merger_close(m);
{ int r = unlink(fname); assert(r==0); }
}
static void test1 (void) {
const int NFILES = 10;
const u_int32_t NRECORDS = NFILES*100;
char *fnames[NFILES];
{
FILE *files[NFILES];
for (int i=0; i<NFILES; i++) {
char fname[] = "merger-test-XXX.data";
snprintf(fname, sizeof(fname), "merger-test-%3x.data", i);
fnames[i] = strdup(fname);
files [i] = fopen(fname, "w");
}
for (u_int32_t i=0; i<NRECORDS; i++) {
int fnum = random()%NFILES;
write_ij(files[fnum], i, 2*i);
}
for (int i=0; i<NFILES; i++) {
int r = fclose(files[i]); assert(r==0);
}
}
MERGER m = create_merger(NFILES, fnames, NULL, default_compare, NULL);
for (u_int32_t i=0; i<NRECORDS; i++) {
check_ij(m, i, 2*i);
}
check_nothing_is_left(m);
merger_close(m);
for (int i=0; i<NFILES; i++) {
free(fnames[i]);
}
}
int main (int argc, char *argv[] __attribute__((__unused__))) {
assert(argc==1);
test0();
test1();
return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: brt.c 16740 2009-12-29 23:01:29Z bkuszmaul $"
#ident "Copyright (c) 2009 Tokutek Inc. All rights reserved."
/* See merger.h for a description of this module. */
#include "merger.h"
#include <memory.h>
#include <toku_assert.h>
#include <string.h>
typedef unsigned char BOOL;
#define TRUE 1
#define FALSE 0
struct merger {
int n_files;
FILE **files;
BOOL *present;// one for each file: present[i] is TRUE if and only if keys[i] and vals[i] are valid. (If the file gets empty then files[i] will be NULL)
DBT *keys; // one for each file. ulen keeps track of how much memory is actually allocated.
DBT *vals; // one for each file.
DB *db;
COMPARISON_FUNCTION cf;
MEMORY_ALLOCATION_UPDATER mup;
};
MERGER create_merger (int n_files, char *file_names[n_files], DB *db, COMPARISON_FUNCTION cf, MEMORY_ALLOCATION_UPDATER mup) {
MERGER MALLOC(result);
result->n_files = n_files;
MALLOC_N(n_files, result->files);
MALLOC_N(n_files, result->present);
MALLOC_N(n_files, result->keys);
MALLOC_N(n_files, result->vals);
for (int i=0; i<n_files; i++) {
result->files[i] = fopen(file_names[i], "r");
assert(result->files[i]);
result->present[i] = FALSE;
memset(&result->keys[i], 0, sizeof(result->keys[0]));
memset(&result->vals[i], 0, sizeof(result->vals[0]));
}
result->db = db;
result->cf = cf;
result->mup = mup;
return result;
}
void merger_close (MERGER m) {
for (int i=0; i<m->n_files; i++) {
if (m->files[i]) {
int r = fclose(m->files[i]);
assert(r==0);
}
if (m->keys[i].data) {
toku_free(m->keys[i].data);
}
if (m->vals[i].data) {
toku_free(m->vals[i].data);
}
}
toku_free(m->files);
toku_free(m->present);
toku_free(m->keys);
toku_free(m->vals);
toku_free(m);
}
static int merge_fill_dbt (MERGER m, int i)
// Effect: Make sure that keys[i] has data in it and return 0.
// If we cannot, then return nonzero.
{
if (m->present[i]) return 0; // it's there, so we are OK.
if (m->files[i]==NULL) return -1; // the file was previously empty, so no more.
u_int32_t keylen;
{
int n = fread(&keylen, sizeof(keylen), 1, m->files[i]);
if (n!=1) {
// must have hit EOF, so close the file and set return -1.
int r = fclose(m->files[i]);
assert(r==0);
m->files[i] = NULL;
return -1;
}
}
// Got something, so we should be able to get the rest.
if (m->keys[i].ulen < keylen) {
REALLOC_N(keylen, m->keys[i].data);
m->keys[i].ulen = keylen;
}
{
size_t n = fread(m->keys[i].data, 1, keylen, m->files[i]);
assert(n==keylen);
}
u_int32_t vallen;
{
int n = fread(&vallen, sizeof(vallen), 1, m->files[i]);
assert(n==1);
}
if (m->vals[i].ulen < vallen) {
REALLOC_N(vallen, m->vals[i].data);
m->vals[i].ulen = vallen;
}
{
size_t n = fread(m->vals[i].data, 1, vallen, m->files[i]);
assert(n==vallen);
}
m->keys[i].size = keylen;
m->vals[i].size = vallen;
m->present[i] = TRUE;
return 0;
}
static int find_first_nonempty (MERGER m, int *besti) {
for (int i=0; i<m->n_files; i++) {
if (merge_fill_dbt(m, i)==0) {
*besti = i;
return 0;
}
}
return -1;
}
int merger_pop (MERGER m,
/*out*/ DBT *key,
/*out*/ DBT *val)
// This version is as simple as I can make it.
{
int firsti = -1;
if (find_first_nonempty(m, &firsti)) {
// there are no more nonempty rows.
return -1;
}
int besti = firsti;
// besti is the first nonempty item.
for (int i=firsti+1; i<m->n_files; i++) {
if (merge_fill_dbt(m, i)==0) {
// there is something there, so we continue
if (m->cf(m->db, &m->keys[besti], &m->keys[i])>0) {
// then i is the new besti.
besti = i;
}
}
}
// Now besti is the one to return.
*key = m->keys[besti];
*val = m->vals[besti];
m->present[besti] = FALSE;
return 0;
}
/* This is a C header (no Cilk or C++ inside here) */
/* The merger abstraction:
*
* This module implements a multithreaded file merger, specialized for the temporary file format used by the loader.
* The input files have rows stored as follows
* <keylen (4 byte integer in native order)>
* <key (char[keylen])>
* <vallen (4 byte integer in native order)>
* <val (char[vallen])>
* The input files are sorted according to the comparison function.
*
* Given a bunch of input files each containing rows, the merger can produce the minimal row from all those files.
*
* The merger periodically asks for memory, and the allocated memory may go up or down. If the memory allocation increases, the merger may malloc() more memory.
* If the memory allocation decreases, the merger should free some memory.
*
* Implementation hints: The merger should double buffer its input.
* That is, for each file, the merger should use two buffers. It should fill the first buffer, and then in the background fill the other buffer.
* Whenever a buffer empties, we hope that the other buffer is full (if not we wait) and we swap buffers, and then have the background thread fill the other buffer.
* This strategy implies that there is a background thread filling those other buffers.
* The background thread may have several refillable buffers to choose from at any moment.
* There are two obvious approaches for choosing which buffer to refill next:
* 1) Refill the one that's been empty the longest.
* 2) Refill the one for which the "front" of the buffer is the most empty.
* The advantage of approach (1) is that it's simple, and less likely to have race conditions.
* The advantage of approach (2) is that if some buffer gets emptied quickly we start refilling it earlier, possibly avoiding a pipeline stall.
* This could be an issue if the data was already sorted, so that file[0] is always emptying first, then file[1], and so forth.
*/
#include "db.h"
typedef struct merger *MERGER;
typedef void (*MEMORY_ALLOCATION_UPDATER) (/*in */ size_t currently_using,
/*in */ size_t currently_requested,
/*out*/size_t *new_allocation);
typedef int (*COMPARISON_FUNCTION) (DB *db, const DBT *keya, const DBT *keyb);
MERGER create_merger (int n_files, char *file_names[n_files], DB *db, COMPARISON_FUNCTION f, MEMORY_ALLOCATION_UPDATER mup);
// Effect: Create a new merger, which will merge the files named by file_names.
// The comparison function, f, decides which rows are smaller when they come from different files.
// The merger calls mup, a memory allocation updater, periodically, with three arguments:
// currently_using how much memory is the merger currently using.
// currently_requested how much total memory would the merger like.
// new_allocation (out) how much memory the system says the merger may have. If new_allocation is more than currently_using, then the merger
// may allocate more memory (up to the new allocation). If new_allocation is less, then the merger must free some memory,
// (and it should call the mup function again to indicate that the memory has been reduced).
void merger_close (MERGER);
// Effect: Close the files and free the memory used by the merger.
int merger_pop (MERGER m,
/*out*/ DBT *key,
/*out*/ DBT *val);
// Effect: If there are any rows left then return the minimal row in *key and *val.
// The pointers to key and val remain valid until the next call to merger_pop or merger_close. That is, we force the flags to be 0 in the DBT.
// Requires: The flags in the dbts must be zero.
// Rationale: We are trying to make this path as fast as possible, so we don't want to copy the data unnecessarily, and we don't want to mess around with DB_DBT_MALLOC and so forth.
// It is fairly straightforward to keep the key and val "live": In most cases, the buffer is still valid. In the case where the key and val are the last
// item, then we must take care not to reuse the buffer until the next merger_pop.
......@@ -602,7 +602,6 @@ static int toku_recover_backward_enq_delete_multiple (struct logtype_enq_delete_
return 0;
}
static int toku_recover_enq_delete_both (struct logtype_enq_delete_both *l, RECOVER_ENV renv) {
int r;
TOKUTXN txn = NULL;
......@@ -948,6 +947,17 @@ static int toku_recover_backward_comment (struct logtype_comment *UU(l), RECOVER
return 0;
}
static int toku_recover_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) {
// need to implement
assert(1);
return 0;
}
static int toku_recover_backward_load(struct logtype_load *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_delete_rolltmp_files (const char *log_dir) {
struct dirent *de;
DIR *d = opendir(log_dir);
......
......@@ -372,3 +372,29 @@ toku_commit_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU(yi
{
return do_nothing_with_filenum(txn, filenum);
}
int
toku_commit_load (BYTESTRING UU(old_iname),
BYTESTRING UU(new_iname),
TOKUTXN UU(txn),
YIELDF UU(yield),
void *UU(yield_v),
LSN UU(oplsn))
{
// need to implement
assert(1);
return 0;
}
int
toku_rollback_load (BYTESTRING UU(old_iname),
BYTESTRING UU(new_iname),
TOKUTXN UU(txn),
YIELDF UU(yield),
void *UU(yield_v),
LSN UU(oplsn))
{
// need to implement
assert(1);
return 0;
}
......@@ -33,6 +33,7 @@ OBJS_RAW = \
ydb \
errors \
dlmalloc \
loader \
elocks \
#\end
#OBJS automatically defined.
......
/* -*- mode: C; c-basic-offset: 4 -*-
*
* Copyright (c) 2007, 2008, 2009, 2010 Tokutek Inc. All rights reserved."
* The technology is licensed by the Massachusetts Institute of Technology,
* Rutgers State University of New Jersey, and the Research Foundation of
* State University of New York at Stony Brook under United States of America
* Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
*/
/*
* a REFERENCE IMPLEMENTATION of the loader
*/
#include "includes.h"
#include "ydb-internal.h"
#include "loader.h"
enum {VERBOSE=0};
struct __toku_loader_internal {
DB_ENV *env;
DB_TXN *txn;
int N;
DB **dbs; /* [N] */
DB *src_db;
uint32_t *flags;
uint32_t *dbt_flags;
void *extra;
void (*duplicate_callback)(DB *db, int i, DBT *key, DBT *val);
int (*poll_func)(void *extra, float progress);
DBT dkey; /* duplicate key */
DBT dval; /* duplicate val */
int di; /* duplicate i */
};
int toku_loader_create_loader(DB_ENV *env,
DB_TXN *txn,
DB_LOADER **blp,
DB *src_db,
int N,
DB *dbs[],
uint32_t flags[N],
uint32_t dbt_flags[N],
void *extra)
{
if (VERBOSE) printf("toku_loader_create_loader\n");
DB_LOADER *loader;
loader = toku_malloc(sizeof(DB_LOADER));
assert(loader != NULL);
loader->i = toku_malloc(sizeof(*loader->i));
assert(loader->i != NULL);
loader->i->env = env;
loader->i->txn = txn;
loader->i->N = N;
loader->i->src_db = src_db;
loader->i->dbs = dbs;
loader->i->flags = flags;
loader->i->dbt_flags = dbt_flags;
loader->i->extra = extra;
memset(&loader->i->dkey, 0, sizeof(loader->i->dkey));
memset(&loader->i->dval, 0, sizeof(loader->i->dval));
loader->i->di = 0;
loader->set_poll_function = toku_loader_set_poll_function;
loader->set_duplicate_callback = toku_loader_set_duplicate_callback;
loader->put = toku_loader_put;
loader->close = toku_loader_close;
*blp = loader;
return 0;
}
int toku_loader_set_poll_function(DB_LOADER *loader,
int (*poll_func)(void *extra, float progress))
{
if (VERBOSE) printf("toku_loader_set_poll_function\n");
loader->i->poll_func = poll_func;
return 0;
}
int toku_loader_set_duplicate_callback(DB_LOADER *loader,
void (*duplicate)(DB *db, int i, DBT *key, DBT *val))
{
if (VERBOSE) printf("toku_loader_set_duplicate_callback\n");
loader->i->duplicate_callback = duplicate;
return 0;
}
int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
{
if (VERBOSE) printf("toku_loader_put\n");
int i;
int r;
DBT ekey, eval;
if ( loader->i->dkey.data != NULL ) {
if (VERBOSE) printf("skipping put\n");
return 0;
}
for (i=0; i<loader->i->N; i++) {
r = loader->i->env->i->generate_row_for_put(loader->i->dbs[i], // dest_db
loader->i->dbs[0], // src_db,
&ekey, &eval, // dest_key, dest_val
key, val, // src_key, src_val
NULL); // extra
assert(r == 0);
r = loader->i->dbs[i]->put(loader->i->dbs[i], loader->i->txn, &ekey, &eval, DB_NOOVERWRITE);
if ( r != 0 ) {
if (VERBOSE) printf("put returns %d\n", r);
// spec says errors all happen on close
// - have to save key, val, and i for duplicate callback
loader->i->dkey.size = key->size;
loader->i->dkey.data = toku_malloc(key->size);
memcpy(loader->i->dkey.data, key->data, key->size);
loader->i->dval.size = val->size;
loader->i->dval.data = toku_malloc(val->size);
memcpy(loader->i->dval.data, val->data, val->size);
loader->i->di = i;
}
}
return 0;
}
int toku_loader_close(DB_LOADER *loader)
{
if (VERBOSE) printf("toku_loader_close\n");
// as per spec, if you found a duplicate during load, call duplicate_callback
if ( loader->i->dkey.data != NULL ) {
if ( loader->i->duplicate_callback != NULL ) {
loader->i->duplicate_callback(loader->i->dbs[loader->i->di], loader->i->di, &loader->i->dkey, &loader->i->dval);
}
toku_free(loader->i->dkey.data);
toku_free(loader->i->dval.data);
}
toku_free(loader->i);
toku_free(loader);
return 0;
}
int toku_loader_abort(DB_LOADER *loader) {
return toku_loader_close(loader);
}
#ifndef TOKULOADER_H
#define TOKULOADER_H
/* Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved.
*
* The technology is licensed by the Massachusetts Institute of Technology,
* Rutgers State University of New Jersey, and the Research Foundation of
* State University of New York at Stony Brook under United States of America
* Serial No. 11/760379 and to the patents and/or patent applications resulting from it.
*/
int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[N], uint32_t flags[N], uint32_t dbt_flags[N], void *extra);
int toku_loader_set_duplicate_callback(DB_LOADER *loader, void (*duplicate)(DB *db, int i, DBT *key, DBT *val));
int toku_loader_set_poll_function(DB_LOADER *loader, int (*poll_func)(void *extra, float progress));
int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val);
int toku_loader_close(DB_LOADER *loader);
int toku_loader_abort(DB_LOADER *loader);
#endif
......@@ -126,6 +126,8 @@ BDB_DONTRUN_TESTS = \
recover-fclose-in-checkpoint \
recover-delboth-checkpoint \
recover-delboth-after-checkpoint \
loader-reference-test \
loader-stress-test \
recover-lsn-filter-multiple \
recover-put-multiple-fdelete-all \
recover-put-multiple-fdelete-some \
......@@ -391,6 +393,8 @@ endif
%.tdbt$(BINSUF): RPATH_DIRS+=$(dir $(TLIBTDB))
%.tdb$(BINSUF) %.tdbt$(BINSUF): CFLAGS+= -DENVDIR=\"dir.$<.tdb\" -DUSE_TDB -DIS_TDB=1
%.tdb$(BINSUF) %.tdbt$(BINSUF): CPPFLAGS+=-I$(TOKUROOT)include
loader-stress-test: CPPFLAGS+=-I$(TOKUROOT)include -DENVDIR=\"dir.$<\"
loader-stress-test: LOADLIBES+=-L.. -ltokudb -Wl,-rpath,..
%.tdb$(BINSUF) %.tdbt$(BINSUF): %.c $(DEPEND_COMPILE) $(DEPEND_LINK) $(TDB_EXTRA_NEEDED)
$(CC) $< $(filter-out ../../lib/libtokuportability.a,$(BIN_FROM_C_FLAGS)) $(LINK_MUST_BE_LAST)
......
default: build
TARGETS = get.parallel get.serial
CPPFLAGS = -I../../../include -I../../../toku_include -I../../../linux
CFLAGS = -Wall -W -Werror -g $(OPTFLAGS)
LOADLIBES = -L../../../lib -ltokuportability -ltokudb
LDFLAGS = -Wl,-rpath,../../../lib
build: $(TARGETS)
CILKPP=/home/bradley/cilkarts/8503/cilk/bin/cilk++
CXX=/home/bradley/cilkarts/8503/cilk/bin/c++
CILKSCREEN=/home/bradley/cilkarts/8503/cilk/bin/cilkscreen
%.parallel: %.cilk
$(CILKPP) $(CPPFLAGS) $(CFLAGS) $(LDFLAGS) $< -o $@ $(LOADLIBES)
%.serial: %.cilk
$(CILKPP) -fcilk-stub $(CPPFLAGS) $(CFLAGS) $(LDFLAGS) $< -o $@ $(LOADLIBES)
check: check_get.parallel check_get.serial
check_%.parallel: %.parallel
$(CILKSCREEN) ./$<
check_%.serial: %.serial
valgrind ./$<
foo.serial foo.parallel: CPPFLAGS=
foo.serial foo.parallel: LOADLIBES=
foo.serial foo.parallel: LDFLAGS=
foo.serial: CFLAGS=-lpthread
%.o: %.cilk
$(CILKPP) $(CPPFLAGS) $(CFLAGS) $< -c -o $@
foo2: foo2.o bar2.o
$(CILKPP) foo2.o bar2.o -o $@ -lpthread -L/home/bradley/cilkarts/8503/cilk/lib64 -lcilkrts -Wl,-rpath=/home/bradley/cilkarts/8503/cilk/lib64
broken:
$(CXX) foo2.o bar2.o -o $@ -lpthread -L/home/bradley/cilkarts/8503/cilk/lib64 -lcilkrts -Wl,-rpath=/home/bradley/cilkarts/8503/cilk/lib64
measurecilkrun.parallel: OPTFLAGS=-O2
measurecilkrun.parallel: LDFLAGS=
measurecilkrun.parallel: LOADLIBES=
measurecilkrun.parallel: CPPFLAGS=
check_measurecilkrun.parallel: CILKSCREEN=
#include <cilk.h>
#include <stdio.h>
#include "foo2.h"
extern "Cilk++"
int foo2 (int i) {
return i+1;
}
int foo (int i) {
int r;
r = cilk_spawn foo2 (i+1);
cilk_sync;
return r+1;
}
extern "C++"
void do_foo (void) {
printf("Running cilk\n");
int r = cilk::run(&foo, 3);
printf("Done r =%d\n", r);
}
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
pthread_t pt[2];
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
extern "C" void* start (void *extra __attribute__((__unused__))) {
{ int r = pthread_mutex_lock(&mutex); assert(r==0); }
printf("T%lx got lock\n", pthread_self());
sleep(1);
printf("T%lx releasing lock\n", pthread_self());
{ int r = pthread_mutex_unlock(&mutex); assert(r==0); }
return 0;
}
void create_pthread(void) {
for (int i=0; i<2; i++) {
int r = pthread_create(&pt[i], 0, start, NULL);
assert(r==0);
}
}
void join_pthread (void) {
for (int i=0; i<2; i++) {
int r = pthread_join(pt[i], NULL);
assert(r==0);
}
}
void foo (void) {
}
int cilk_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
create_pthread();
cilk_spawn foo();
cilk_sync;
join_pthread();
return 0;
}
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include "foo2.h"
#include "cilk.h"
pthread_t pt[2];
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
extern "C" void* start (void *extra __attribute__((__unused__))) {
{ int r = pthread_mutex_lock(&mutex); assert(r==0); }
printf("T%lx got lock\n", pthread_self());
sleep(1);
printf("T%lx releasing lock\n", pthread_self());
{ int r = pthread_mutex_unlock(&mutex); assert(r==0); }
return 0;
}
void create_pthread(void) {
for (int i=0; i<2; i++) {
int r = pthread_create(&pt[i], 0, start, NULL);
assert(r==0);
}
}
void join_pthread (void) {
for (int i=0; i<2; i++) {
int r = pthread_join(pt[i], NULL);
assert(r==0);
}
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
__cilkscreen_disable_instrumentation();
create_pthread();
__cilkscreen_enable_instrumentation();
do_foo();
join_pthread();
return 0;
}
/* Perform a DB->get in a cilk thread */
#include <assert.h>
#include <db.h>
#include <stdlib.h>
#include <toku_os.h>
#define DIR __FILE__ ".dir"
DB_ENV *env;
void foo (void) {
printf("foo\n");
}
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
int cilk_main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
system("rm -rf " DIR);
toku_os_mkdir(DIR, 0777);
{ int r = db_env_create(&env, 0); assert(r==0); }
{ int r = env->open(env, DIR, envflags,S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); }
cilk_spawn foo();
cilk_sync;
{ int r = env->close(env, 0); assert(r==0); }
return 0;
}
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include "cilk.h"
#include <sys/time.h>
int foo (int i) {
return i+1;
}
int fooi (void* iv) {
int *ip=(int*)iv;
return foo(*ip);
}
extern "C++" {
int N = 1000;
double tdiff (struct timeval *after, struct timeval *before)
{
return after->tv_sec - before->tv_sec + (1e-6)*(after->tv_usec - before->tv_usec);
}
void do_cilkrun (void) {
cilk::run(&foo, 0);
}
static cilk::context *ctx;
void do_cilkcxt (void) {
int i = 0;
int j __attribute__((__unused__)) = ctx->run(&fooi, (void*)&i);
}
void do_N_cilkcxt (void) {
struct timeval start,end;
gettimeofday(&start, 0);
cilk::context ctx;
int r=0;
for (int i=0; i<N; i++) {
int j=ctx.run(&fooi, (void*)&i);
r+=j;
}
gettimeofday(&end, 0);
printf("%.3fus per ctx.run (tot=%d)\n", 1e6*tdiff(&end, &start)/N, 0);
}
extern "Cilk++" int cilk_in_pt (int i) {
assert(i==0);
return 0;
}
pthread_t pt_for_cilk;
extern void* do_pt_for_cilk(void *extra __attribute__((__unused__))) {
cilk::run(&cilk_in_pt, 0);
return 0;
}
void setup_cilk_in_pthread (void) {
int r = pthread_create(&pt_for_cilk, 0, do_pt_for_cilk, NULL);
assert(r==0);
}
void do_cilk_in_pthread (void) {
}
void* do_something (void *extra __attribute__((__unused__))) {
return 0;
}
void do_pthread(void) {
pthread_t pt;
{int r = pthread_create(&pt, 0, do_something, NULL); assert(r==0);}
{int r = pthread_join(pt, NULL); assert(r==0);}
}
void measure(void (*f)(void), const char *string) {
const int maxiter=4;
for (int j=0; j<maxiter; j++) {
int M = (j+1==maxiter) ? N : 1<<j;
struct timeval start,end;
gettimeofday(&start, 0);
for (int i=0; i<M; i++) {
f();
}
gettimeofday(&end, 0);
printf("%9.3fus per call (%d calls) for %s\n", 1e6*tdiff(&end, &start)/M, M, string);
}
printf("\n");
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
measure(do_pthread, "pthread");
measure(do_cilkrun, "cilkrun");
ctx = new cilk::context;
measure(do_cilkcxt, "cilkcxt");
delete ctx;
return 0;
}
} /* end extern "C" */
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2009 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include "test.h"
#include "toku_pthread.h"
#include <db.h>
#include <sys/stat.h>
DB_ENV *env;
enum {MAX_NAME=128};
enum {NUM_DBS=1};
enum {NUM_KV_PAIRS=3};
struct kv_pair {
int64_t key;
int64_t val;
};
struct kv_pair kv_pairs[NUM_KV_PAIRS] = {{1,4},
{2,5},
{3,6}};
static int put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
src_db = src_db;
extra = extra;
uint32_t which = *(uint32_t*)dest_db->app_private;
assert(which == 0);
dbt_init(dest_key, src_key->data, src_key->size);
dbt_init(dest_val, src_val->data, src_val->size);
// printf("dest_key.data = %d\n", *(int*)dest_key->data);
// printf("dest_val.data = %d\n", *(int*)dest_val->data);
return 0;
}
static void test_loader(DB **dbs)
{
int r;
DB_TXN *txn;
DB_LOADER *loader;
uint32_t flags[NUM_DBS];
uint32_t dbt_flags[NUM_DBS];
for(int i=0;i<NUM_DBS;i++) {
flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
}
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, flags, dbt_flags, NULL);
CKERR(r);
r = loader->set_duplicate_callback(loader, NULL);
CKERR(r);
r = loader->set_poll_function(loader, NULL);
CKERR(r);
// using loader->put, put values into DB
DBT key, val;
for(int i=0;i<NUM_KV_PAIRS;i++) {
dbt_init(&key, &kv_pairs[i].key, sizeof(kv_pairs[i].key));
dbt_init(&val, &kv_pairs[i].val, sizeof(kv_pairs[i].val));
r = loader->put(loader, &key, &val);
CKERR(r);
}
// close the loader
r = loader->close(loader);
CKERR(r);
r = txn->commit(txn, 0);
CKERR(r);
// verify the DBs
DBC *cursor;
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
for(int j=0;j<NUM_DBS;j++) {
r = dbs[j]->cursor(dbs[j], txn, &cursor, 0);
CKERR(r);
for(int i=0;i<NUM_KV_PAIRS;i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
CKERR(r);
assert(*(int64_t*)key.data == kv_pairs[i].key);
assert(*(int64_t*)val.data == kv_pairs[i].val);
}
cursor->c_close(cursor);
}
r = txn->commit(txn, 0);
CKERR(r);
printf("PASS\n");
}
static void run_test(void)
{
int r;
r = system("rm -rf " ENVDIR); CKERR(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = db_env_create(&env, 0); CKERR(r);
r = env->set_default_bt_compare(env, int64_dbt_cmp); CKERR(r);
r = env->set_default_dup_compare(env, int64_dbt_cmp); CKERR(r);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG;
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
env->set_errfile(env, stderr);
//Disable auto-checkpointing
r = env->checkpointing_set_period(env, 0); CKERR(r);
DBT desc;
dbt_init(&desc, "foo", sizeof("foo"));
char name[MAX_NAME*2];
DB *dbs[NUM_DBS];
int idx[NUM_DBS];
for(int i=0;i<NUM_DBS;i++) {
idx[i] = i;
r = db_create(&dbs[i], env, 0); CKERR(r);
r = dbs[i]->set_descriptor(dbs[i], 1, &desc, abort_on_upgrade); CKERR(r);
dbs[i]->app_private = &idx[i];
snprintf(name, sizeof(name), "db_%04x", i);
r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
}
// -------------------------- //
test_loader(dbs);
// -------------------------- //
for(int i=0;i<NUM_DBS;i++) {
dbs[i]->close(dbs[i], 0); CKERR(r);
dbs[i] = NULL;
}
r = env->close(env, 0); CKERR(r);
}
// ------------ infrastructure ----------
static void do_args(int argc, char *argv[]);
int test_main(int argc, char **argv) {
do_args(argc, argv);
run_test();
return 0;
}
static void do_args(int argc, char *argv[]) {
int resultcode;
char *cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage:\n%s\n", cmd);
exit(resultcode);
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include "test.h"
#include "toku_pthread.h"
#include <db.h>
#include <sys/stat.h>
DB_ENV *env;
enum {MAX_NAME=128};
enum {MAX_DBS=256};
int NUM_DBS=5;
int NUM_ROWS=100000;
int CHECK_RESULTS=0;
enum {MAGIC=311};
//
// Functions to create unique key/value pairs, row generators, checkers, ... for each of NUM_DBS
//
// a is the bit-wise permute table. For DB[i], permute bits as described in a[i] using 'twiddle32'
// inv is the inverse bit-wise permute of a[]. To get the original value from a twiddled value, twiddle32 (again) with inv[]
int a[MAX_DBS][32];
int inv[MAX_DBS][32];
#if defined(__cilkplusplus) || defined (__cplusplus)
extern "C" {
#endif
// rotate right and left functions
static inline unsigned int rotr32(const unsigned int x, const unsigned int num) {
const unsigned int n = num % 32;
return (x >> n) | ( x << (32 - n));
}
static inline unsigned int rotl32(const unsigned int x, const unsigned int num) {
const unsigned int n = num % 32;
return (x << n) | ( x >> (32 - n));
}
static void generate_permute_tables(void) {
int i, j, tmp;
for(int db=0;db<MAX_DBS;db++) {
for(i=0;i<32;i++) {
a[db][i] = i;
}
for(i=0;i<32;i++) {
j = random() % (i + 1);
tmp = a[db][j];
a[db][j] = a[db][i];
a[db][i] = tmp;
}
// if(db < NUM_DBS){ printf("a[%d] = ", db); for(i=0;i<32;i++) { printf("%2d ", a[db][i]); } printf("\n");}
for(i=0;i<32;i++) {
inv[db][a[db][i]] = i;
}
}
}
// permute bits of x based on permute table bitmap
static unsigned int twiddle32(unsigned int x, int db)
{
unsigned int b = 0;
for(int i=0;i<32;i++) {
b |= (( x >> i ) & 1) << a[db][i];
}
return b;
}
// permute bits of x based on inverse permute table bitmap
static unsigned int inv_twiddle32(unsigned int x, int db)
{
unsigned int b = 0;
for(int i=0;i<32;i++) {
b |= (( x >> i ) & 1) << inv[db][i];
}
return b;
}
// generate val from key, index
static unsigned int generate_val(int key, int i) {
return rotl32((key + MAGIC), i);
}
static unsigned int pkey_for_val(int key, int i) {
return rotr32(key, i) - MAGIC;
}
static int put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
src_db = src_db;
extra = extra;
uint32_t which = *(uint32_t*)dest_db->app_private;
if ( which == 0 ) {
dbt_init(dest_key, src_key->data, src_key->size);
dbt_init(dest_val, src_val->data, src_val->size);
}
else {
unsigned int *new_key = (unsigned int *)toku_malloc(sizeof(unsigned int));
unsigned int *new_val = (unsigned int *)toku_malloc(sizeof(unsigned int));
*new_key = twiddle32(*(unsigned int*)src_key->data, which);
*new_val = generate_val(*(unsigned int*)src_key->data, which);
dbt_init(dest_key, new_key, sizeof(unsigned int));
dbt_init(dest_val, new_val, sizeof(unsigned int));
}
// printf("dest_key.data = %d\n", *(int*)dest_key->data);
// printf("dest_val.data = %d\n", *(int*)dest_val->data);
return 0;
}
#if defined(__cilkplusplus) || defined(__cplusplus)
} // extern "C"
#endif
static void check_results(DB **dbs)
{
for(int j=0;j<NUM_DBS;j++){
DBT key, val;
unsigned int k=0, v=0;
dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int));
int r;
unsigned int pkey_for_db_key;
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
DBC *cursor;
r = dbs[j]->cursor(dbs[j], txn, &cursor, 0);
CKERR(r);
for(int i=0;i<NUM_ROWS;i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
CKERR(r);
k = *(unsigned int*)key.data;
pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j);
v = *(unsigned int*)val.data;
// test that we have the expected keys and values
assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j));
// printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j));
}
{printf("."); fflush(stdout);}
r = cursor->c_close(cursor);
CKERR(r);
r = txn->commit(txn, 0);
CKERR(r);
}
printf("\nCheck OK\n");
}
static void test_loader(DB **dbs)
{
int r;
DB_TXN *txn;
DB_LOADER *loader;
uint32_t flags[MAX_DBS];
uint32_t dbt_flags[MAX_DBS];
for(int i=0;i<MAX_DBS;i++) {
flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
}
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, flags, dbt_flags, NULL);
CKERR(r);
r = loader->set_duplicate_callback(loader, NULL);
CKERR(r);
r = loader->set_poll_function(loader, NULL);
CKERR(r);
// using loader->put, put values into DB
DBT key, val;
unsigned int k, v;
for(int i=1;i<=NUM_ROWS;i++) {
k = i;
v = generate_val(i, 0);
dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val);
CKERR(r);
if ( CHECK_RESULTS) { if((i%10000) == 0){printf("."); fflush(stdout);} }
}
if( CHECK_RESULTS ) {printf("\n"); fflush(stdout);}
// close the loader
r = loader->close(loader);
CKERR(r);
r = txn->commit(txn, 0);
CKERR(r);
// verify the DBs
if ( CHECK_RESULTS ) {
check_results(dbs);
}
}
static void run_test(void)
{
int r;
r = system("rm -rf " ENVDIR); CKERR(r);
r = toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = db_env_create(&env, 0); CKERR(r);
r = env->set_default_bt_compare(env, uint_dbt_cmp); CKERR(r);
r = env->set_default_dup_compare(env, uint_dbt_cmp); CKERR(r);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
env->set_errfile(env, stderr);
//Disable auto-checkpointing
r = env->checkpointing_set_period(env, 0); CKERR(r);
DBT desc;
dbt_init(&desc, "foo", sizeof("foo"));
char name[MAX_NAME*2];
DB **dbs = (DB**)toku_malloc(sizeof(DB*) * NUM_DBS);
assert(dbs != NULL);
int idx[MAX_DBS];
for(int i=0;i<NUM_DBS;i++) {
idx[i] = i;
r = db_create(&dbs[i], env, 0); CKERR(r);
r = dbs[i]->set_descriptor(dbs[i], 1, &desc, abort_on_upgrade); CKERR(r);
dbs[i]->app_private = &idx[i];
snprintf(name, sizeof(name), "db_%04x", i);
r = dbs[i]->open(dbs[i], NULL, name, NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
}
generate_permute_tables();
// printf("running test_loader()\n");
// -------------------------- //
test_loader(dbs);
// -------------------------- //
// printf("done test_loader()\n");
for(int i=0;i<NUM_DBS;i++) {
dbs[i]->close(dbs[i], 0); CKERR(r);
dbs[i] = NULL;
}
r = env->close(env, 0); CKERR(r);
toku_free(dbs);
}
// ------------ infrastructure ----------
static void do_args(int argc, char *argv[]);
int test_main(int argc, char **argv) {
do_args(argc, argv);
run_test();
return 0;
}
static void do_args(int argc, char *argv[]) {
int resultcode;
char *cmd = argv[0];
argc--; argv++;
while (argc>0) {
if (strcmp(argv[0], "-h")==0) {
resultcode=0;
do_usage:
fprintf(stderr, "Usage: -h -c -d <num_dbs> -r <num_rows>\n%s\n", cmd);
exit(resultcode);
} else if (strcmp(argv[0], "-d")==0) {
argc--; argv++;
NUM_DBS = atoi(argv[0]);
if ( NUM_DBS > MAX_DBS ) {
fprintf(stderr, "max value for -d field is %d\n", MAX_DBS);
resultcode=1;
goto do_usage;
}
} else if (strcmp(argv[0], "-r")==0) {
argc--; argv++;
NUM_ROWS = atoi(argv[0]);
} else if (strcmp(argv[0], "-c")==0) {
CHECK_RESULTS = 1;
} else {
fprintf(stderr, "Unknown arg: %s\n", argv[0]);
resultcode=1;
goto do_usage;
}
argc--;
argv++;
}
}
This diff is collapsed.
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef __TEST_H
#define __TEST_H
#if defined(__cilkplusplus) || defined(__cplusplus)
extern "C" {
#endif
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include <toku_portability.h>
......@@ -80,9 +88,9 @@ print_engine_status(DB_ENV * UU(env)) {
static __attribute__((__unused__)) DBT *
dbt_init(DBT *dbt, void *data, u_int32_t size) {
dbt_init(DBT *dbt, const void *data, u_int32_t size) {
memset(dbt, 0, sizeof *dbt);
dbt->data = data;
dbt->data = (void*)data;
dbt->size = size;
return dbt;
}
......@@ -143,6 +151,20 @@ int_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
return 0;
}
static __attribute__((__unused__)) int
uint_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
assert(db && a && b);
assert(a->size == sizeof(unsigned int));
assert(b->size == sizeof(unsigned int));
unsigned int x = *(unsigned int *) a->data;
unsigned int y = *(unsigned int *) b->data;
if (x<y) return -1;
if (x>y) return 1;
return 0;
}
#if !TOKU_WINDOWS && !defined(BOOL_DEFINED)
#define BOOL_DEFINED
typedef enum __toku_bool { FALSE=0, TRUE=1} BOOL;
......@@ -157,27 +179,6 @@ typedef enum __toku_bool { FALSE=0, TRUE=1} BOOL;
#endif
#include <memory.h>
int test_main (int argc, char *argv[]);
int
main(int argc, char *argv[]) {
int r;
#if IS_TDB && (defined(_WIN32) || defined(_WIN64))
int rinit = toku_ydb_init();
CKERR(rinit);
#endif
#if !IS_TDB && DB_VERSION_MINOR==4 && DB_VERSION_MINOR == 7
r = db_env_set_func_malloc(toku_malloc); assert(r==0);
r = db_env_set_func_free(toku_free); assert(r==0);
r = db_env_set_func_realloc(toku_realloc); assert(r==0);
#endif
toku_os_initialize_settings(1);
r = test_main(argc, argv);
#if IS_TDB && (defined(_WIN32) || defined(_WIN64))
int rdestroy = toku_ydb_destroy();
CKERR(rdestroy);
#endif
return r;
}
static int __attribute__((__unused__))
abort_on_upgrade(DB* UU(pdb),
......@@ -249,3 +250,35 @@ toku_hard_crash_on_purpose(void) {
fflush(stderr);
}
#if defined(__cilkplusplus) || defined(__cplusplus)
}
#endif
int test_main (int argc, char *argv[]);
int
#if defined(__cilkplusplus)
cilk_main(int argc, char *argv[])
#else
main(int argc, char *argv[])
#endif
{
int r;
#if IS_TDB && (defined(_WIN32) || defined(_WIN64))
int rinit = toku_ydb_init();
CKERR(rinit);
#endif
#if !IS_TDB && DB_VERSION_MINOR==4 && DB_VERSION_MINOR == 7
r = db_env_set_func_malloc(toku_malloc); assert(r==0);
r = db_env_set_func_free(toku_free); assert(r==0);
r = db_env_set_func_realloc(toku_realloc); assert(r==0);
#endif
toku_os_initialize_settings(1);
r = test_main(argc, argv);
#if IS_TDB && (defined(_WIN32) || defined(_WIN64))
int rdestroy = toku_ydb_destroy();
CKERR(rdestroy);
#endif
return r;
}
#endif // __TEST_H
......@@ -30,6 +30,8 @@ const char *toku_copyright_string = "Copyright (c) 2007-2009 Tokutek Inc. All r
#include "dlmalloc.h"
#include "checkpoint.h"
#include "key.h"
#include "loader.h"
#include "ydb_load.h"
#ifdef TOKUTRACE
......@@ -1518,6 +1520,7 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
SENV(txn_stat);
result->txn_begin = locked_txn_begin;
#undef SENV
result->create_loader = toku_loader_create_loader;
MALLOC(result->i);
if (result->i == 0) { r = ENOMEM; goto cleanup; }
......@@ -3956,13 +3959,21 @@ create_iname_hint(const char *dname, char *hint) {
*hint = '\0';
}
// n >= 0 means to include "_L_" with hex value of n in iname
// (intended for use by loader, which will create many inames using one txnid).
static char *
create_iname(DB_ENV *env, u_int64_t id, char *hint) {
create_iname(DB_ENV *env, u_int64_t id, char *hint, int n) {
int bytes;
char inamebase[strlen(hint) +
8 + // hex version
16 + // hex id
sizeof("__.tokudb")]; // extra pieces
int bytes = snprintf(inamebase, sizeof(inamebase), "%s_%"PRIx64"_%"PRIx32".tokudb", hint, id, BRT_LAYOUT_VERSION);
8 + // hex file format version
16 + // hex id (normally the txnid)
8 + // hex value of n if non-neg
sizeof("_L___.tokudb")]; // extra pieces
if (n < 0)
bytes = snprintf(inamebase, sizeof(inamebase), "%s_%"PRIx64"_%"PRIx32".tokudb", hint, id, BRT_LAYOUT_VERSION);
else
bytes = snprintf(inamebase, sizeof(inamebase), "%s_%"PRIx64"_%"PRIx32"_L_%"PRIx32".tokudb", hint, id, BRT_LAYOUT_VERSION, n);
assert(bytes>0);
assert(bytes<=(int)sizeof(inamebase)-1);
char *rval;
......@@ -4055,7 +4066,7 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP
if (using_txns)
id = toku_txn_get_txnid(db_txn_struct_i(child)->tokutxn);
create_iname_hint(dname, hint);
iname = create_iname(db->dbenv, id, hint); // allocated memory for iname
iname = create_iname(db->dbenv, id, hint, -1); // allocated memory for iname
toku_fill_dbt(&iname_dbt, iname, strlen(iname) + 1);
r = toku_db_put(db->dbenv->i->directory, child, &dname_dbt, &iname_dbt, DB_YESOVERWRITE); // DB_YESOVERWRITE for performance only, avoid unnecessary query
}
......@@ -5229,3 +5240,70 @@ env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) {
return r;
}
/* Following functions (ydb_load_xxx()) are used by loader:
*/
// When the loader is created, it makes this call.
// For each dictionary to be loaded, replace old iname in directory
// with a newly generated iname. This will also take a write lock
// on the directory entries. The write lock will be released when
// the transaction of the loader is completed.
// If the transaction commits, the new inames are in place.
// If the transaction aborts, the old inames will be restored.
// The new inames are returned to the caller.
// It is the caller's responsibility to free them.
// Return 0 on success (could fail if write lock not available).
int
ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames[N]) {
int rval;
int i;
int using_txns = env->i->open_flags & DB_INIT_TXN;
DB_TXN * child = NULL;
u_int64_t xid = 0;
DBT dname_dbt; // holds dname
DBT iname_dbt; // holds new iname
// begin child (unless transactionless)
if (using_txns) {
rval = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1);
assert(rval == 0);
xid = toku_txn_get_txnid(db_txn_struct_i(child)->tokutxn);
}
for (i = 0; i < N; i++) {
char * dname = dbs[i]->i->dname;
toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
// now create new iname
char hint[strlen(dname) + 1];
create_iname_hint(dname, hint);
char * new_iname = create_iname(env, xid, hint, i);
new_inames[i] = new_iname;
toku_fill_dbt(&iname_dbt, new_iname, strlen(new_iname) + 1);
rval = toku_db_put(env->i->directory, child, &dname_dbt, &iname_dbt, DB_YESOVERWRITE); // DB_YESOVERWRITE necessary
if (rval) break;
}
if (using_txns) {
// close txn
if (rval == 0) { // all well so far, commit child
rval = toku_txn_commit(child, DB_TXN_NOSYNC);
assert(rval==0);
}
else { // abort child
int r2 = toku_txn_abort(child);
assert(r2==0);
for (i=0; i<N; i++) {
if (new_inames[i]) {
toku_free(new_inames[i]);
new_inames[i] = NULL;
}
}
}
}
return rval;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef YDB_LOAD_H
#define YDB_LOAD_H
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
/* ydb functions used by loader
*/
// When the loader is created, it makes this call.
// For each dictionary to be loaded, replace old iname in directory
// with a newly generated iname. This will also take a write lock
// on the directory entries. The write lock will be released when
// the transaction of the loader is completed.
// If the transaction commits, the new inames are in place.
// If the transaction aborts, the old inames will be restored.
// The new inames are returned to the caller.
// It is the caller's responsibility to free them.
// Return 0 on success (could fail if write lock not available).
int ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames[N]);
#endif
......@@ -97,6 +97,7 @@ WERROR =
else
WERROR = -Werror
endif
# -Wno-deprecated is needed on gcc 4.4.2 to make the #ident complaints go away.
# -Wno-strict-aliasing is needed on gcc 4.4.2 to make certain gratuitous warnings go away.
ifeq ($(GCCVERSION),4.4.2)
......@@ -464,6 +465,14 @@ BIN_FROM_O_FLAGS_NOLIB=$(CFLAGS) $(CPPFLAGS) $(LDFLAGS_NOLIB) $(BINOUTPUT)$@
%.$(OEXT):%.c $(DEPEND_COMPILE)
$(CC) $< -c $(CPPFLAGS) $(CFLAGS) $(OOUTPUT)$@
CILKPP=/usr/local/cilk/bin/cilk++
CILKFLAGS=$(filter-out -Wbad-function-cast -Wstrict-prototypes -Wmissing-prototypes -Wmissing-declarations -std=c99,$(CFLAGS)) $(CPPFLAGS)
%$(BINSUF):%.cilk $(DEPEND_COMPILE)
$(CILKPP) $(CILKFLAGS) $< $(LOADLIBES) -o $@
%.serial$(BINSUF): %.cilk
$(CILKPP) -fcilk-stub $(CILKFLAGS) $^ $(LOADLIBES) -o $@
%.$(AEXT):
$(AR) $(ARFLAGS) $(AROUTPUT)$@ $(filter %.$(OEXT),$^) $(patsubst %.bundle, %.bundle/*.$(OEXT), $(filter-out %.$(OEXT),$^))
ifeq ($(AEXT),a)
......
......@@ -122,9 +122,9 @@ ssize_t write(int, const void *, size_t) __attribute__((__deprecated_
ssize_t pwrite(int, const void *, size_t, off_t) __attribute__((__deprecated__));
#endif
# ifndef DONT_DEPRECATE_MALLOC
void *malloc(size_t) __attribute__((__deprecated__));
void free(void*) __attribute__((__deprecated__));
void *realloc(void*, size_t) __attribute__((__deprecated__));
extern void *malloc(size_t) __THROW __attribute__((__deprecated__)) ;
extern void free(void*) __THROW __attribute__((__deprecated__));
extern void *realloc(void*, size_t) __THROW __attribute__((__deprecated__));
# endif
# endif
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment