/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: #ident "Copyright (c) 2009 Tokutek Inc. All rights reserved." #ident "$Id$" #ifndef _THREADED_STRESS_TEST_HELPERS_H_ #define _THREADED_STRESS_TEST_HELPERS_H_ #include <config.h> #include "test.h" #include <stdio.h> #include <stdlib.h> #include <locale.h> #include <unistd.h> #include <sys/stat.h> #include <db.h> #if defined(HAVE_MALLOC_H) # include <malloc.h> #elif defined(HAVE_SYS_MALLOC_H) # include <sys/malloc.h> #endif #include <math.h> #include <memory.h> #include <toku_race_tools.h> #include <portability/toku_atomic.h> #include <portability/toku_pthread.h> #include <portability/toku_random.h> #include <portability/toku_time.h> #include <util/rwlock.h> #include <util/kibbutz.h> #include <ft/ybt.h> using namespace toku; #if !defined(HAVE_MEMALIGN) # if defined(HAVE_VALLOC) static void * memalign(size_t UU(alignment), size_t size) { return valloc(size); } # else # error "no suitable aligned malloc available (checked memalign and valloc)" # endif #endif volatile bool run_test; // should be volatile since we are communicating through this variable. typedef struct arg *ARG; typedef int (*operation_t)(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra); typedef int (*test_update_callback_f)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra); typedef int (*test_generate_row_for_put_callback)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data); typedef int (*test_generate_row_for_del_callback)(DB *dest_db, DB *src_db, DBT *dest_key, const DBT *src_key, const DBT *src_data); enum stress_lock_type { STRESS_LOCK_NONE = 0, STRESS_LOCK_SHARED, STRESS_LOCK_EXCL }; struct env_args { int node_size; int basement_node_size; int rollback_node_size; int checkpointing_period; int cleaner_period; int cleaner_iterations; uint64_t lk_max_memory; uint64_t cachetable_size; uint32_t num_bucket_mutexes; const char *envdir; test_update_callback_f update_function; // update callback function test_generate_row_for_put_callback generate_put_callback; test_generate_row_for_del_callback generate_del_callback; }; enum perf_output_format { HUMAN = 0, CSV, TSV, #if 0 GNUPLOT, #endif NUM_OUTPUT_FORMATS }; struct cli_args { int num_elements; // number of elements per DB int num_DBs; // number of DBs int num_seconds; // how long test should run int join_timeout; // how long to wait for threads to join before assuming deadlocks bool only_create; // true if want to only create DBs but not run stress bool only_stress; // true if DBs are already created and want to only run stress int update_broadcast_period_ms; // specific to test_stress3 int num_ptquery_threads; // number of threads to run point queries bool do_test_and_crash; // true if we should crash after running stress test. For recovery tests. bool do_recover; // true if we should run recover int num_update_threads; // number of threads running updates int num_put_threads; // number of threads running puts bool serial_insert; bool interleave; // for insert benchmarks, whether to interleave // separate threads' puts (or segregate them) bool crash_on_operation_failure; bool print_performance; bool print_thread_performance; bool print_iteration_performance; enum perf_output_format perf_output_format; int performance_period; uint32_t txn_size; // specifies number of updates/puts/whatevers per txn uint32_t key_size; // number of bytes in vals. Must be at least 4 uint32_t val_size; // number of bytes in vals. Must be at least 4 double compressibility; // how much of each key/val (as a fraction in [0,1]) can be compressed away // First 4-8 bytes of key may be ignored struct env_args env_args; // specifies environment variables bool single_txn; bool warm_cache; // warm caches before running stress_table bool blackhole; // all message injects are no-ops. helps measure txn/logging/locktree overhead. bool nolocktree; // use this flag to avoid the locktree on insertions bool unique_checks; // use uniqueness checking during insert. makes it slow. bool nosync; // do not fsync on txn commit. useful for testing in memory performance. bool nolog; // do not log. useful for testing in memory performance. bool nocrashstatus; // do not print engine status upon crash bool prelock_updates; // update threads perform serial updates on a prelocked range bool disperse_keys; // spread the keys out during a load (by reversing the bits in the loop index) to make a wide tree we can spread out random inserts into }; struct arg { DB **dbp; // array of DBs DB_ENV* env; // environment used bool bounded_element_range; // true if elements in dictionary are bounded // by num_elements, that is, all keys in each // DB are in [0, num_elements) // false otherwise int sleep_ms; // number of milliseconds to sleep between operations uint32_t txn_type; // isolation level for txn running operation operation_t operation; // function that is the operation to be run void* operation_extra; // extra parameter passed to operation enum stress_lock_type lock_type; // states if operation must be exclusive, shared, or does not require locking struct random_data *random_data; // state for random_r int thread_idx; int num_threads; struct cli_args *cli; bool do_prepare; bool prelock_updates; }; DB_TXN * const null_txn = 0; static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) { arg->cli = cli_args; arg->dbp = dbp; arg->env = env; arg->bounded_element_range = true; arg->sleep_ms = 0; arg->lock_type = STRESS_LOCK_NONE; arg->txn_type = DB_TXN_SNAPSHOT; arg->operation_extra = NULL; arg->do_prepare = false; arg->prelock_updates = false; } enum operation_type { OPERATION = 0, PUTS, PTQUERIES, NUM_OPERATION_TYPES }; const char *operation_names[] = { "ops", "puts", "ptqueries", NULL }; static void increment_counter(void *extra, enum operation_type type, uint64_t inc) { invariant(type != OPERATION); int t = (int) type; invariant(extra); invariant(t >= 0 && t < (int) NUM_OPERATION_TYPES); uint64_t *CAST_FROM_VOIDP(counters, extra); counters[t] += inc; } struct perf_formatter { void (*header)(const struct cli_args *cli_args, const int num_threads); void (*iteration)(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads); void (*totals)(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads); }; static inline int seconds_in_this_iteration(const int current_time, const int performance_period) { const int iteration = (current_time + performance_period - 1) / performance_period; return current_time - ((iteration - 1) * performance_period); } static void human_print_perf_header(const struct cli_args *UU(cli_args), const int UU(num_threads)) {} static void human_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads) { const int secondsthisiter = seconds_in_this_iteration(current_time, cli_args->performance_period); for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { uint64_t period_total = 0; printf("%4d %s", current_time, operation_names[op]); for (int i = strlen(operation_names[op]); i < 12; ++i) { printf(" "); } for (int t = 0; t < num_threads; ++t) { const uint64_t last = last_counters[t][op]; const uint64_t current = counters[t][op]; const uint64_t this_iter = current - last; if (cli_args->print_thread_performance) { const double persecond = (double) this_iter / secondsthisiter; printf("\t%'12" PRIu64 " (%'12.1lf/s)", this_iter, persecond); } period_total += this_iter; last_counters[t][op] = current; } const double totalpersecond = (double) period_total / secondsthisiter; printf("\tTotal %'12" PRIu64 " (%'12.1lf/s)\n", period_total, totalpersecond); } fflush(stdout); } static void human_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads) { if (cli_args->print_iteration_performance) { printf("\n"); } printf("Overall performance:\n"); uint64_t overall_totals[(int) NUM_OPERATION_TYPES]; ZERO_ARRAY(overall_totals); for (int t = 0; t < num_threads; ++t) { if (cli_args->print_thread_performance) { printf("Thread %4d: ", t + 1); } for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const uint64_t current = counters[t][op]; if (cli_args->print_thread_performance) { const double persecond = (double) current / cli_args->num_seconds; printf("\t%s\t%'12" PRIu64 " (%'12.1lf/s)", operation_names[op], current, persecond); } overall_totals[op] += current; } if (cli_args->print_thread_performance) { printf("\n"); } } printf("All threads: "); for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const double totalpersecond = (double) overall_totals[op] / cli_args->num_seconds; printf("\t%s\t%'12" PRIu64 " (%'12.1lf/s)", operation_names[op], overall_totals[op], totalpersecond); } printf("\n"); } static void csv_print_perf_header(const struct cli_args *cli_args, const int num_threads) { printf("seconds"); if (cli_args->print_thread_performance) { for (int t = 1; t <= num_threads; ++t) { for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { printf(",\"Thread %d %s\",\"Thread %d %s/s\"", t, operation_names[op], t, operation_names[op]); } } } for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { printf(",\"Total %s\",\"Total %s/s\"", operation_names[op], operation_names[op]); } printf("\n"); } static void csv_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads) { const int secondsthisiter = seconds_in_this_iteration(current_time, cli_args->performance_period); printf("%d", current_time); uint64_t period_totals[(int) NUM_OPERATION_TYPES]; ZERO_ARRAY(period_totals); for (int t = 0; t < num_threads; ++t) { for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const uint64_t last = last_counters[t][op]; const uint64_t current = counters[t][op]; const uint64_t this_iter = current - last; if (cli_args->print_thread_performance) { const double persecond = (double) this_iter / secondsthisiter; printf(",%" PRIu64 ",%.1lf", this_iter, persecond); } period_totals[op] += this_iter; last_counters[t][op] = current; } } for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const double totalpersecond = (double) period_totals[op] / secondsthisiter; printf(",%" PRIu64 ",%.1lf", period_totals[op], totalpersecond); } printf("\n"); fflush(stdout); } static void csv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads) { printf("overall"); uint64_t overall_totals[(int) NUM_OPERATION_TYPES]; ZERO_ARRAY(overall_totals); for (int t = 0; t < num_threads; ++t) { for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const uint64_t current = counters[t][op]; if (cli_args->print_thread_performance) { const double persecond = (double) current / cli_args->num_seconds; printf(",%" PRIu64 ",%.1lf", current, persecond); } overall_totals[op] += current; } } for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const double totalpersecond = (double) overall_totals[op] / cli_args->num_seconds; printf(",%" PRIu64 ",%.1lf", overall_totals[op], totalpersecond); } printf("\n"); } static void tsv_print_perf_header(const struct cli_args *cli_args, const int num_threads) { printf("\"seconds\""); if (cli_args->print_thread_performance) { for (int t = 1; t <= num_threads; ++t) { for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { printf("\t\"Thread %d %s\"\t\"Thread %d %s/s\"", t, operation_names[op], t, operation_names[op]); } } } for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { printf("\t\"Total %s\"\t\"Total %s/s\"", operation_names[op], operation_names[op]); } printf("\n"); } static void tsv_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads) { const int secondsthisiter = seconds_in_this_iteration(current_time, cli_args->performance_period); printf("%d", current_time); uint64_t period_totals[(int) NUM_OPERATION_TYPES]; ZERO_ARRAY(period_totals); for (int t = 0; t < num_threads; ++t) { for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const uint64_t last = last_counters[t][op]; const uint64_t current = counters[t][op]; const uint64_t this_iter = current - last; if (cli_args->print_thread_performance) { const double persecond = (double) this_iter / secondsthisiter; printf("\t%" PRIu64 "\t%.1lf", this_iter, persecond); } period_totals[op] += this_iter; last_counters[t][op] = current; } } for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const double totalpersecond = (double) period_totals[op] / secondsthisiter; printf("\t%" PRIu64 "\t%.1lf", period_totals[op], totalpersecond); } printf("\n"); fflush(stdout); } static void tsv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], const int num_threads) { printf("\"overall\""); uint64_t overall_totals[(int) NUM_OPERATION_TYPES]; ZERO_ARRAY(overall_totals); for (int t = 0; t < num_threads; ++t) { for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const uint64_t current = counters[t][op]; if (cli_args->print_thread_performance) { const double persecond = (double) current / cli_args->num_seconds; printf("\t%" PRIu64 "\t%.1lf", current, persecond); } overall_totals[op] += current; } } for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const double totalpersecond = (double) overall_totals[op] / cli_args->num_seconds; printf("\t%" PRIu64 "\t%.1lf", overall_totals[op], totalpersecond); } printf("\n"); } #if 0 static void gnuplot_print_perf_header(const struct cli_args *cli_args, const int num_threads) { printf("set terminal postscript solid color\n"); printf("set output \"foo.eps\"\n"); printf("set xlabel \"seconds\"\n"); printf("set xrange [0:*]\n"); printf("set ylabel \"X/s\"\n"); printf("plot "); if (cli_args->print_thread_performance) { for (int t = 1; t <= num_threads; ++t) { for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const int col = (2 * ((t - 1) * (int) NUM_OPERATION_TYPES + op)) + 2; //printf("'-' u 1:%d w lines t \"Thread %d %s\", ", col, t, operation_names[op]); printf("'-' u 1:%d w lines t \"Thread %d %s/s\", ", col + 1, t, operation_names[op]); } } } for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) { const int col = (2 * (num_threads * (int) NUM_OPERATION_TYPES + op)) + 2; //printf("'-' u 1:%d w lines t \"Total %s\", ", col); printf("'-' u 1:%d w lines t \"Total %s/s\"%s", col + 1, operation_names[op], op == ((int) NUM_OPERATION_TYPES - 1) ? "\n" : ", "); } } static void gnuplot_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads) { tsv_print_perf_iteration(cli_args, current_time, last_counters, counters, num_threads); } static void gnuplot_print_perf_totals(const struct cli_args *UU(cli_args), uint64_t *UU(counters[]), const int UU(num_threads)) { printf("e\n"); } #endif const struct perf_formatter perf_formatters[] = { [HUMAN] = { .header = human_print_perf_header, .iteration = human_print_perf_iteration, .totals = human_print_perf_totals }, [CSV] = { .header = csv_print_perf_header, .iteration = csv_print_perf_iteration, .totals = csv_print_perf_totals }, [TSV] = { .header = tsv_print_perf_header, .iteration = tsv_print_perf_iteration, .totals = tsv_print_perf_totals }, #if 0 [GNUPLOT] = { .header = gnuplot_print_perf_header, .iteration = gnuplot_print_perf_iteration, .totals = gnuplot_print_perf_totals } #endif }; static int get_env_open_flags(struct cli_args *args) { int flags = DB_INIT_LOCK|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE; flags |= args->nolog ? 0 : DB_INIT_LOG; return flags; } static int get_put_flags(struct cli_args *args) { int flags = 0; flags |= args->nolocktree ? DB_PRELOCKED_WRITE : 0; flags |= args->unique_checks ? DB_NOOVERWRITE : 0; return flags; } static int get_commit_flags(struct cli_args *args) { int flags = 0; flags |= args->nosync ? DB_TXN_NOSYNC : 0; return flags; } struct worker_extra { struct arg* thread_arg; toku_mutex_t *operation_lock_mutex; struct rwlock *operation_lock; uint64_t *counters; int64_t pad[4]; // pad to 64 bytes }; static void lock_worker_op(struct worker_extra* we) { ARG arg = we->thread_arg; if (arg->lock_type != STRESS_LOCK_NONE) { toku_mutex_lock(we->operation_lock_mutex); if (arg->lock_type == STRESS_LOCK_SHARED) { rwlock_read_lock(we->operation_lock, we->operation_lock_mutex); } else if (arg->lock_type == STRESS_LOCK_EXCL) { rwlock_write_lock(we->operation_lock, we->operation_lock_mutex); } else { assert(false); } toku_mutex_unlock(we->operation_lock_mutex); } } static void unlock_worker_op(struct worker_extra* we) { ARG arg = we->thread_arg; if (arg->lock_type != STRESS_LOCK_NONE) { toku_mutex_lock(we->operation_lock_mutex); if (arg->lock_type == STRESS_LOCK_SHARED) { rwlock_read_unlock(we->operation_lock); } else if (arg->lock_type == STRESS_LOCK_EXCL) { rwlock_write_unlock(we->operation_lock); } else { assert(false); } toku_mutex_unlock(we->operation_lock_mutex); } } static void *worker(void *arg_v) { int r; struct worker_extra* CAST_FROM_VOIDP(we, arg_v); ARG arg = we->thread_arg; struct random_data random_data; ZERO_STRUCT(random_data); char *XCALLOC_N(8, random_buf); r = myinitstate_r(random(), random_buf, 8, &random_data); assert_zero(r); arg->random_data = &random_data; DB_ENV *env = arg->env; DB_TXN *txn = NULL; if (verbose) { toku_pthread_t self = toku_pthread_self(); uintptr_t intself = (uintptr_t) self; printf("%lu starting %p\n", (unsigned long) intself, arg->operation); } if (arg->cli->single_txn) { r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r); } while (run_test) { lock_worker_op(we); if (!arg->cli->single_txn) { r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r); } r = arg->operation(txn, arg, arg->operation_extra, we->counters); if (r==0 && !arg->cli->single_txn && arg->do_prepare) { uint8_t gid[DB_GID_SIZE]; memset(gid, 0, DB_GID_SIZE); uint64_t gid_val = txn->id64(txn); uint64_t *gid_count_p = cast_to_typeof(gid_count_p) gid; // make gcc --happy about -Wstrict-aliasing *gid_count_p = gid_val; int rr = txn->prepare(txn, gid); assert_zero(rr); } if (r == 0) { if (!arg->cli->single_txn) { int flags = get_commit_flags(arg->cli); int chk_r = txn->commit(txn, flags); CKERR(chk_r); } } else { if (arg->cli->crash_on_operation_failure) { CKERR(r); } else { if (!arg->cli->single_txn) { { int chk_r = txn->abort(txn); CKERR(chk_r); } } } } unlock_worker_op(we); we->counters[OPERATION]++; if (arg->sleep_ms) { usleep(arg->sleep_ms * 1000); } } if (arg->cli->single_txn) { int flags = get_commit_flags(arg->cli); int chk_r = txn->commit(txn, flags); CKERR(chk_r); } if (verbose) { toku_pthread_t self = toku_pthread_self(); uintptr_t intself = (uintptr_t) self; printf("%lu returning\n", (unsigned long) intself); } toku_free(random_buf); return arg; } typedef struct scan_cb_extra *SCAN_CB_EXTRA; struct scan_cb_extra { bool fast; int64_t curr_sum; int64_t num_elements; }; struct scan_op_extra { bool fast; bool fwd; bool prefetch; }; static int scan_cb(const DBT *a, const DBT *b, void *arg_v) { SCAN_CB_EXTRA CAST_FROM_VOIDP(cb_extra, arg_v); assert(a); assert(b); assert(cb_extra); assert(b->size >= sizeof(int)); cb_extra->curr_sum += *(int *)b->data; cb_extra->num_elements++; return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0; } static int scan_op_and_maybe_check_sum( DB* db, DB_TXN *txn, struct scan_op_extra* sce, bool check_sum ) { int r = 0; DBC* cursor = NULL; struct scan_cb_extra e; e.fast = sce->fast; e.curr_sum = 0; e.num_elements = 0; { int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); } if (sce->prefetch) { r = cursor->c_pre_acquire_range_lock(cursor, db->dbt_neg_infty(), db->dbt_pos_infty()); assert(r == 0); } while (r != DB_NOTFOUND) { if (sce->fwd) { r = cursor->c_getf_next(cursor, 0, scan_cb, &e); } else { r = cursor->c_getf_prev(cursor, 0, scan_cb, &e); } assert(r==0 || r==DB_NOTFOUND); if (!run_test) { // terminate early because this op takes a while under drd. // don't check the sum if we do this. check_sum = false; break; } } { int chk_r = cursor->c_close(cursor); CKERR(chk_r); } if (r == DB_NOTFOUND) { r = 0; } if (check_sum && e.curr_sum) { printf("e.curr_sum: %" PRId64 " e.num_elements: %" PRId64 " \n", e.curr_sum, e.num_elements); assert(false); } return r; } static int generate_row_for_put( DB *UU(dest_db), DB *UU(src_db), DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val ) { dest_key->data = src_key->data; dest_key->size = src_key->size; dest_key->flags = 0; dest_val->data = src_val->data; dest_val->size = src_val->size; dest_val->flags = 0; return 0; } static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { return 0; } static int UU() xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { size_t s = 256; void *p = toku_xmalloc(s); toku_free(p); return 0; } #if DONT_DEPRECATE_MALLOC static int UU() malloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { size_t s = 256; void *p = malloc(s); free(p); return 0; } #endif // Fill array with compressibility*size 0s. // 0.0<=compressibility<=1.0 // Compressibility is the fraction of size that will be 0s (e.g. approximate fraction that will be compressed away). // The rest will be random data. static void fill_zeroed_array(uint8_t *data, uint32_t size, struct random_data *random_data, double compressibility) { //Requires: The array was zeroed since the last time 'size' was changed. //Requires: compressibility is in range [0,1] indicating fraction that should be zeros. uint32_t num_random_bytes = (1 - compressibility) * size; if (num_random_bytes > 0) { uint32_t filled; for (filled = 0; filled + sizeof(uint64_t) <= num_random_bytes; filled += sizeof(uint64_t)) { *((uint64_t *) &data[filled]) = randu64(random_data); } if (filled != num_random_bytes) { uint64_t last8 = randu64(random_data); memcpy(&data[filled], &last8, num_random_bytes - filled); } } } static inline size_t size_t_max(size_t a, size_t b) { return (a > b) ? a : b; } static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, void *stats_extra) { int r = 0; uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))]; uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b; uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b; ZERO_ARRAY(rand_key_b); uint8_t valbuf[arg->cli->val_size]; ZERO_ARRAY(valbuf); uint64_t puts_to_increment = 0; for (uint32_t i = 0; i < arg->cli->txn_size; ++i) { rand_key_key[0] = randu64(arg->random_data); if (arg->cli->interleave) { rand_key_i[3] = arg->thread_idx; } else { rand_key_i[0] = arg->thread_idx; } fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility); DBT key, val; dbt_init(&key, &rand_key_b, sizeof rand_key_b); dbt_init(&val, valbuf, sizeof valbuf); int flags = get_put_flags(arg->cli); r = db->put(db, txn, &key, &val, flags); if (!ignore_errors && r != 0) { goto cleanup; } puts_to_increment++; if (puts_to_increment == 100) { increment_counter(stats_extra, PUTS, puts_to_increment); puts_to_increment = 0; } } cleanup: increment_counter(stats_extra, PUTS, puts_to_increment); return r; } static int UU() random_put_multiple_op(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) { const int num_dbs = arg->cli->num_DBs; DB **dbs = arg->dbp; DB_ENV *env = arg->env; DBT mult_key_dbt[num_dbs]; DBT mult_put_dbt[num_dbs]; uint32_t mult_put_flags[num_dbs]; memset(mult_key_dbt, 0, sizeof(mult_key_dbt)); memset(mult_put_dbt, 0, sizeof(mult_put_dbt)); memset(mult_put_flags, 0, sizeof(mult_put_dbt)); int r = 0; uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))]; uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b; uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b; ZERO_ARRAY(rand_key_b); uint8_t valbuf[arg->cli->val_size]; ZERO_ARRAY(valbuf); uint64_t puts_to_increment = 0; for (uint32_t i = 0; i < arg->cli->txn_size; ++i) { rand_key_key[0] = randu64(arg->random_data); if (arg->cli->interleave) { rand_key_i[3] = arg->thread_idx; } else { rand_key_i[0] = arg->thread_idx; } fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility); DBT key, val; dbt_init(&key, &rand_key_b, sizeof rand_key_b); dbt_init(&val, valbuf, sizeof valbuf); r = env->put_multiple( env, dbs[0], // source db. this is arbitrary. txn, &key, // source db key &val, // source db value num_dbs, // total number of dbs dbs, // array of dbs mult_key_dbt, // array of keys mult_put_dbt, // array of values mult_put_flags // array of flags ); if (r != 0) { goto cleanup; } puts_to_increment++; if (puts_to_increment == 100) { increment_counter(stats_extra, PUTS, puts_to_increment); puts_to_increment = 0; } } cleanup: return r; } static int UU() random_put_op(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) { int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; DB* db = arg->dbp[db_index]; return random_put_in_db(db, txn, arg, false, stats_extra); } static int UU() random_put_op_singledb(DB_TXN *txn, ARG arg, void *UU(operation_extra), void *stats_extra) { int db_index = arg->thread_idx%arg->cli->num_DBs; DB* db = arg->dbp[db_index]; return random_put_in_db(db, txn, arg, false, stats_extra); } struct serial_put_extra { uint64_t current; }; static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra) { struct serial_put_extra *CAST_FROM_VOIDP(extra, operation_extra); int db_index = arg->thread_idx % arg->cli->num_DBs; DB* db = arg->dbp[db_index]; int r = 0; uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))]; uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b; uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b; ZERO_ARRAY(rand_key_b); uint8_t valbuf[arg->cli->val_size]; ZERO_ARRAY(valbuf); uint64_t puts_to_increment = 0; for (uint32_t i = 0; i < arg->cli->txn_size; ++i) { rand_key_key[0] = extra->current++; fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility); DBT key, val; dbt_init(&key, &rand_key_b, sizeof rand_key_b); dbt_init(&val, valbuf, sizeof valbuf); int flags = get_put_flags(arg->cli); r = db->put(db, txn, &key, &val, flags); if (r != 0) { goto cleanup; } puts_to_increment++; if (puts_to_increment == 100) { increment_counter(stats_extra, PUTS, puts_to_increment); puts_to_increment = 0; } } cleanup: increment_counter(stats_extra, PUTS, puts_to_increment); return r; } static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { DB_ENV* env = arg->env; int r; for (int num = 0; num < 2; num++) { DB *db_load; uint32_t db_flags = 0; uint32_t dbt_flags = 0; r = db_create(&db_load, env, 0); assert(r == 0); r = db_load->open(db_load, txn, "loader-db", NULL, DB_BTREE, DB_CREATE, 0666); assert(r == 0); DB_LOADER *loader; uint32_t loader_flags = (num == 0) ? 0 : LOADER_USE_PUTS; r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags); CKERR(r); for (int i = 0; i < 1000; i++) { DBT key, val; int rand_key = i; int rand_val = myrandom_r(arg->random_data); dbt_init(&key, &rand_key, sizeof(rand_key)); dbt_init(&val, &rand_val, sizeof(rand_val)); r = loader->put(loader, &key, &val); CKERR(r); } r = loader->close(loader); CKERR(r); r = db_load->close(db_load, 0); CKERR(r); r = env->dbremove(env, txn, "loader-db", NULL, 0); CKERR(r); } return 0; } static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { int r; // callback is designed to run on tests with one DB // no particular reason why, just the way it was // originally done int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; DB* db = arg->dbp[db_index]; int rand_key = myrandom_r(arg->random_data); if (arg->bounded_element_range) { rand_key = rand_key % arg->cli->num_elements; } DBT key; dbt_init(&key, &rand_key, sizeof rand_key); uint64_t less,equal,greater; int is_exact; r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact); assert(r == 0); return r; } static int verify_progress_callback(void *UU(extra), float UU(progress)) { if (!run_test) { return -1; } return 0; } static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { int r = 0; for (int i = 0; i < arg->cli->num_DBs && run_test; i++) { DB* db = arg->dbp[i]; r = db->verify_with_progress(db, verify_progress_callback, nullptr, 1, 0); if (!run_test) { r = 0; } CKERR(r); } return r; } static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) { struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra); for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, true); assert_zero(r); } return 0; } static int UU() scan_op_no_check(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) { struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra); for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { int r = scan_op_and_maybe_check_sum(arg->dbp[i], txn, extra, false); assert_zero(r); } return 0; } struct scan_op_worker_info { DB *db; DB_TXN *txn; void *extra; }; static void scan_op_worker(void *arg) { struct scan_op_worker_info *CAST_FROM_VOIDP(info, arg); struct scan_op_extra *CAST_FROM_VOIDP(extra, info->extra); int r = scan_op_and_maybe_check_sum( info->db, info->txn, extra, false ); assert_zero(r); toku_free(info); } static int UU() scan_op_no_check_parallel(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) { const int num_cores = toku_os_get_number_processors(); KIBBUTZ kibbutz = toku_kibbutz_create(num_cores); for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { struct scan_op_worker_info *XCALLOC(info); info->db = arg->dbp[i]; info->txn = txn; info->extra = operation_extra; toku_kibbutz_enq(kibbutz, scan_op_worker, info); } toku_kibbutz_destroy(kibbutz); return 0; } static int dbt_do_nothing (DBT const *UU(key), DBT const *UU(row), void *UU(context)) { return 0; } static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) { int r; int rand_key = myrandom_r(arg->random_data); if (arg->bounded_element_range) { rand_key = rand_key % arg->cli->num_elements; } DBT key, val; dbt_init(&key, &rand_key, sizeof rand_key); dbt_init(&val, NULL, 0); r = db->getf_set( db, txn, 0, &key, dbt_do_nothing, NULL ); if (check) assert(r != DB_NOTFOUND); r = 0; return r; } static int UU() ptquery_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *stats_extra) { int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; DB* db = arg->dbp[db_index]; int r = ptquery_and_maybe_check_op(db, txn, arg, true); if (!r) { increment_counter(stats_extra, PTQUERIES, 1); } return r; } static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *stats_extra) { int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; DB* db = arg->dbp[db_index]; int r = ptquery_and_maybe_check_op(db, txn, arg, false); if (!r) { increment_counter(stats_extra, PTQUERIES, 1); } return r; } static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0; DB* db = arg->dbp[db_index]; DBC* cursor = NULL; int r = db->cursor(db, txn, &cursor, 0); assert(r == 0); r = cursor->c_close(cursor); assert(r == 0); return 0; } #define MAX_RANDOM_VAL 10000 enum update_type { UPDATE_ADD_DIFF, UPDATE_NEGATE, UPDATE_WITH_HISTORY }; struct update_op_extra { enum update_type type; int pad_bytes; union { struct { int diff; } d; struct { int expected; int new_val; } h; } u; }; struct update_op_args { int *update_history_buffer; int update_pad_frequency; }; static struct update_op_args UU() get_update_op_args(struct cli_args* cli_args, int* update_history_buffer) { struct update_op_args uoe; uoe.update_history_buffer = update_history_buffer; uoe.update_pad_frequency = cli_args->num_elements/100; // arbitrary return uoe; } static uint64_t update_count = 0; static int update_op_callback(DB *UU(db), const DBT *UU(key), const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra) { int old_int_val = 0; if (old_val) { old_int_val = *(int*)old_val->data; } assert(extra->size == sizeof(struct update_op_extra)); struct update_op_extra *CAST_FROM_VOIDP(e, extra->data); int new_int_val; switch (e->type) { case UPDATE_ADD_DIFF: new_int_val = old_int_val + e->u.d.diff; break; case UPDATE_NEGATE: new_int_val = -old_int_val; break; case UPDATE_WITH_HISTORY: assert(old_int_val == e->u.h.expected); new_int_val = e->u.h.new_val; break; default: assert(false); } DBT new_val; uint32_t data_size = sizeof(int) + e->pad_bytes; char* data [data_size]; ZERO_ARRAY(data); memcpy(data, &new_int_val, sizeof(new_int_val)); set_val(dbt_init(&new_val, data, data_size), set_extra); return 0; } static int UU()update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { int r; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; DB* db = arg->dbp[db_index]; int curr_val_sum = 0; DBT key, val; int rand_key; int rand_key2; toku_sync_fetch_and_add(&update_count, 1); struct update_op_extra extra; ZERO_STRUCT(extra); extra.type = UPDATE_ADD_DIFF; extra.pad_bytes = 0; for (uint32_t i = 0; i < arg->cli->txn_size; i++) { rand_key = myrandom_r(arg->random_data); if (arg->bounded_element_range) { rand_key = rand_key % (arg->cli->num_elements/2); } rand_key2 = arg->cli->num_elements - rand_key; assert(rand_key != rand_key2); extra.u.d.diff = 1; curr_val_sum += extra.u.d.diff; r = db->update( db, txn, dbt_init(&key, &rand_key, sizeof rand_key), dbt_init(&val, &extra, sizeof extra), 0 ); if (r != 0) { return r; } extra.u.d.diff = -1; r = db->update( db, txn, dbt_init(&key, &rand_key2, sizeof rand_key), dbt_init(&val, &extra, sizeof extra), 0 ); if (r != 0) { return r; } } return r; } static int pre_acquire_write_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT *right_key) { int r; DBC *cursor; r = db->cursor(db, txn, &cursor, DB_RMW); CKERR(r); int cursor_r = cursor->c_pre_acquire_range_lock(cursor, left_key, right_key); r = cursor->c_close(cursor); CKERR(r); return cursor_r; } // take the given db and do an update on it static int UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) { int r = 0; int curr_val_sum = 0; DBT key, val; int update_key; uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1); struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra); struct update_op_extra extra; ZERO_STRUCT(extra); extra.type = UPDATE_ADD_DIFF; extra.pad_bytes = 0; if (op_args->update_pad_frequency) { if (old_update_count % (2*op_args->update_pad_frequency) == old_update_count%op_args->update_pad_frequency) { extra.pad_bytes = 100; } } const int update_flags = arg->cli->prelock_updates ? DB_PRELOCKED_WRITE : 0; for (uint32_t i = 0; i < arg->cli->txn_size; i++) { if (arg->prelock_updates) { if (i == 0) { update_key = myrandom_r(arg->random_data); if (arg->bounded_element_range) { update_key = update_key % arg->cli->num_elements; } const uint32_t max_key_in_table = arg->cli->num_elements - 1; const bool range_wraps = (update_key + arg->cli->txn_size - 1) > max_key_in_table; int left_key, right_key; DBT left_key_dbt, right_key_dbt; // acquire the range starting at the random key, plus txn_size - 1 // elements, but lock no further than the end of the table. if the // range wraps around to the beginning we will handle it below. left_key = update_key; right_key = range_wraps ? max_key_in_table : (left_key + arg->cli->txn_size - 1); r = pre_acquire_write_lock( db, txn, dbt_init(&left_key_dbt, &left_key, sizeof update_key), dbt_init(&right_key_dbt, &right_key, sizeof right_key) ); if (r != 0) { return r; } // check if the right end point wrapped around to the beginning // if so, lock from 0 to the right key, modded by table size. if (range_wraps) { right_key = (left_key + arg->cli->txn_size - 1) - max_key_in_table; invariant(right_key > 0); left_key = 0; r = pre_acquire_write_lock( db, txn, dbt_init(&left_key_dbt, &left_key, sizeof update_key), dbt_init(&right_key_dbt, &right_key, sizeof right_key) ); if (r != 0) { return r; } } } else { update_key++; } } else { // just do a usual, random point update without locking first update_key = myrandom_r(arg->random_data); } if (arg->bounded_element_range) { update_key = update_key % arg->cli->num_elements; } // the last update keeps the table's sum as zero // every other update except the last applies a random delta if (i == arg->cli->txn_size - 1) { extra.u.d.diff = -curr_val_sum; } else { extra.u.d.diff = myrandom_r(arg->random_data) % MAX_RANDOM_VAL; // just make every other value random if (i%2 == 0) { extra.u.d.diff = -extra.u.d.diff; } curr_val_sum += extra.u.d.diff; } // do the update r = db->update( db, txn, dbt_init(&key, &update_key, sizeof update_key), dbt_init(&val, &extra, sizeof extra), update_flags ); if (r != 0) { return r; } } return r; } // choose a random DB and do an update on it static int UU() update_op(DB_TXN *txn, ARG arg, void* operation_extra, void *stats_extra) { int db_index = myrandom_r(arg->random_data) % arg->cli->num_DBs; DB *db = arg->dbp[db_index]; return update_op_db(db, txn, arg, operation_extra, stats_extra); } static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) { struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra); assert(arg->bounded_element_range); assert(op_args->update_history_buffer); int r; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; DB* db = arg->dbp[db_index]; int curr_val_sum = 0; DBT key, val; int rand_key; struct update_op_extra extra; ZERO_STRUCT(extra); extra.type = UPDATE_WITH_HISTORY; uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1); extra.pad_bytes = 0; if (op_args->update_pad_frequency) { if (old_update_count % (2*op_args->update_pad_frequency) != old_update_count%op_args->update_pad_frequency) { extra.pad_bytes = 500; } } for (uint32_t i = 0; i < arg->cli->txn_size; i++) { rand_key = myrandom_r(arg->random_data) % arg->cli->num_elements; extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL; // just make every other value random if (i%2 == 0) { extra.u.h.new_val = -extra.u.h.new_val; } curr_val_sum += extra.u.h.new_val; extra.u.h.expected = op_args->update_history_buffer[rand_key]; op_args->update_history_buffer[rand_key] = extra.u.h.new_val; r = db->update( db, txn, dbt_init(&key, &rand_key, sizeof rand_key), dbt_init(&val, &extra, sizeof extra), 0 ); if (r != 0) { return r; } } // // now put in one more to ensure that the sum stays 0 // extra.u.h.new_val = -curr_val_sum; rand_key = myrandom_r(arg->random_data); if (arg->bounded_element_range) { rand_key = rand_key % arg->cli->num_elements; } extra.u.h.expected = op_args->update_history_buffer[rand_key]; op_args->update_history_buffer[rand_key] = extra.u.h.new_val; r = db->update( db, txn, dbt_init(&key, &rand_key, sizeof rand_key), dbt_init(&val, &extra, sizeof extra), 0 ); if (r != 0) { return r; } return r; } static int UU() update_broadcast_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { struct update_op_extra extra; ZERO_STRUCT(extra); int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; DB* db = arg->dbp[db_index]; extra.type = UPDATE_NEGATE; extra.pad_bytes = 0; DBT val; int r = db->update_broadcast(db, txn, dbt_init(&val, &extra, sizeof extra), 0); CKERR(r); return r; } static int hot_progress_callback(void *UU(extra), float UU(progress)) { return run_test ? 0 : 1; } static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { int r; for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { DB* db = arg->dbp[i]; r = db->hot_optimize(db, hot_progress_callback, NULL); if (run_test) { CKERR(r); } } return 0; } static void get_ith_table_name(char *buf, size_t len, int i) { snprintf(buf, len, "main%d", i); } static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { int r; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; DB* db = arg->dbp[db_index]; r = (db)->close(db, 0); CKERR(r); char name[30]; ZERO_ARRAY(name); get_ith_table_name(name, sizeof(name), db_index); r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0); CKERR(r); r = db_create(&(arg->dbp[db_index]), arg->env, 0); assert(r == 0); r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666); assert(r == 0); return 0; } static inline int intmin(const int a, const int b) { if (a < b) { return a; } return b; } struct test_time_extra { int num_seconds; bool crash_at_end; struct worker_extra *wes; int num_wes; struct cli_args *cli_args; }; static void *test_time(void *arg) { struct test_time_extra* CAST_FROM_VOIDP(tte, arg); int num_seconds = tte->num_seconds; const struct perf_formatter *perf_formatter = &perf_formatters[tte->cli_args->perf_output_format]; // // if num_Seconds is set to 0, run indefinitely // if (num_seconds == 0) { num_seconds = INT32_MAX; } uint64_t last_counter_values[tte->num_wes][(int) NUM_OPERATION_TYPES]; ZERO_ARRAY(last_counter_values); uint64_t *counters[tte->num_wes]; for (int t = 0; t < tte->num_wes; ++t) { counters[t] = tte->wes[t].counters; } if (verbose) { printf("Sleeping for %d seconds\n", num_seconds); } for (int i = 0; i < num_seconds; ) { struct timeval tv[2]; const int sleeptime = intmin(tte->cli_args->performance_period, num_seconds - i); int r = gettimeofday(&tv[0], nullptr); assert_zero(r); usleep(sleeptime*1000*1000); r = gettimeofday(&tv[1], nullptr); assert_zero(r); int actual_sleeptime = tv[1].tv_sec - tv[0].tv_sec; if (abs(actual_sleeptime - sleeptime) <= 1) { // Close enough, no need to alarm the user, and we didn't check nsec. i += sleeptime; } else { if (verbose) { printf("tried to sleep %d secs, actually slept %d secs\n", sleeptime, actual_sleeptime); } i += actual_sleeptime; } if (tte->cli_args->print_performance && tte->cli_args->print_iteration_performance) { perf_formatter->iteration(tte->cli_args, i, last_counter_values, counters, tte->num_wes); } } if (verbose) { printf("should now end test\n"); } toku_sync_bool_compare_and_swap(&run_test, true, false); // make this atomic to make valgrind --tool=drd happy. if (verbose) { printf("run_test %d\n", run_test); } if (tte->crash_at_end) { toku_hard_crash_on_purpose(); } return arg; } static int run_workers( struct arg *thread_args, int num_threads, uint32_t num_seconds, bool crash_at_end, struct cli_args* cli_args ) { int r; const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format]; toku_mutex_t mutex = ZERO_MUTEX_INITIALIZER; toku_mutex_init(&mutex, NULL); struct rwlock rwlock; rwlock_init(&rwlock); toku_pthread_t tids[num_threads]; toku_pthread_t time_tid; if (cli_args->print_performance) { perf_formatter->header(cli_args, num_threads); } struct worker_extra *worker_extra = (struct worker_extra *) memalign(64, num_threads * sizeof (struct worker_extra)); // allocate worker_extra's on cache line boundaries struct test_time_extra tte; tte.num_seconds = num_seconds; tte.crash_at_end = crash_at_end; tte.wes = worker_extra; tte.num_wes = num_threads; tte.cli_args = cli_args; run_test = true; for (int i = 0; i < num_threads; ++i) { thread_args[i].thread_idx = i; thread_args[i].num_threads = num_threads; worker_extra[i].thread_arg = &thread_args[i]; worker_extra[i].operation_lock = &rwlock; worker_extra[i].operation_lock_mutex = &mutex; XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters); TOKU_DRD_IGNORE_VAR(worker_extra[i].counters); { int chk_r = toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]); CKERR(chk_r); } if (verbose) printf("%lu created\n", (unsigned long) tids[i]); } { int chk_r = toku_pthread_create(&time_tid, NULL, test_time, &tte); CKERR(chk_r); } if (verbose) printf("%lu created\n", (unsigned long) time_tid); void *ret; r = toku_pthread_join(time_tid, &ret); assert_zero(r); if (verbose) printf("%lu joined\n", (unsigned long) time_tid); // Set an alarm that will kill us if it takes too long to join all the // threads (i.e. there is some runaway thread). unsigned int remaining = alarm(cli_args->join_timeout); assert_zero(remaining); for (int i = 0; i < num_threads; ++i) { r = toku_pthread_join(tids[i], &ret); assert_zero(r); if (verbose) printf("%lu joined\n", (unsigned long) tids[i]); } // All threads joined, deschedule the alarm. remaining = alarm(0); assert(remaining > 0); if (cli_args->print_performance) { uint64_t *counters[num_threads]; for (int i = 0; i < num_threads; ++i) { counters[i] = worker_extra[i].counters; } perf_formatter->totals(cli_args, counters, num_threads); } for (int i = 0; i < num_threads; ++i) { toku_free(worker_extra[i].counters); } if (verbose) printf("ending test, pthreads have joined\n"); rwlock_destroy(&rwlock); toku_mutex_destroy(&mutex); toku_free(worker_extra); return r; } static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs, int (*bt_compare)(DB *, const DBT *, const DBT *), struct cli_args *cli_args ) { int r; struct env_args env_args = cli_args->env_args; char rmcmd[32 + strlen(env_args.envdir)]; sprintf(rmcmd, "rm -rf %s", env_args.envdir); r = system(rmcmd); CKERR(r); r = toku_os_mkdir(env_args.envdir, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0); DB_ENV *env; db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes); r = db_env_create(&env, 0); assert(r == 0); r = env->set_redzone(env, 0); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r); r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r); r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r); if (env_args.generate_put_callback) { r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback); CKERR(r); } else { r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r); } if (env_args.generate_del_callback) { r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback); CKERR(r); } int env_flags = get_env_open_flags(cli_args); r = env->open(env, env_args.envdir, env_flags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r); r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r); r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r); *env_res = env; for (int i = 0; i < num_DBs; i++) { DB *db; char name[30]; memset(name, 0, sizeof(name)); get_ith_table_name(name, sizeof(name), i); r = db_create(&db, env, 0); CKERR(r); r = db->set_flags(db, 0); CKERR(r); r = db->set_pagesize(db, env_args.node_size); CKERR(r); r = db->set_readpagesize(db, env_args.basement_node_size); CKERR(r); const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0); r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666); CKERR(r); db_res[i] = db; } return r; } static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bufsz, int val_bufsz, void (*callback)(int idx, void *extra, void *key, int *keysz, void *val, int *valsz), void *extra, void (*progress_cb)(int num_rows)) { DB_TXN *txn = nullptr; const int puts_per_txn = 100000; int r = 0; for (long i = 0; i < num_elements; ++i) { if (txn == nullptr) { r = env->txn_begin(env, 0, &txn, 0); CKERR(r); } char keybuf[key_bufsz], valbuf[val_bufsz]; memset(keybuf, 0, sizeof(keybuf)); memset(valbuf, 0, sizeof(valbuf)); int keysz, valsz; callback(i, extra, keybuf, &keysz, valbuf, &valsz); // let's make sure the data stored fits in the buffers we passed in assert(keysz <= key_bufsz); assert(valsz <= val_bufsz); DBT key, val; // make size of data what is specified w/input parameters // note that key and val have sizes of // key_bufsz and val_bufsz, which were passed into this // function, not what was stored by the callback r = db->put( db, txn, dbt_init(&key, keybuf, key_bufsz), dbt_init(&val, valbuf, val_bufsz), // don't bother taking locks in the locktree DB_PRELOCKED_WRITE ); assert(r == 0); if (i > 0 && i % puts_per_txn == 0) { // don't bother fsyncing to disk. // the caller can checkpoint if they want. r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); txn = nullptr; if (verbose) { progress_cb(puts_per_txn); } } } if (txn) { r = txn->commit(txn, DB_TXN_NOSYNC); invariant_zero(r); } return r; } static uint32_t breverse(uint32_t v) // Effect: return the bits in i, reversed // Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious // Rationale: just a hack to spread out the keys during loading, doesn't need to be fast but does need to be correct. { uint32_t r = v; // r will be reversed bits of v; first get LSB of v int s = sizeof(v) * CHAR_BIT - 1; // extra shift needed at end for (v >>= 1; v; v >>= 1) { r <<= 1; r |= v & 1; s--; } r <<= s; // shift when v's highest bits are zero return r; } static void zero_element_callback(int idx, void *extra, void *keyv, int *keysz, void *valv, int *valsz) { const bool *disperse_keys = static_cast<bool *>(extra); int *CAST_FROM_VOIDP(key, keyv); int *CAST_FROM_VOIDP(val, valv); if (*disperse_keys) { *key = static_cast<int>(breverse(idx)); } else { *key = idx; } *val = 0; *keysz = sizeof(int); *valsz = sizeof(int); } struct fill_table_worker_info { DB_ENV *env; DB *db; int num_elements; uint32_t key_size; uint32_t val_size; bool disperse_keys; void (*progress_cb)(int num_rows); }; static void fill_table_worker(void *arg) { struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg); int r = fill_table_from_fun( info->env, info->db, info->num_elements, info->key_size, info->val_size, zero_element_callback, &info->disperse_keys, info->progress_cb ); invariant_zero(r); toku_free(info); } static int num_tables_to_fill = 1; static int rows_per_table = 1; static void report_overall_fill_table_progress(int num_rows) { // for sanitary reasons we'd like to prevent two threads // from printing the same performance report twice. static bool reporting; // when was the first time measurement taken? static uint64_t t0; static int rows_inserted; // when was the last report? what was its progress? static uint64_t last_report; static double last_progress; if (t0 == 0) { t0 = toku_current_time_usec(); last_report = t0; } uint64_t rows_so_far = toku_sync_add_and_fetch(&rows_inserted, num_rows); double progress = rows_so_far / (rows_per_table * num_tables_to_fill * 1.0); if (progress > (last_progress + .01)) { uint64_t t1 = toku_current_time_usec(); // report no more often than once every 5 seconds, for less output. // there is a race condition. it is probably harmless. const uint64_t minimum_report_period = 5 * 1000000; if (t1 > last_report + minimum_report_period && toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) { double inserts_per_sec = (rows_so_far*1000000) / ((t1 - t0) * 1.0); printf("fill tables: %ld%% complete, %.2lf rows/sec\n", (long)(progress * 100), inserts_per_sec); last_progress = progress; last_report = t1; reporting = false; } } } static int fill_tables_with_zeroes(DB_ENV *env, DB **dbs, int num_DBs, int num_elements, uint32_t key_size, uint32_t val_size, bool disperse_keys) { // set the static globals that the progress reporter uses num_tables_to_fill = num_DBs; rows_per_table = num_elements; const int num_cores = toku_os_get_number_processors(); KIBBUTZ kibbutz = toku_kibbutz_create(num_cores); for (int i = 0; i < num_DBs; i++) { assert(key_size >= sizeof(int)); assert(val_size >= sizeof(int)); struct fill_table_worker_info *XCALLOC(info); info->env = env; info->db = dbs[i]; info->num_elements = num_elements; info->key_size = key_size; info->val_size = val_size; info->disperse_keys = disperse_keys; info->progress_cb = report_overall_fill_table_progress; toku_kibbutz_enq(kibbutz, fill_table_worker, info); } toku_kibbutz_destroy(kibbutz); return 0; } static void do_xa_recovery(DB_ENV* env) { DB_PREPLIST preplist[1]; long num_recovered= 0; int r = 0; r = env->txn_recover(env, preplist, 1, &num_recovered, DB_NEXT); while(r==0 && num_recovered > 0) { DB_TXN* recovered_txn = preplist[0].txn; if (verbose) { printf("recovering transaction with id %" PRIu64 " \n", recovered_txn->id64(recovered_txn)); } if (random() % 2 == 0) { int rr = recovered_txn->commit(recovered_txn, 0); CKERR(rr); } else { int rr = recovered_txn->abort(recovered_txn); CKERR(rr); } r = env->txn_recover(env, preplist, 1, &num_recovered, DB_NEXT); } } static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, int (*bt_compare)(DB *, const DBT *, const DBT *), struct cli_args *cli_args) { int r; struct env_args env_args = cli_args->env_args; /* create the dup database file */ DB_ENV *env; db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes); r = db_env_create(&env, 0); assert(r == 0); r = env->set_redzone(env, 0); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r); r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r); env->set_update(env, env_args.update_function); // set the cache size to 10MB r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r); if (env_args.generate_put_callback) { r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback); CKERR(r); } else { r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r); } if (env_args.generate_del_callback) { r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback); CKERR(r); } int env_flags = get_env_open_flags(cli_args); r = env->open(env, env_args.envdir, DB_RECOVER | env_flags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); do_xa_recovery(env); r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r); r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r); r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r); *env_res = env; for (int i = 0; i < num_DBs; i++) { DB *db; char name[30]; memset(name, 0, sizeof(name)); get_ith_table_name(name, sizeof(name), i); r = db_create(&db, env, 0); CKERR(r); const int flags = cli_args->blackhole ? DB_BLACKHOLE : 0; r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666); CKERR(r); db_res[i] = db; } return r; } static int close_tables(DB_ENV *env, DB** dbs, int num_DBs) { int r; for (int i = 0; i < num_DBs; i++) { r = dbs[i]->close(dbs[i], 0); CKERR(r); } r = env->close(env, 0); CKERR(r); return r; } static const struct env_args DEFAULT_ENV_ARGS = { .node_size = 4096, .basement_node_size = 1024, .rollback_node_size = 4096, .checkpointing_period = 10, .cleaner_period = 1, .cleaner_iterations = 1, .lk_max_memory = 1L * 1024 * 1024 * 1024, .cachetable_size = 300000, .num_bucket_mutexes = 1024, .envdir = ENVDIR, .update_function = update_op_callback, .generate_put_callback = NULL, .generate_del_callback = NULL, }; static const struct env_args DEFAULT_PERF_ENV_ARGS = { .node_size = 4*1024*1024, .basement_node_size = 128*1024, .rollback_node_size = 4*1024*1024, .checkpointing_period = 60, .cleaner_period = 1, .cleaner_iterations = 5, .lk_max_memory = 1L * 1024 * 1024 * 1024, .cachetable_size = 1<<30, .num_bucket_mutexes = 1024 * 1024, .envdir = ENVDIR, .update_function = NULL, .generate_put_callback = NULL, .generate_del_callback = NULL, }; #define MIN_VAL_SIZE sizeof(int) #define MIN_KEY_SIZE sizeof(int) #define MIN_COMPRESSIBILITY (0.0) #define MAX_COMPRESSIBILITY (1.0) static struct cli_args UU() get_default_args(void) { struct cli_args DEFAULT_ARGS = { .num_elements = 150000, .num_DBs = 1, .num_seconds = 180, .join_timeout = 600, .only_create = false, .only_stress = false, .update_broadcast_period_ms = 2000, .num_ptquery_threads = 1, .do_test_and_crash = false, .do_recover = false, .num_update_threads = 1, .num_put_threads = 1, .serial_insert = false, .interleave = false, .crash_on_operation_failure = true, .print_performance = false, .print_thread_performance = true, .print_iteration_performance = true, .perf_output_format = HUMAN, .performance_period = 1, .txn_size = 1000, .key_size = MIN_KEY_SIZE, .val_size = MIN_VAL_SIZE, .compressibility = 1.0, .env_args = DEFAULT_ENV_ARGS, .single_txn = false, .warm_cache = false, .blackhole = false, .nolocktree = false, .unique_checks = false, .nosync = false, .nolog = false, .nocrashstatus = false, .prelock_updates = false, .disperse_keys = false, }; return DEFAULT_ARGS; } static struct cli_args UU() get_default_args_for_perf(void) { struct cli_args args = get_default_args(); args.num_elements = 1000000; //default of 1M //args.print_performance = true; args.env_args = DEFAULT_PERF_ENV_ARGS; return args; } union val_type { int32_t i32; int64_t i64; uint32_t u32; uint64_t u64; bool b; double d; const char *s; }; struct arg_type; typedef bool (*match_fun)(struct arg_type *type, char *const argv[]); typedef int (*parse_fun)(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]); //TODO fix typedef void (*help_fun)(struct arg_type *type, int width_name, int width_type); struct type_description { const char *type_name; const match_fun matches; const parse_fun parse; const help_fun help; }; struct arg_type { const char *name; struct type_description *description; union val_type default_val; void *target; const char *help_suffix; union val_type min; union val_type max; }; #define DEFINE_NUMERIC_HELP(typename, format, member, MIN, MAX) \ static inline void \ help_##typename(struct arg_type *type, int width_name, int width_type) { \ invariant(!strncmp("--", type->name, strlen("--"))); \ fprintf(stderr, "\t%-*s %-*s ", width_name, type->name, width_type, type->description->type_name); \ fprintf(stderr, "(default %" format "%s", type->default_val.member, type->help_suffix); \ if (type->min.member != MIN) { \ fprintf(stderr, ", min %" format "%s", type->min.member, type->help_suffix); \ } \ if (type->max.member != MAX) { \ fprintf(stderr, ", max %" format "%s", type->max.member, type->help_suffix); \ } \ fprintf(stderr, ")\n"); \ } DEFINE_NUMERIC_HELP(int32, PRId32, i32, INT32_MIN, INT32_MAX) DEFINE_NUMERIC_HELP(int64, PRId64, i64, INT64_MIN, INT64_MAX) DEFINE_NUMERIC_HELP(uint32, PRIu32, u32, 0, UINT32_MAX) DEFINE_NUMERIC_HELP(uint64, PRIu64, u64, 0, UINT64_MAX) DEFINE_NUMERIC_HELP(double, ".2lf", d, -HUGE_VAL, HUGE_VAL) static inline void help_bool(struct arg_type *type, int width_name, int width_type) { invariant(strncmp("--", type->name, strlen("--"))); const char *default_value = type->default_val.b ? "yes" : "no"; fprintf(stderr, "\t--[no-]%-*s %-*s (default %s)\n", width_name - (int)strlen("--[no-]"), type->name, width_type, type->description->type_name, default_value); } static inline void help_string(struct arg_type *type, int width_name, int width_type) { invariant(!strncmp("--", type->name, strlen("--"))); const char *default_value = type->default_val.s ? type->default_val.s : ""; fprintf(stderr, "\t%-*s %-*s (default '%s')\n", width_name, type->name, width_type, type->description->type_name, default_value); } static inline bool match_name(struct arg_type *type, char *const argv[]) { invariant(!strncmp("--", type->name, strlen("--"))); return !strcmp(argv[1], type->name); } static inline bool match_bool(struct arg_type *type, char *const argv[]) { invariant(strncmp("--", type->name, strlen("--"))); const char *string = argv[1]; if (strncmp(string, "--", strlen("--"))) { return false; } string += strlen("--"); if (!strncmp(string, "no-", strlen("no-"))) { string += strlen("no-"); } return !strcmp(string, type->name); } static inline int parse_bool(struct arg_type *type, int *extra_args_consumed, int UU(argc), char *const argv[]) { const char *string = argv[1]; if (!strncmp(string, "--no-", strlen("--no-"))) { *((bool *)type->target) = false; } else { *((bool *)type->target) = true; } *extra_args_consumed = 0; return 0; } static inline int parse_string(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) { if (argc < 2) { return EINVAL; } *((const char **)type->target) = argv[2]; *extra_args_consumed = 1; return 0; } static inline int parse_uint64(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) { // Already verified name. if (argc < 2) { return EINVAL; } if (*argv[2] == '\0') { return EINVAL; } char *endptr; unsigned long long int result = strtoull(argv[2], &endptr, 0); if (*endptr != '\0') { return EINVAL; } if (result < type->min.u64 || result > type->max.u64) { return ERANGE; } *((uint64_t*)type->target) = result; *extra_args_consumed = 1; return 0; } static inline int parse_int64(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) { // Already verified name. if (argc < 2) { return EINVAL; } if (*argv[2] == '\0') { return EINVAL; } char *endptr; long long int result = strtoll(argv[2], &endptr, 0); if (*endptr != '\0') { return EINVAL; } if (result < type->min.i64 || result > type->max.i64) { return ERANGE; } *((int64_t*)type->target) = result; *extra_args_consumed = 1; return 0; } static inline int parse_uint32(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) { // Already verified name. if (argc < 2) { return EINVAL; } if (*argv[2] == '\0') { return EINVAL; } char *endptr; unsigned long int result = strtoul(argv[2], &endptr, 0); if (*endptr != '\0') { return EINVAL; } if (result < type->min.u32 || result > type->max.u32) { return ERANGE; } *((int32_t*)type->target) = result; *extra_args_consumed = 1; return 0; } static inline int parse_int32(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) { // Already verified name. if (argc < 2) { return EINVAL; } if (*argv[2] == '\0') { return EINVAL; } char *endptr; long int result = strtol(argv[2], &endptr, 0); if (*endptr != '\0') { return EINVAL; } if (result < type->min.i32 || result > type->max.i32) { return ERANGE; } *((int32_t*)type->target) = result; *extra_args_consumed = 1; return 0; } static inline int parse_double(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]) { // Already verified name. if (argc < 2) { return EINVAL; } if (*argv[2] == '\0') { return EINVAL; } char *endptr; double result = strtod(argv[2], &endptr); if (*endptr != '\0') { return EINVAL; } if (result < type->min.d || result > type->max.d) { return ERANGE; } *((double*)type->target) = result; *extra_args_consumed = 1; return 0; } // Common case (match_name). #define DECLARE_TYPE_DESCRIPTION(typename) \ struct type_description type_##typename = { \ .type_name = #typename, \ .matches = match_name, \ .parse = parse_##typename, \ .help = help_##typename \ } DECLARE_TYPE_DESCRIPTION(int32); DECLARE_TYPE_DESCRIPTION(uint32); DECLARE_TYPE_DESCRIPTION(int64); DECLARE_TYPE_DESCRIPTION(uint64); DECLARE_TYPE_DESCRIPTION(double); DECLARE_TYPE_DESCRIPTION(string); // Bools use their own match function so they are declared manually. struct type_description type_bool = { .type_name = "bool", .matches = match_bool, .parse = parse_bool, .help = help_bool }; #define ARG_MATCHES(type, rest...) type->description->matches(type, rest) #define ARG_PARSE(type, rest...) type->description->parse(type, rest) #define ARG_HELP(type, rest...) type->description->help(type, rest) static inline void do_usage(const char *argv0, int n, struct arg_type types[/*n*/]) { // fprintf(stderr, "\t--compressibility DOUBLE (default %.2f, minimum %.2f, maximum %.2f)\n", // default_args.compressibility, MIN_COMPRESSIBILITY, MAX_COMPRESSIBILITY); fprintf(stderr, "Usage:\n"); fprintf(stderr, "\t%s [-h|--help]\n", argv0); fprintf(stderr, "\t%s [OPTIONS]\n", argv0); fprintf(stderr, "\n"); fprintf(stderr, "OPTIONS are among:\n"); fprintf(stderr, "\t-q|--quiet\n"); fprintf(stderr, "\t-v|--verbose\n"); for (int i = 0; i < n; i++) { struct arg_type *type = &types[i]; ARG_HELP(type, 35, 6); } } static inline void parse_stress_test_args (int argc, char *const argv[], struct cli_args *args) { struct cli_args default_args = *args; const char *argv0=argv[0]; #define MAKE_ARG(name_string, type, member, variable, suffix, min_val, max_val) { \ .name=(name_string), \ .description=&(type), \ .default_val={.member=default_args.variable}, \ .target=&(args->variable), \ .help_suffix=(suffix), \ .min={.member=min_val}, \ .max={.member=max_val}, \ } #define MAKE_LOCAL_ARG(name_string, type, member, default, variable, suffix, min_val, max_val) { \ .name=(name_string), \ .description=&(type), \ .default_val={.member=default}, \ .target=&(variable), \ .help_suffix=(suffix), \ .min={.member=min_val}, \ .max={.member=max_val}, \ } #define UINT32_ARG(name_string, variable, suffix) \ MAKE_ARG(name_string, type_uint32, u32, variable, suffix, 0, UINT32_MAX) #define UINT32_ARG_R(name_string, variable, suffix, min, max) \ MAKE_ARG(name_string, type_uint32, u32, variable, suffix, min, max) #define UINT64_ARG(name_string, variable, suffix) \ MAKE_ARG(name_string, type_uint64, u64, variable, suffix, 0, UINT64_MAX) #define INT32_ARG_NONNEG(name_string, variable, suffix) \ MAKE_ARG(name_string, type_int32, i32, variable, suffix, 0, INT32_MAX) #define INT32_ARG_R(name_string, variable, suffix, min, max) \ MAKE_ARG(name_string, type_int32, i32, variable, suffix, min, max) #define DOUBLE_ARG_R(name_string, variable, suffix, min, max) \ MAKE_ARG(name_string, type_double, d, variable, suffix, min, max) #define BOOL_ARG(name_string, variable) \ MAKE_ARG(name_string, type_bool, b, variable, "", false, false) #define STRING_ARG(name_string, variable) \ MAKE_ARG(name_string, type_string, s, variable, "", "", "") #define LOCAL_STRING_ARG(name_string, variable, default) \ MAKE_LOCAL_ARG(name_string, type_string, s, default, variable, "", "", "") const char *perf_format_s = NULL; struct arg_type arg_types[] = { INT32_ARG_NONNEG("--num_elements", num_elements, ""), INT32_ARG_NONNEG("--num_DBs", num_DBs, ""), INT32_ARG_NONNEG("--num_seconds", num_seconds, "s"), INT32_ARG_NONNEG("--join_timeout", join_timeout, "s"), INT32_ARG_NONNEG("--node_size", env_args.node_size, " bytes"), INT32_ARG_NONNEG("--basement_node_size", env_args.basement_node_size, " bytes"), INT32_ARG_NONNEG("--rollback_node_size", env_args.rollback_node_size, " bytes"), INT32_ARG_NONNEG("--checkpointing_period", env_args.checkpointing_period, "s"), INT32_ARG_NONNEG("--cleaner_period", env_args.cleaner_period, "s"), INT32_ARG_NONNEG("--cleaner_iterations", env_args.cleaner_iterations, ""), INT32_ARG_NONNEG("--update_broadcast_period", update_broadcast_period_ms, "ms"), INT32_ARG_NONNEG("--num_ptquery_threads", num_ptquery_threads, " threads"), INT32_ARG_NONNEG("--num_put_threads", num_put_threads, " threads"), INT32_ARG_NONNEG("--num_update_threads", num_update_threads, " threads"), UINT32_ARG("--txn_size", txn_size, " rows"), UINT32_ARG("--num_bucket_mutexes", env_args.num_bucket_mutexes, " mutexes"), INT32_ARG_R("--performance_period", performance_period, "s", 1, INT32_MAX), // TODO: John thinks the cachetable size should be in megabytes // and that lock memory should be in kilobytes. Is it worth the // inconsitency for a bit of convenience when running tests? UINT64_ARG("--cachetable_size", env_args.cachetable_size, " bytes"), UINT64_ARG("--lk_max_memory", env_args.lk_max_memory, " bytes"), DOUBLE_ARG_R("--compressibility", compressibility, "", 0.0, 1.0), //TODO: when outputting help.. skip min/max that is min/max of data range. UINT32_ARG_R("--key_size", key_size, " bytes", MIN_KEY_SIZE, UINT32_MAX), UINT32_ARG_R("--val_size", val_size, " bytes", MIN_VAL_SIZE, UINT32_MAX), BOOL_ARG("serial_insert", serial_insert), BOOL_ARG("interleave", interleave), BOOL_ARG("crash_on_operation_failure", crash_on_operation_failure), BOOL_ARG("single_txn", single_txn), BOOL_ARG("warm_cache", warm_cache), BOOL_ARG("print_performance", print_performance), BOOL_ARG("print_thread_performance", print_thread_performance), BOOL_ARG("print_iteration_performance", print_iteration_performance), BOOL_ARG("only_create", only_create), BOOL_ARG("only_stress", only_stress), BOOL_ARG("test", do_test_and_crash), BOOL_ARG("recover", do_recover), BOOL_ARG("blackhole", blackhole), BOOL_ARG("nolocktree", nolocktree), BOOL_ARG("unique_checks", unique_checks), BOOL_ARG("nosync", nosync), BOOL_ARG("nolog", nolog), BOOL_ARG("nocrashstatus", nocrashstatus), BOOL_ARG("prelock_updates", prelock_updates), BOOL_ARG("disperse_keys", disperse_keys), STRING_ARG("--envdir", env_args.envdir), LOCAL_STRING_ARG("--perf_format", perf_format_s, "human"), //TODO(add --quiet, -v, -h) }; #undef UINT32_ARG #undef UINT32_ARG_R #undef UINT64_ARG #undef DOUBLE_ARG_R #undef BOOL_ARG #undef STRING_ARG #undef MAKE_ARG int num_arg_types = sizeof(arg_types) / sizeof(arg_types[0]); int resultcode = 0; while (argc > 1) { if (!strcmp(argv[1], "-v") || !strcmp(argv[1], "--verbose")) { verbose++; argv++; argc--; } else if (!strcmp(argv[1], "-q") || !strcmp(argv[1], "--quiet")) { verbose = 0; argv++; argc--; } else if (!strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) { fprintf(stderr, "HELP INVOKED\n"); do_usage(argv0, num_arg_types, arg_types); exit(0); } else { bool found = false; for (int i = 0; i < num_arg_types; i++) { struct arg_type *type = &arg_types[i]; if (ARG_MATCHES(type, argv)) { int extra_args_consumed; resultcode = ARG_PARSE(type, &extra_args_consumed, argc, argv); if (resultcode) { fprintf(stderr, "ERROR PARSING [%s]\n", argv[1]); do_usage(argv0, num_arg_types, arg_types); exit(resultcode); } found = true; argv += extra_args_consumed + 1; argc -= extra_args_consumed + 1; break; } } if (!found) { fprintf(stderr, "COULD NOT PARSE [%s]\n", argv[1]); do_usage(argv0, num_arg_types, arg_types); exit(EINVAL); } } } if (perf_format_s != NULL) { if (!strcmp(perf_format_s, "human")) { args->perf_output_format = HUMAN; } else if (!strcmp(perf_format_s, "csv")) { args->perf_output_format = CSV; } else if (!strcmp(perf_format_s, "tsv")) { args->perf_output_format = TSV; #if 0 } else if (!strcmp(perf_format_s, "gnuplot")) { args->perf_output_format = GNUPLOT; #endif } else { fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", and \"tsv\"\n"); //fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", \"tsv\", and \"gnuplot\"\n"); do_usage(argv0, num_arg_types, arg_types); exit(EINVAL); } } if (args->only_create && args->only_stress) { fprintf(stderr, "used --only_stress and --only_create\n"); do_usage(argv0, num_arg_types, arg_types); exit(EINVAL); } } static void stress_table(DB_ENV *, DB **, struct cli_args *); static int UU() stress_int_dbt_cmp (DB *db, const DBT *a, const DBT *b) { assert(db && a && b); assert(a->size >= sizeof(int)); assert(b->size >= sizeof(int)); int x = *(int *) a->data; int y = *(int *) b->data; if (x<y) return -1; if (x>y) return 1; return 0; } static int UU() stress_uint64_dbt_cmp(DB *db, const DBT *a, const DBT *b) { assert(db && a && b); assert(a->size >= sizeof(uint64_t)); assert(b->size >= sizeof(uint64_t)); uint64_t x = *(uint64_t *) a->data; uint64_t y = *(uint64_t *) b->data; if (x < y) { return -1; } if (x > y) { return +1; } return 0; } static void do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args) { struct scan_op_extra soe; soe.fast = true; soe.fwd = true; soe.prefetch = true; struct arg scan_arg; arg_init(&scan_arg, dbs, env, args); scan_arg.operation_extra = &soe; scan_arg.operation = scan_op_no_check; scan_arg.lock_type = STRESS_LOCK_NONE; DB_TXN* txn = NULL; // don't take serializable read locks when scanning. int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r); // make sure the scan doesn't terminate early run_test = true; // warm up each DB in parallel scan_op_no_check_parallel(txn, &scan_arg, &soe, NULL); r = txn->commit(txn,0); CKERR(r); } static void UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, const DBT *, const DBT *)) { { char *loc = setlocale(LC_NUMERIC, "en_US.UTF-8"); assert(loc); } DB_ENV* env = NULL; DB* dbs[args->num_DBs]; memset(dbs, 0, sizeof(dbs)); db_env_enable_engine_status(args->nocrashstatus ? false : true); if (!args->only_stress) { create_tables( &env, dbs, args->num_DBs, bt_compare, args ); { int chk_r = fill_tables_with_zeroes(env, dbs, args->num_DBs, args->num_elements, args->key_size, args->val_size, args->disperse_keys); CKERR(chk_r); } { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); } } if (!args->only_create) { { int chk_r = open_tables(&env, dbs, args->num_DBs, bt_compare, args); CKERR(chk_r); } if (args->warm_cache) { do_warm_cache(env, dbs, args); } stress_table(env, dbs, args); { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); } } } static void UU() stress_test_main(struct cli_args *args) { stress_test_main_with_cmp(args, stress_int_dbt_cmp); } static void UU() stress_recover(struct cli_args *args) { DB_ENV* env = NULL; DB* dbs[args->num_DBs]; memset(dbs, 0, sizeof(dbs)); { int chk_r = open_tables(&env, dbs, args->num_DBs, stress_int_dbt_cmp, args); CKERR(chk_r); } DB_TXN* txn = NULL; struct arg recover_args; arg_init(&recover_args, dbs, env, args); int r = env->txn_begin(env, 0, &txn, recover_args.txn_type); CKERR(r); struct scan_op_extra soe; soe.fast = true; soe.fwd = true; soe.prefetch = false; // make sure the scan doesn't terminate early run_test = true; r = scan_op(txn, &recover_args, &soe, NULL); CKERR(r); { int chk_r = txn->commit(txn,0); CKERR(chk_r); } { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); } } #endif