Commit 1be3a1f4 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

refs #5801 merge 5801 (improve threaded stress test framework) to main: now...

refs #5801 merge 5801 (improve threaded stress test framework) to main: now the loader is used for sufficiently large tables, correctness tests fill tables with zeroes while performance tests fill them with bytes based on compressibility, and both use similar code paths for key/val generation for better consistency and readability


git-svn-id: file:///svn/toku/tokudb@51638 c7de825b-a66e-492c-adef-691d508d4ae1
parent d28f7b1f
...@@ -42,6 +42,6 @@ int ...@@ -42,6 +42,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -42,17 +42,17 @@ static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void ...@@ -42,17 +42,17 @@ static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
} }
int r = 0; int r = 0;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size]; uint8_t valbuf[arg->cli->val_size];
ZERO_ARRAY(valbuf); dbt_init(&mult_key_dbt[0], keybuf, sizeof keybuf);
dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf);
uint64_t puts_to_increment = 0; uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) { for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
fill_zeroed_array(valbuf, arg->cli->val_size,
arg->random_data, arg->cli->compressibility);
struct iibench_op_extra *CAST_FROM_VOIDP(info, operation_extra); struct iibench_op_extra *CAST_FROM_VOIDP(info, operation_extra);
uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1); uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1);
dbt_init(&mult_key_dbt[0], &pk, sizeof pk); fill_key_buf(pk, keybuf, arg->cli);
dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf); fill_val_buf_random(arg->random_data, valbuf, arg->cli);
r = env->put_multiple( r = env->put_multiple(
env, env,
dbs[0], // source db. dbs[0], // source db.
...@@ -128,6 +128,6 @@ test_main(int argc, char *const argv[]) { ...@@ -128,6 +128,6 @@ test_main(int argc, char *const argv[]) {
args.crash_on_operation_failure = false; args.crash_on_operation_failure = false;
} }
args.env_args.generate_put_callback = iibench_generate_row_for_put; args.env_args.generate_put_callback = iibench_generate_row_for_put;
stress_test_main_with_cmp(&args, stress_uint64_dbt_cmp); perf_test_main(&args);
return 0; return 0;
} }
...@@ -45,12 +45,14 @@ int ...@@ -45,12 +45,14 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
args.num_elements = 0; // want to start with empty DBs args.num_elements = 0; // want to start with empty DBs
args.key_size = 8;
args.val_size = 8;
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
// when there are multiple threads, its valid for two of them to // when there are multiple threads, its valid for two of them to
// generate the same key and one of them fail with DB_LOCK_NOTGRANTED // generate the same key and one of them fail with DB_LOCK_NOTGRANTED
if (args.num_put_threads > 1) { if (args.num_put_threads > 1) {
args.crash_on_operation_failure = false; args.crash_on_operation_failure = false;
} }
stress_test_main_with_cmp(&args, stress_uint64_dbt_cmp); perf_test_main(&args);
return 0; return 0;
} }
...@@ -20,6 +20,13 @@ ...@@ -20,6 +20,13 @@
// The intent of this test is to measure the throughput of malloc and free // The intent of this test is to measure the throughput of malloc and free
// with multiple threads. // with multiple threads.
static int xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
size_t s = 256;
void *p = toku_xmalloc(s);
toku_free(p);
return 0;
}
static void static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
...@@ -27,7 +34,7 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { ...@@ -27,7 +34,7 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args); arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = malloc_free_op; myargs[i].operation = xmalloc_free_op;
} }
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args); run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
} }
...@@ -36,6 +43,6 @@ int ...@@ -36,6 +43,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
// The intent of this test is to measure the throughput of the test infrastructure executing a nop // The intent of this test is to measure the throughput of the test infrastructure executing a nop
// on multiple threads. // on multiple threads.
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
return 0;
}
static void static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
...@@ -34,6 +38,6 @@ int ...@@ -34,6 +38,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -62,6 +62,6 @@ int ...@@ -62,6 +62,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -75,6 +75,6 @@ int ...@@ -75,6 +75,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -78,6 +78,6 @@ test_main(int argc, char *const argv[]) { ...@@ -78,6 +78,6 @@ test_main(int argc, char *const argv[]) {
args.num_update_threads = 1; args.num_update_threads = 1;
args.crash_on_operation_failure = false; args.crash_on_operation_failure = false;
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -71,6 +71,6 @@ test_main(int argc, char *const argv[]) { ...@@ -71,6 +71,6 @@ test_main(int argc, char *const argv[]) {
// this test is all about transactions, make the DB small // this test is all about transactions, make the DB small
args.num_elements = 1; args.num_elements = 1;
args.num_DBs= 1; args.num_DBs= 1;
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
// The intent of this test is to measure the throughput of toku_malloc and toku_free
// with multiple threads.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = xmalloc_free_op;
}
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
}
int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
stress_test_main(&args);
return 0;
}
...@@ -22,6 +22,14 @@ ...@@ -22,6 +22,14 @@
// This test is targetted at stressing the locktree, hence the small table and many update threads. // This test is targetted at stressing the locktree, hence the small table and many update threads.
// //
static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_extra, void *UU(stats_extra)) {
invariant_null(operation_extra);
if (!arg->cli->nolocktree) {
toku_env_run_lock_escalation_for_test(arg->env);
}
return 0;
}
static void static void
stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
...@@ -40,13 +48,8 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -40,13 +48,8 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[0].operation_extra = &soe[0]; myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the lock escalation thread. myargs[1].sleep_ms = 15L * 1000;
// it should sleep somewhere between 10 and 20 myargs[1].operation_extra = nullptr;
// seconds between each escalation.
struct lock_escalation_op_extra eoe;
eoe.min_sleep_time_micros = 10UL * (1000 * 1000);
eoe.max_sleep_time_micros = 20UL * (1000 * 1000);
myargs[1].operation_extra = &eoe;
myargs[1].operation = lock_escalation_op; myargs[1].operation = lock_escalation_op;
// make the threads that update the db // make the threads that update the db
......
...@@ -69,6 +69,13 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void ...@@ -69,6 +69,13 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void
DBT dest_vals[2]; DBT dest_vals[2];
memset(dest_keys, 0, sizeof(dest_keys)); memset(dest_keys, 0, sizeof(dest_keys));
memset(dest_vals, 0, sizeof(dest_vals)); memset(dest_vals, 0, sizeof(dest_vals));
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size];
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf);
int i; int i;
r = env->txn_begin(env, NULL, &hi_txn, 0); r = env->txn_begin(env, NULL, &hi_txn, 0);
CKERR(r); CKERR(r);
...@@ -78,12 +85,13 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void ...@@ -78,12 +85,13 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void
dbs[0] = db; dbs[0] = db;
dbs[1] = hot_db; dbs[1] = hot_db;
int num_dbs = hot_db ? 2 : 1; int num_dbs = hot_db ? 2 : 1;
// do a random insertion // do a random insertion. the assertion comes from the fact
int rand_key = random() % arg->cli->num_elements; // that the code used to generate a random key and mod it
int rand_val = random(); // by the table size manually. fill_key_buf_random will
DBT key, val; // do this iff arg->bounded_element_range is true.
dbt_init(&key, &rand_key, sizeof(rand_key)), invariant(arg->bounded_element_range);
dbt_init(&val, &rand_val, sizeof(rand_val)), fill_key_buf_random<int>(arg->random_data, keybuf, arg);
fill_val_buf_random(arg->random_data, valbuf, arg->cli);
r = env->put_multiple( r = env->put_multiple(
env, env,
db, db,
......
...@@ -4,6 +4,19 @@ ...@@ -4,6 +4,19 @@
#ident "Copyright (c) 2009 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2009 Tokutek Inc. All rights reserved."
#ident "$Id$" #ident "$Id$"
// The Way Things Work:
//
// Threaded stress tests have the following properties:
// - One or more DBs
// - One or more threads performing some number of operations per txn.
// - Correctness tests use signed 4 byte keys and signed 4 byte values. They expect
// a table with all zeroes before running.
// - Performance tests should use 8 byte keys and 8+ byte values, where the values
// are some mixture of random uncompressible garbage and zeroes, depending how
// compressible we want the data. These tests want the table to be populated
// with keys in the range [0, table_size - 1] unless disperse_keys is true,
// then the keys are scrambled up in the integer key space.
#ifndef _THREADED_STRESS_TEST_HELPERS_H_ #ifndef _THREADED_STRESS_TEST_HELPERS_H_
#define _THREADED_STRESS_TEST_HELPERS_H_ #define _THREADED_STRESS_TEST_HELPERS_H_
...@@ -11,35 +24,31 @@ ...@@ -11,35 +24,31 @@
#include "test.h" #include "test.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include <locale.h>
#include <unistd.h>
#include <sys/stat.h>
#include <db.h>
#if defined(HAVE_MALLOC_H)
# include <malloc.h>
#elif defined(HAVE_SYS_MALLOC_H)
# include <sys/malloc.h>
#endif
#include <math.h> #include <math.h>
#include <locale.h>
#include <db.h>
#include <memory.h> #include <memory.h>
#include <toku_race_tools.h> #include <toku_race_tools.h>
#include <portability/toku_atomic.h> #include <portability/toku_atomic.h>
#include <portability/toku_pthread.h> #include <portability/toku_pthread.h>
#include <portability/toku_random.h> #include <portability/toku_random.h>
#include <portability/toku_time.h> #include <portability/toku_time.h>
#include <util/rwlock.h>
#include <util/kibbutz.h>
#include <src/ydb-internal.h> #include <src/ydb-internal.h>
#include <ft/ybt.h> #include <ft/ybt.h>
using namespace toku; #include <util/rwlock.h>
#include <util/kibbutz.h>
// TODO: Move me to portability/memory.cc and toku_include/memory.h
#if defined(HAVE_MALLOC_H)
# include <malloc.h>
#elif defined(HAVE_SYS_MALLOC_H)
# include <sys/malloc.h>
#endif
#if !defined(HAVE_MEMALIGN) #if !defined(HAVE_MEMALIGN)
# if defined(HAVE_VALLOC) # if defined(HAVE_VALLOC)
static void * static void *
...@@ -52,6 +61,9 @@ memalign(size_t UU(alignment), size_t size) ...@@ -52,6 +61,9 @@ memalign(size_t UU(alignment), size_t size)
# endif # endif
#endif #endif
static const size_t min_val_size = sizeof(int32_t);
static const size_t min_key_size = sizeof(int32_t);
volatile bool run_test; // should be volatile since we are communicating through this variable. volatile bool run_test; // should be volatile since we are communicating through this variable.
typedef struct arg *ARG; typedef struct arg *ARG;
...@@ -87,9 +99,6 @@ enum perf_output_format { ...@@ -87,9 +99,6 @@ enum perf_output_format {
HUMAN = 0, HUMAN = 0,
CSV, CSV,
TSV, TSV,
#if 0
GNUPLOT,
#endif
NUM_OUTPUT_FORMATS NUM_OUTPUT_FORMATS
}; };
...@@ -153,8 +162,6 @@ struct arg { ...@@ -153,8 +162,6 @@ struct arg {
bool prelock_updates; bool prelock_updates;
}; };
DB_TXN * const null_txn = 0;
static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) { static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
arg->cli = cli_args; arg->cli = cli_args;
arg->dbp = dbp; arg->dbp = dbp;
...@@ -163,7 +170,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl ...@@ -163,7 +170,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl
arg->sleep_ms = 0; arg->sleep_ms = 0;
arg->lock_type = STRESS_LOCK_NONE; arg->lock_type = STRESS_LOCK_NONE;
arg->txn_type = DB_TXN_SNAPSHOT; arg->txn_type = DB_TXN_SNAPSHOT;
arg->operation_extra = NULL; arg->operation_extra = nullptr;
arg->do_prepare = false; arg->do_prepare = false;
arg->prelock_updates = false; arg->prelock_updates = false;
} }
...@@ -174,12 +181,14 @@ enum operation_type { ...@@ -174,12 +181,14 @@ enum operation_type {
PTQUERIES, PTQUERIES,
NUM_OPERATION_TYPES NUM_OPERATION_TYPES
}; };
const char *operation_names[] = { const char *operation_names[] = {
"ops", "ops",
"puts", "puts",
"ptqueries", "ptqueries",
NULL nullptr
}; };
static void increment_counter(void *extra, enum operation_type type, uint64_t inc) { static void increment_counter(void *extra, enum operation_type type, uint64_t inc) {
invariant(type != OPERATION); invariant(type != OPERATION);
int t = (int) type; int t = (int) type;
...@@ -399,45 +408,6 @@ tsv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], con ...@@ -399,45 +408,6 @@ tsv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], con
printf("\n"); printf("\n");
} }
#if 0
static void
gnuplot_print_perf_header(const struct cli_args *cli_args, const int num_threads)
{
printf("set terminal postscript solid color\n");
printf("set output \"foo.eps\"\n");
printf("set xlabel \"seconds\"\n");
printf("set xrange [0:*]\n");
printf("set ylabel \"X/s\"\n");
printf("plot ");
if (cli_args->print_thread_performance) {
for (int t = 1; t <= num_threads; ++t) {
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
const int col = (2 * ((t - 1) * (int) NUM_OPERATION_TYPES + op)) + 2;
//printf("'-' u 1:%d w lines t \"Thread %d %s\", ", col, t, operation_names[op]);
printf("'-' u 1:%d w lines t \"Thread %d %s/s\", ", col + 1, t, operation_names[op]);
}
}
}
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
const int col = (2 * (num_threads * (int) NUM_OPERATION_TYPES + op)) + 2;
//printf("'-' u 1:%d w lines t \"Total %s\", ", col);
printf("'-' u 1:%d w lines t \"Total %s/s\"%s", col + 1, operation_names[op], op == ((int) NUM_OPERATION_TYPES - 1) ? "\n" : ", ");
}
}
static void
gnuplot_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads)
{
tsv_print_perf_iteration(cli_args, current_time, last_counters, counters, num_threads);
}
static void
gnuplot_print_perf_totals(const struct cli_args *UU(cli_args), uint64_t *UU(counters[]), const int UU(num_threads))
{
printf("e\n");
}
#endif
const struct perf_formatter perf_formatters[] = { const struct perf_formatter perf_formatters[] = {
[HUMAN] = { [HUMAN] = {
.header = human_print_perf_header, .header = human_print_perf_header,
...@@ -454,13 +424,6 @@ const struct perf_formatter perf_formatters[] = { ...@@ -454,13 +424,6 @@ const struct perf_formatter perf_formatters[] = {
.iteration = tsv_print_perf_iteration, .iteration = tsv_print_perf_iteration,
.totals = tsv_print_perf_totals .totals = tsv_print_perf_totals
}, },
#if 0
[GNUPLOT] = {
.header = gnuplot_print_perf_header,
.iteration = gnuplot_print_perf_iteration,
.totals = gnuplot_print_perf_totals
}
#endif
}; };
static int get_env_open_flags(struct cli_args *args) { static int get_env_open_flags(struct cli_args *args) {
...@@ -499,7 +462,7 @@ static void lock_worker_op(struct worker_extra* we) { ...@@ -499,7 +462,7 @@ static void lock_worker_op(struct worker_extra* we) {
} else if (arg->lock_type == STRESS_LOCK_EXCL) { } else if (arg->lock_type == STRESS_LOCK_EXCL) {
rwlock_write_lock(we->operation_lock, we->operation_lock_mutex); rwlock_write_lock(we->operation_lock, we->operation_lock_mutex);
} else { } else {
assert(false); abort();
} }
toku_mutex_unlock(we->operation_lock_mutex); toku_mutex_unlock(we->operation_lock_mutex);
} }
...@@ -514,7 +477,7 @@ static void unlock_worker_op(struct worker_extra* we) { ...@@ -514,7 +477,7 @@ static void unlock_worker_op(struct worker_extra* we) {
} else if (arg->lock_type == STRESS_LOCK_EXCL) { } else if (arg->lock_type == STRESS_LOCK_EXCL) {
rwlock_write_unlock(we->operation_lock); rwlock_write_unlock(we->operation_lock);
} else { } else {
assert(false); abort();
} }
toku_mutex_unlock(we->operation_lock_mutex); toku_mutex_unlock(we->operation_lock_mutex);
} }
...@@ -531,7 +494,7 @@ static void *worker(void *arg_v) { ...@@ -531,7 +494,7 @@ static void *worker(void *arg_v) {
assert_zero(r); assert_zero(r);
arg->random_data = &random_data; arg->random_data = &random_data;
DB_ENV *env = arg->env; DB_ENV *env = arg->env;
DB_TXN *txn = NULL; DB_TXN *txn = nullptr;
if (verbose) { if (verbose) {
toku_pthread_t self = toku_pthread_self(); toku_pthread_t self = toku_pthread_self();
uintptr_t intself = (uintptr_t) self; uintptr_t intself = (uintptr_t) self;
...@@ -588,11 +551,10 @@ static void *worker(void *arg_v) { ...@@ -588,11 +551,10 @@ static void *worker(void *arg_v) {
return arg; return arg;
} }
typedef struct scan_cb_extra *SCAN_CB_EXTRA;
struct scan_cb_extra { struct scan_cb_extra {
bool fast; bool fast;
int64_t curr_sum; int curr_sum;
int64_t num_elements; int num_elements;
}; };
struct scan_op_extra { struct scan_op_extra {
...@@ -602,13 +564,13 @@ struct scan_op_extra { ...@@ -602,13 +564,13 @@ struct scan_op_extra {
}; };
static int static int
scan_cb(const DBT *a, const DBT *b, void *arg_v) { scan_cb(const DBT *key, const DBT *val, void *arg_v) {
SCAN_CB_EXTRA CAST_FROM_VOIDP(cb_extra, arg_v); struct scan_cb_extra *CAST_FROM_VOIDP(cb_extra, arg_v);
assert(a); assert(key);
assert(b); assert(val);
assert(cb_extra); assert(cb_extra);
assert(b->size >= sizeof(int)); assert(val->size >= sizeof(int));
cb_extra->curr_sum += *(int *)b->data; cb_extra->curr_sum += *(int *) val->data;
cb_extra->num_elements++; cb_extra->num_elements++;
return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0; return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0;
} }
...@@ -621,12 +583,13 @@ static int scan_op_and_maybe_check_sum( ...@@ -621,12 +583,13 @@ static int scan_op_and_maybe_check_sum(
) )
{ {
int r = 0; int r = 0;
DBC* cursor = NULL; DBC* cursor = nullptr;
struct scan_cb_extra e; struct scan_cb_extra e = {
e.fast = sce->fast; e.fast = sce->fast,
e.curr_sum = 0; e.curr_sum = 0,
e.num_elements = 0; e.num_elements = 0,
};
{ int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); } { int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); }
if (sce->prefetch) { if (sce->prefetch) {
...@@ -653,8 +616,8 @@ static int scan_op_and_maybe_check_sum( ...@@ -653,8 +616,8 @@ static int scan_op_and_maybe_check_sum(
r = 0; r = 0;
} }
if (check_sum && e.curr_sum) { if (check_sum && e.curr_sum) {
printf("e.curr_sum: %" PRId64 " e.num_elements: %" PRId64 " \n", e.curr_sum, e.num_elements); printf("e.curr_sum: %" PRId32 " e.num_elements: %" PRId32 " \n", e.curr_sum, e.num_elements);
assert(false); abort();
} }
return r; return r;
} }
...@@ -677,80 +640,121 @@ static int generate_row_for_put( ...@@ -677,80 +640,121 @@ static int generate_row_for_put(
return 0; return 0;
} }
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { template <typename integer_t>
return 0; static integer_t breverse(integer_t v)
// Effect: return the bits in i, reversed
// Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
// Rationale: just a hack to spread out the keys during loading, doesn't need to be fast but does need to be correct.
{
integer_t r = v; // r will be reversed bits of v; first get LSB of v
int s = sizeof(v) * CHAR_BIT - 1; // extra shift needed at end
for (v >>= 1; v; v >>= 1) {
r <<= 1;
r |= v & 1;
s--;
}
r <<= s; // shift when v's highest bits are zero
return r;
} }
static int UU() xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { template <typename integer_t>
size_t s = 256; static void
void *p = toku_xmalloc(s); fill_key_buf(integer_t key, uint8_t *data, struct cli_args *args) {
toku_free(p); // Effect: Fill data with a little-endian integer with the given integer_t type
return 0; // If the data buf is bigger than the integer's size, pad with zeroes.
// Requires: *data is at least sizeof(integer_t)
// Note: If you want to store 4 bytes, pass a 4 byte type. 8 bytes? 8 byte type.
// to store an 8-byte integer valued 5:
// int k = 5; fill_key_buf(k, ...) // WRONG
// int64_t k = 5; fill_key_buf(k, ...) // RIGHT
invariant(sizeof(integer_t) >= min_key_size);
invariant(sizeof(integer_t) <= args->key_size);
if (args->key_size != 4) {
invariant(args->key_size >= 8);
invariant(sizeof(key) == 8);
}
integer_t *k = reinterpret_cast<integer_t *>(data);
if (args->disperse_keys) {
*k = static_cast<integer_t>(breverse(key));
} else {
*k = key;
}
if (args->key_size > sizeof(integer_t)) {
memset(data + sizeof(integer_t), 0, args->key_size - sizeof(integer_t));
}
} }
#if DONT_DEPRECATE_MALLOC template <typename integer_t>
static int UU() malloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { static void
size_t s = 256; fill_key_buf_random(struct random_data *random_data, uint8_t *data, ARG arg) {
void *p = malloc(s); // Effect: Fill data with a random little-endian integer with the given integer_t type,
free(p); // possibly bounded by the size of the table, possibly padded with zeroes.
return 0; // Requires, Notes: see fill_key_buf()
invariant(sizeof(integer_t) <= arg->cli->key_size);
integer_t key = static_cast<integer_t>(myrandom_r(random_data));
if (arg->bounded_element_range && arg->cli->num_elements > 0) {
key = key % arg->cli->num_elements;
}
fill_key_buf(key, data, arg->cli);
}
template <typename integer_t>
static void
fill_val_buf(integer_t val, uint8_t *data, uint32_t val_size) {
// Effect, Requires, Notes: see fill_key_buf().
invariant(sizeof(integer_t) <= val_size);
integer_t *v = reinterpret_cast<integer_t *>(data);
*v = val;
if (val_size > sizeof(integer_t)) {
memset(data + sizeof(integer_t), 0, val_size - sizeof(integer_t));
}
} }
#endif
// Fill array with compressibility*size 0s. // Fill array with compressibility*size 0s.
// 0.0<=compressibility<=1.0 // 0.0<=compressibility<=1.0
// Compressibility is the fraction of size that will be 0s (e.g. approximate fraction that will be compressed away). // Compressibility is the fraction of size that will be 0s (e.g. approximate fraction that will be compressed away).
// The rest will be random data. // The rest will be random data.
static void static void
fill_zeroed_array(uint8_t *data, uint32_t size, struct random_data *random_data, double compressibility) { fill_val_buf_random(struct random_data *random_data, uint8_t *data, struct cli_args *args) {
invariant(args->val_size >= min_val_size);
//Requires: The array was zeroed since the last time 'size' was changed. //Requires: The array was zeroed since the last time 'size' was changed.
//Requires: compressibility is in range [0,1] indicating fraction that should be zeros. //Requires: compressibility is in range [0,1] indicating fraction that should be zeros.
uint32_t num_random_bytes = (1 - compressibility) * size; // Fill in the random bytes
uint32_t num_random_bytes = (1 - args->compressibility) * args->val_size;
if (num_random_bytes > 0) { if (num_random_bytes > 0) {
uint32_t filled; uint32_t filled;
for (filled = 0; filled + sizeof(uint64_t) <= num_random_bytes; filled += sizeof(uint64_t)) { for (filled = 0; filled + sizeof(uint64_t) <= num_random_bytes; filled += sizeof(uint64_t)) {
*((uint64_t *) &data[filled]) = randu64(random_data); *((uint64_t *) &data[filled]) = myrandom_r(random_data);
} }
if (filled != num_random_bytes) { if (filled != num_random_bytes) {
uint64_t last8 = randu64(random_data); uint64_t last8 = myrandom_r(random_data);
memcpy(&data[filled], &last8, num_random_bytes - filled); memcpy(&data[filled], &last8, num_random_bytes - filled);
} }
} }
}
static inline size_t // Fill in the zero bytes
size_t_max(size_t a, size_t b) { if (num_random_bytes < args->val_size) {
return (a > b) ? a : b; memset(data + num_random_bytes, 0, args->val_size - num_random_bytes);
}
} }
static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, void *stats_extra) { static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, void *stats_extra) {
int r = 0; int r = 0;
uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))]; uint8_t keybuf[arg->cli->key_size];
uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b;
uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b;
ZERO_ARRAY(rand_key_b);
uint8_t valbuf[arg->cli->val_size]; uint8_t valbuf[arg->cli->val_size];
ZERO_ARRAY(valbuf);
uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
rand_key_key[0] = randu64(arg->random_data);
if (arg->cli->interleave) {
rand_key_i[3] = arg->thread_idx;
} else {
rand_key_i[0] = arg->thread_idx;
}
if (arg->cli->num_elements > 0 && arg->bounded_element_range) {
rand_key_key[0] = rand_key_key[0] % arg->cli->num_elements;
}
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
DBT key, val; DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b); dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf); dbt_init(&val, valbuf, sizeof valbuf);
int flags = get_put_flags(arg->cli); const int put_flags = get_put_flags(arg->cli);
r = db->put(db, txn, &key, &val, flags);
uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
fill_key_buf_random<uint64_t>(arg->random_data, keybuf, arg);
fill_val_buf_random(arg->random_data, valbuf, arg->cli);
r = db->put(db, txn, &key, &val, put_flags);
if (!ignore_errors && r != 0) { if (!ignore_errors && r != 0) {
goto cleanup; goto cleanup;
} }
...@@ -760,6 +764,7 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, vo ...@@ -760,6 +764,7 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, vo
puts_to_increment = 0; puts_to_increment = 0;
} }
} }
cleanup: cleanup:
increment_counter(stats_extra, PUTS, puts_to_increment); increment_counter(stats_extra, PUTS, puts_to_increment);
return r; return r;
...@@ -788,22 +793,25 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void ...@@ -788,22 +793,25 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int r = 0; int r = 0;
uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))]; uint8_t keybuf[arg->cli->key_size];
uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b;
uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b;
ZERO_ARRAY(rand_key_b);
uint8_t valbuf[arg->cli->val_size]; uint8_t valbuf[arg->cli->val_size];
ZERO_ARRAY(valbuf);
uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
rand_key_key[0] = extra->current++;
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
DBT key, val; DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b); dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf); dbt_init(&val, valbuf, sizeof valbuf);
int flags = get_put_flags(arg->cli); const int put_flags = get_put_flags(arg->cli);
r = db->put(db, txn, &key, &val, flags);
uint64_t puts_to_increment = 0;
for (uint64_t i = 0; i < arg->cli->txn_size; ++i) {
// TODO: Change perf_insert to pass a single serial_put_op_extra
// to each insertion thread so they share the current key,
// and use a sync fetch an add here. This way you can measure
// the true performance of multiple threads appending unique
// keys to the end of a tree.
uint64_t k = extra->current++;
fill_key_buf(k, keybuf, arg->cli);
fill_val_buf_random(arg->random_data, valbuf, arg->cli);
r = db->put(db, txn, &key, &val, put_flags);
if (r != 0) { if (r != 0) {
goto cleanup; goto cleanup;
} }
...@@ -813,6 +821,7 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void ...@@ -813,6 +821,7 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
puts_to_increment = 0; puts_to_increment = 0;
} }
} }
cleanup: cleanup:
increment_counter(stats_extra, PUTS, puts_to_increment); increment_counter(stats_extra, PUTS, puts_to_increment);
return r; return r;
...@@ -827,42 +836,44 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v ...@@ -827,42 +836,44 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v
uint32_t dbt_flags = 0; uint32_t dbt_flags = 0;
r = db_create(&db_load, env, 0); r = db_create(&db_load, env, 0);
assert(r == 0); assert(r == 0);
r = db_load->open(db_load, txn, "loader-db", NULL, DB_BTREE, DB_CREATE, 0666); r = db_load->open(db_load, txn, "loader-db", nullptr, DB_BTREE, DB_CREATE, 0666);
assert(r == 0); assert(r == 0);
DB_LOADER *loader; DB_LOADER *loader;
uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES; uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES;
r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags); r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags);
CKERR(r); CKERR(r);
for (int i = 0; i < 1000; i++) {
DBT key, val; DBT key, val;
int rand_key = i; uint8_t keybuf[arg->cli->key_size];
int rand_val = myrandom_r(arg->random_data); uint8_t valbuf[arg->cli->val_size];
dbt_init(&key, &rand_key, sizeof(rand_key)); dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &rand_val, sizeof(rand_val)); dbt_init(&val, valbuf, sizeof valbuf);
for (int i = 0; i < 1000; i++) {
fill_key_buf(i, keybuf, arg->cli);
fill_val_buf_random(arg->random_data, valbuf, arg->cli);
r = loader->put(loader, &key, &val); CKERR(r); r = loader->put(loader, &key, &val); CKERR(r);
} }
r = loader->close(loader); CKERR(r); r = loader->close(loader); CKERR(r);
r = db_load->close(db_load, 0); CKERR(r); r = db_load->close(db_load, 0); CKERR(r);
r = env->dbremove(env, txn, "loader-db", NULL, 0); CKERR(r); r = env->dbremove(env, txn, "loader-db", nullptr, 0); CKERR(r);
} }
return 0; return 0;
} }
static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r; // Pick a random DB, do a keyrange operation.
// callback is designed to run on tests with one DB
// no particular reason why, just the way it was
// originally done
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) { int r = 0;
rand_key = rand_key % arg->cli->num_elements; uint8_t keybuf[arg->cli->key_size];
}
DBT key; DBT key;
dbt_init(&key, &rand_key, sizeof rand_key); dbt_init(&key, keybuf, sizeof keybuf);
fill_key_buf_random<int>(arg->random_data, keybuf, arg);
uint64_t less,equal,greater; uint64_t less,equal,greater;
int is_exact; int is_exact;
r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact); r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact);
...@@ -890,27 +901,6 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra ...@@ -890,27 +901,6 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra
return r; return r;
} }
struct lock_escalation_op_extra {
// sleep somewhere between these times before running escalation.
// this will add some chaos into the mix.
uint64_t min_sleep_time_micros;
uint64_t max_sleep_time_micros;
};
static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_extra, void *UU(stats_extra)) {
struct lock_escalation_op_extra *CAST_FROM_VOIDP(extra, operation_extra);
if (extra->max_sleep_time_micros > 0) {
invariant(extra->max_sleep_time_micros >= extra->min_sleep_time_micros);
uint64_t extra_sleep_time = (extra->max_sleep_time_micros - extra->min_sleep_time_micros) + 1;
uint64_t sleep_time = extra->min_sleep_time_micros + (myrandom_r(arg->random_data) % extra_sleep_time);
usleep(sleep_time);
}
if (!arg->cli->nolocktree) {
toku_env_run_lock_escalation_for_test(arg->env);
}
return 0;
}
static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) { static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) {
struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra); struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
...@@ -950,7 +940,8 @@ static void scan_op_worker(void *arg) { ...@@ -950,7 +940,8 @@ static void scan_op_worker(void *arg) {
static int UU() scan_op_no_check_parallel(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) { static int UU() scan_op_no_check_parallel(DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
const int num_cores = toku_os_get_number_processors(); const int num_cores = toku_os_get_number_processors();
KIBBUTZ kibbutz = toku_kibbutz_create(num_cores); const int num_workers = arg->cli->num_DBs < num_cores ? arg->cli->num_DBs : num_cores;
KIBBUTZ kibbutz = toku_kibbutz_create(num_workers);
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
struct scan_op_worker_info *XCALLOC(info); struct scan_op_worker_info *XCALLOC(info);
info->db = arg->dbp[i]; info->db = arg->dbp[i];
...@@ -967,23 +958,24 @@ static int dbt_do_nothing (DBT const *UU(key), DBT const *UU(row), void *UU(con ...@@ -967,23 +958,24 @@ static int dbt_do_nothing (DBT const *UU(key), DBT const *UU(row), void *UU(con
} }
static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) { static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) {
int r; int r = 0;
int rand_key = myrandom_r(arg->random_data); uint8_t keybuf[arg->cli->key_size];
if (arg->bounded_element_range) {
rand_key = rand_key % arg->cli->num_elements;
}
DBT key, val; DBT key, val;
dbt_init(&key, &rand_key, sizeof rand_key); dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, NULL, 0); dbt_init(&val, nullptr, 0);
fill_key_buf_random<int>(arg->random_data, keybuf, arg);
r = db->getf_set( r = db->getf_set(
db, db,
txn, txn,
0, 0,
&key, &key,
dbt_do_nothing, dbt_do_nothing,
NULL nullptr
); );
if (check) assert(r != DB_NOTFOUND); if (check) {
assert(r != DB_NOTFOUND);
}
r = 0; r = 0;
return r; return r;
} }
...@@ -1011,7 +1003,7 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext ...@@ -1011,7 +1003,7 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext
static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0; int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
DBC* cursor = NULL; DBC* cursor = nullptr;
int r = db->cursor(db, txn, &cursor, 0); assert(r == 0); int r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
r = cursor->c_close(cursor); assert(r == 0); r = cursor->c_close(cursor); assert(r == 0);
return 0; return 0;
...@@ -1062,7 +1054,7 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key), ...@@ -1062,7 +1054,7 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key),
{ {
int old_int_val = 0; int old_int_val = 0;
if (old_val) { if (old_val) {
old_int_val = *(int*)old_val->data; old_int_val = *(int *) old_val->data;
} }
assert(extra->size == sizeof(struct update_op_extra)); assert(extra->size == sizeof(struct update_op_extra));
struct update_op_extra *CAST_FROM_VOIDP(e, extra->data); struct update_op_extra *CAST_FROM_VOIDP(e, extra->data);
...@@ -1080,56 +1072,62 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key), ...@@ -1080,56 +1072,62 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key),
new_int_val = e->u.h.new_val; new_int_val = e->u.h.new_val;
break; break;
default: default:
assert(false); abort();
} }
uint32_t val_size = sizeof(int) + e->pad_bytes;
uint8_t valbuf[val_size];
fill_val_buf(new_int_val, valbuf, val_size);
DBT new_val; DBT new_val;
uint32_t data_size = sizeof(int) + e->pad_bytes; dbt_init(&new_val, valbuf, val_size);
char* data [data_size]; set_val(&new_val, set_extra);
ZERO_ARRAY(data);
memcpy(data, &new_int_val, sizeof(new_int_val));
set_val(dbt_init(&new_val, data, data_size), set_extra);
return 0; return 0;
} }
static int UU()update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int curr_val_sum = 0;
int r = 0;
DBT key, val; DBT key, val;
int rand_key; uint8_t keybuf[arg->cli->key_size];
int rand_key2;
toku_sync_fetch_and_add(&update_count, 1); toku_sync_fetch_and_add(&update_count, 1);
struct update_op_extra extra; struct update_op_extra extra;
ZERO_STRUCT(extra); ZERO_STRUCT(extra);
extra.type = UPDATE_ADD_DIFF; extra.type = UPDATE_ADD_DIFF;
extra.pad_bytes = 0; extra.pad_bytes = 0;
int curr_val_sum = 0;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
for (uint32_t i = 0; i < arg->cli->txn_size; i++) { for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
rand_key = myrandom_r(arg->random_data); fill_key_buf_random<int>(arg->random_data, keybuf, arg);
if (arg->bounded_element_range) {
rand_key = rand_key % (arg->cli->num_elements/2);
}
rand_key2 = arg->cli->num_elements - rand_key;
assert(rand_key != rand_key2);
extra.u.d.diff = 1; extra.u.d.diff = 1;
curr_val_sum += extra.u.d.diff; curr_val_sum += extra.u.d.diff;
r = db->update( r = db->update(
db, db,
txn, txn,
dbt_init(&key, &rand_key, sizeof rand_key), &key,
dbt_init(&val, &extra, sizeof extra), &val,
0 0
); );
if (r != 0) { if (r != 0) {
return r; return r;
} }
int *rkp = (int *) keybuf;
int rand_key = *rkp;
invariant(rand_key != (arg->cli->num_elements - rand_key));
rand_key -= arg->cli->num_elements;
fill_key_buf(rand_key, keybuf, arg->cli);
extra.u.d.diff = -1; extra.u.d.diff = -1;
r = db->update( r = db->update(
db, db,
txn, txn,
dbt_init(&key, &rand_key2, sizeof rand_key), &key,
dbt_init(&val, &extra, sizeof extra), &val,
0 0
); );
if (r != 0) { if (r != 0) {
...@@ -1156,10 +1154,6 @@ static int pre_acquire_write_lock(DB *db, DB_TXN *txn, ...@@ -1156,10 +1154,6 @@ static int pre_acquire_write_lock(DB *db, DB_TXN *txn,
// take the given db and do an update on it // take the given db and do an update on it
static int static int
UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) { UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
int r = 0;
int curr_val_sum = 0;
DBT key, val;
int update_key;
uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1); uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1);
struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra); struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
struct update_op_extra extra; struct update_op_extra extra;
...@@ -1171,7 +1165,14 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1171,7 +1165,14 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
extra.pad_bytes = 100; extra.pad_bytes = 100;
} }
} }
int r = 0;
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
int update_key;
int curr_val_sum = 0;
const int update_flags = arg->cli->prelock_updates ? DB_PRELOCKED_WRITE : 0; const int update_flags = arg->cli->prelock_updates ? DB_PRELOCKED_WRITE : 0;
for (uint32_t i = 0; i < arg->cli->txn_size; i++) { for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
if (arg->prelock_updates) { if (arg->prelock_updates) {
if (i == 0) { if (i == 0) {
...@@ -1180,8 +1181,8 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1180,8 +1181,8 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
update_key = update_key % arg->cli->num_elements; update_key = update_key % arg->cli->num_elements;
} }
const uint32_t max_key_in_table = arg->cli->num_elements - 1; const int max_key_in_table = arg->cli->num_elements - 1;
const bool range_wraps = (update_key + arg->cli->txn_size - 1) > max_key_in_table; const bool range_wraps = (update_key + (int) arg->cli->txn_size - 1) > max_key_in_table;
int left_key, right_key; int left_key, right_key;
DBT left_key_dbt, right_key_dbt; DBT left_key_dbt, right_key_dbt;
...@@ -1218,15 +1219,16 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1218,15 +1219,16 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
} }
} else { } else {
update_key++; update_key++;
if (arg->bounded_element_range) {
update_key = update_key % arg->cli->num_elements;
}
} }
fill_key_buf(update_key, keybuf, arg->cli);
} else { } else {
// just do a usual, random point update without locking first // just do a usual, random point update without locking first
update_key = myrandom_r(arg->random_data); fill_key_buf_random<int>(arg->random_data, keybuf, arg);
} }
if (arg->bounded_element_range) {
update_key = update_key % arg->cli->num_elements;
}
// the last update keeps the table's sum as zero // the last update keeps the table's sum as zero
// every other update except the last applies a random delta // every other update except the last applies a random delta
...@@ -1241,12 +1243,15 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1241,12 +1243,15 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
curr_val_sum += extra.u.d.diff; curr_val_sum += extra.u.d.diff;
} }
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
// do the update // do the update
r = db->update( r = db->update(
db, db,
txn, txn,
dbt_init(&key, &update_key, sizeof update_key), &key,
dbt_init(&val, &extra, sizeof extra), &val,
update_flags update_flags
); );
if (r != 0) { if (r != 0) {
...@@ -1269,12 +1274,11 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext ...@@ -1269,12 +1274,11 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra); struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
assert(arg->bounded_element_range); assert(arg->bounded_element_range);
assert(op_args->update_history_buffer); assert(op_args->update_history_buffer);
int r;
int r = 0;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int curr_val_sum = 0;
DBT key, val;
int rand_key;
struct update_op_extra extra; struct update_op_extra extra;
ZERO_STRUCT(extra); ZERO_STRUCT(extra);
extra.type = UPDATE_WITH_HISTORY; extra.type = UPDATE_WITH_HISTORY;
...@@ -1285,47 +1289,44 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext ...@@ -1285,47 +1289,44 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
extra.pad_bytes = 500; extra.pad_bytes = 500;
} }
} }
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
int rand_key;
int curr_val_sum = 0;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
for (uint32_t i = 0; i < arg->cli->txn_size; i++) { for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
rand_key = myrandom_r(arg->random_data) % arg->cli->num_elements; fill_key_buf_random<int>(arg->random_data, keybuf, arg);
int *rkp = (int *) keybuf;
rand_key = *rkp;
invariant(rand_key < arg->cli->num_elements);
if (i < arg->cli->txn_size - 1) {
extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL; extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
// just make every other value random // just make every other value random
if (i%2 == 0) { if (i % 2 == 0) {
extra.u.h.new_val = -extra.u.h.new_val; extra.u.h.new_val = -extra.u.h.new_val;
} }
curr_val_sum += extra.u.h.new_val; curr_val_sum += extra.u.h.new_val;
extra.u.h.expected = op_args->update_history_buffer[rand_key]; } else {
op_args->update_history_buffer[rand_key] = extra.u.h.new_val; // the last update should ensure the sum stays zero
r = db->update(
db,
txn,
dbt_init(&key, &rand_key, sizeof rand_key),
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0) {
return r;
}
}
//
// now put in one more to ensure that the sum stays 0
//
extra.u.h.new_val = -curr_val_sum; extra.u.h.new_val = -curr_val_sum;
rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % arg->cli->num_elements;
} }
extra.u.h.expected = op_args->update_history_buffer[rand_key]; extra.u.h.expected = op_args->update_history_buffer[rand_key];
op_args->update_history_buffer[rand_key] = extra.u.h.new_val; op_args->update_history_buffer[rand_key] = extra.u.h.new_val;
r = db->update( r = db->update(
db, db,
txn, txn,
dbt_init(&key, &rand_key, sizeof rand_key), &key,
dbt_init(&val, &extra, sizeof extra), &val,
0 0
); );
if (r != 0) { if (r != 0) {
return r; return r;
} }
}
return r; return r;
} }
...@@ -1351,7 +1352,7 @@ static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra), ...@@ -1351,7 +1352,7 @@ static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra),
int r; int r;
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
DB* db = arg->dbp[i]; DB* db = arg->dbp[i];
r = db->hot_optimize(db, hot_progress_callback, NULL); r = db->hot_optimize(db, hot_progress_callback, nullptr);
if (run_test) { if (run_test) {
CKERR(r); CKERR(r);
} }
...@@ -1364,6 +1365,8 @@ get_ith_table_name(char *buf, size_t len, int i) { ...@@ -1364,6 +1365,8 @@ get_ith_table_name(char *buf, size_t len, int i) {
snprintf(buf, len, "main%d", i); snprintf(buf, len, "main%d", i);
} }
DB_TXN * const null_txn = 0;
static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r; int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
...@@ -1374,12 +1377,12 @@ static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operat ...@@ -1374,12 +1377,12 @@ static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operat
ZERO_ARRAY(name); ZERO_ARRAY(name);
get_ith_table_name(name, sizeof(name), db_index); get_ith_table_name(name, sizeof(name), db_index);
r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0); r = arg->env->dbremove(arg->env, null_txn, name, nullptr, 0);
CKERR(r); CKERR(r);
r = db_create(&(arg->dbp[db_index]), arg->env, 0); r = db_create(&(arg->dbp[db_index]), arg->env, 0);
assert(r == 0); assert(r == 0);
r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666); r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, nullptr, DB_BTREE, DB_CREATE, 0666);
assert(r == 0); assert(r == 0);
return 0; return 0;
} }
...@@ -1464,6 +1467,7 @@ struct sleep_and_crash_extra { ...@@ -1464,6 +1467,7 @@ struct sleep_and_crash_extra {
bool is_setup; bool is_setup;
bool threads_have_joined; bool threads_have_joined;
}; };
static void *sleep_and_crash(void *extra) { static void *sleep_and_crash(void *extra) {
sleep_and_crash_extra *e = static_cast<sleep_and_crash_extra *>(extra); sleep_and_crash_extra *e = static_cast<sleep_and_crash_extra *>(extra);
toku_mutex_lock(&e->mutex); toku_mutex_lock(&e->mutex);
...@@ -1508,7 +1512,7 @@ static int run_workers( ...@@ -1508,7 +1512,7 @@ static int run_workers(
int r; int r;
const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format]; const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format];
toku_mutex_t mutex = ZERO_MUTEX_INITIALIZER; toku_mutex_t mutex = ZERO_MUTEX_INITIALIZER;
toku_mutex_init(&mutex, NULL); toku_mutex_init(&mutex, nullptr);
struct rwlock rwlock; struct rwlock rwlock;
rwlock_init(&rwlock); rwlock_init(&rwlock);
toku_pthread_t tids[num_threads]; toku_pthread_t tids[num_threads];
...@@ -1533,11 +1537,11 @@ static int run_workers( ...@@ -1533,11 +1537,11 @@ static int run_workers(
worker_extra[i].operation_lock_mutex = &mutex; worker_extra[i].operation_lock_mutex = &mutex;
XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters); XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters);
TOKU_DRD_IGNORE_VAR(worker_extra[i].counters); TOKU_DRD_IGNORE_VAR(worker_extra[i].counters);
{ int chk_r = toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]); CKERR(chk_r); } { int chk_r = toku_pthread_create(&tids[i], nullptr, worker, &worker_extra[i]); CKERR(chk_r); }
if (verbose) if (verbose)
printf("%lu created\n", (unsigned long) tids[i]); printf("%lu created\n", (unsigned long) tids[i]);
} }
{ int chk_r = toku_pthread_create(&time_tid, NULL, test_time, &tte); CKERR(chk_r); } { int chk_r = toku_pthread_create(&time_tid, nullptr, test_time, &tte); CKERR(chk_r); }
if (verbose) if (verbose)
printf("%lu created\n", (unsigned long) time_tid); printf("%lu created\n", (unsigned long) time_tid);
...@@ -1660,131 +1664,22 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1660,131 +1664,22 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = db->set_readpagesize(db, env_args.basement_node_size); r = db->set_readpagesize(db, env_args.basement_node_size);
CKERR(r); CKERR(r);
const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0); const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0);
r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666); r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666);
CKERR(r); CKERR(r);
db_res[i] = db; db_res[i] = db;
} }
return r; return r;
} }
static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bufsz, int val_bufsz, static void report_overall_fill_table_progress(struct cli_args *args, int num_rows) {
void (*callback)(int idx, void *extra,
void *key, int *keysz,
void *val, int *valsz),
void *extra, void (*progress_cb)(int num_rows)) {
DB_TXN *txn = nullptr;
const int puts_per_txn = 100000;
int r = 0;
for (long i = 0; i < num_elements; ++i) {
if (txn == nullptr) {
r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
}
char keybuf[key_bufsz], valbuf[val_bufsz];
memset(keybuf, 0, sizeof(keybuf));
memset(valbuf, 0, sizeof(valbuf));
int keysz, valsz;
callback(i, extra, keybuf, &keysz, valbuf, &valsz);
// let's make sure the data stored fits in the buffers we passed in
assert(keysz <= key_bufsz);
assert(valsz <= val_bufsz);
DBT key, val;
// make size of data what is specified w/input parameters
// note that key and val have sizes of
// key_bufsz and val_bufsz, which were passed into this
// function, not what was stored by the callback
r = db->put(
db,
txn,
dbt_init(&key, keybuf, key_bufsz),
dbt_init(&val, valbuf, val_bufsz),
// don't bother taking locks in the locktree
DB_PRELOCKED_WRITE
);
assert(r == 0);
if (i > 0 && i % puts_per_txn == 0) {
// don't bother fsyncing to disk.
// the caller can checkpoint if they want.
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
txn = nullptr;
if (verbose) {
progress_cb(puts_per_txn);
}
}
}
if (txn) {
r = txn->commit(txn, DB_TXN_NOSYNC);
invariant_zero(r);
}
return r;
}
static uint32_t breverse(uint32_t v)
// Effect: return the bits in i, reversed
// Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
// Rationale: just a hack to spread out the keys during loading, doesn't need to be fast but does need to be correct.
{
uint32_t r = v; // r will be reversed bits of v; first get LSB of v
int s = sizeof(v) * CHAR_BIT - 1; // extra shift needed at end
for (v >>= 1; v; v >>= 1) {
r <<= 1;
r |= v & 1;
s--;
}
r <<= s; // shift when v's highest bits are zero
return r;
}
static void zero_element_callback(int idx, void *extra, void *keyv, int *keysz, void *valv, int *valsz) {
const bool *disperse_keys = static_cast<bool *>(extra);
int *CAST_FROM_VOIDP(key, keyv);
int *CAST_FROM_VOIDP(val, valv);
if (*disperse_keys) {
*key = static_cast<int>(breverse(idx));
} else {
*key = idx;
}
*val = 0;
*keysz = sizeof(int);
*valsz = sizeof(int);
}
struct fill_table_worker_info {
DB_ENV *env;
DB *db;
int num_elements;
uint32_t key_size;
uint32_t val_size;
bool disperse_keys;
void (*progress_cb)(int num_rows);
};
static void fill_table_worker(void *arg) {
struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
int r = fill_table_from_fun(
info->env,
info->db,
info->num_elements,
info->key_size,
info->val_size,
zero_element_callback,
&info->disperse_keys,
info->progress_cb
);
invariant_zero(r);
toku_free(info);
}
static int num_tables_to_fill = 1;
static int rows_per_table = 1;
static void report_overall_fill_table_progress(int num_rows) {
// for sanitary reasons we'd like to prevent two threads // for sanitary reasons we'd like to prevent two threads
// from printing the same performance report twice. // from printing the same performance report twice.
static bool reporting; static bool reporting;
// when was the first time measurement taken? // when was the first time measurement taken?
static uint64_t t0; static uint64_t t0;
static int rows_inserted; static int rows_inserted;
// when was the last report? what was its progress? // when was the last report? what was its progress?
static uint64_t last_report; static uint64_t last_report;
static double last_progress; static double last_progress;
...@@ -1794,12 +1689,9 @@ static void report_overall_fill_table_progress(int num_rows) { ...@@ -1794,12 +1689,9 @@ static void report_overall_fill_table_progress(int num_rows) {
} }
uint64_t rows_so_far = toku_sync_add_and_fetch(&rows_inserted, num_rows); uint64_t rows_so_far = toku_sync_add_and_fetch(&rows_inserted, num_rows);
double progress = rows_so_far / double progress = rows_so_far / (args->num_elements * args->num_DBs * 1.0);
(rows_per_table * num_tables_to_fill * 1.0);
if (progress > (last_progress + .01)) { if (progress > (last_progress + .01)) {
uint64_t t1 = toku_current_time_microsec(); uint64_t t1 = toku_current_time_microsec();
// report no more often than once every 5 seconds, for less output.
// there is a race condition. it is probably harmless.
const uint64_t minimum_report_period = 5 * 1000000; const uint64_t minimum_report_period = 5 * 1000000;
if (t1 > last_report + minimum_report_period if (t1 > last_report + minimum_report_period
&& toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) { && toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) {
...@@ -1813,24 +1705,94 @@ static void report_overall_fill_table_progress(int num_rows) { ...@@ -1813,24 +1705,94 @@ static void report_overall_fill_table_progress(int num_rows) {
} }
} }
static int fill_tables_with_zeroes(DB_ENV *env, DB **dbs, int num_DBs, int num_elements, uint32_t key_size, uint32_t val_size, bool disperse_keys) { static void fill_single_table(DB_ENV *env, DB *db, struct cli_args *args, bool fill_with_zeroes) {
// set the static globals that the progress reporter uses const int min_size_for_loader = 1 * 1000 * 1000;
num_tables_to_fill = num_DBs; const int puts_per_txn = 10 * 1000;;
rows_per_table = num_elements;
int r = 0;
DB_TXN *txn = nullptr;
DB_LOADER *loader = nullptr;
struct random_data random_data;
char random_buf[8];
r = myinitstate_r(random(), random_buf, 8, &random_data); CKERR(r);
uint8_t keybuf[args->key_size], valbuf[args->val_size];
memset(keybuf, 0, sizeof keybuf);
memset(valbuf, 0, sizeof valbuf);
DBT key, val;
dbt_init(&key, keybuf, args->key_size);
dbt_init(&val, valbuf, args->val_size);
r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
if (args->num_elements >= min_size_for_loader) {
uint32_t db_flags = DB_PRELOCKED_WRITE;
uint32_t dbt_flags = 0;
r = env->create_loader(env, txn, &loader, db, 1, &db, &db_flags, &dbt_flags, 0); CKERR(r);
}
for (int i = 0; i < args->num_elements; i++) {
if (fill_with_zeroes) {
// Legacy test, 4 byte signed keys and 4 byte zero values.
const int k = i;
const int zero = 0;
fill_key_buf(k, keybuf, args);
fill_val_buf(zero, valbuf, args->val_size);
} else {
// Modern test, >= 8 byte unsigned keys, >= 8 byte random values.
const uint64_t k = i;
fill_key_buf(k, keybuf, args);
fill_val_buf_random(&random_data, valbuf, args);
}
r = loader ? loader->put(loader, &key, &val) :
db->put(db, txn, &key, &val, DB_PRELOCKED_WRITE);
CKERR(r);
if (i > 0 && i % puts_per_txn == 0) {
if (verbose) {
report_overall_fill_table_progress(args, puts_per_txn);
}
// begin a new txn if we're not using the loader,
// don't bother fsyncing to disk.
if (loader == nullptr) {
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
}
}
}
if (loader) {
r = loader->close(loader);
}
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
}
struct fill_table_worker_info {
struct cli_args *args;
DB_ENV *env;
DB *db;
bool fill_with_zeroes;
};
static void fill_table_worker(void *arg) {
struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
fill_single_table(info->env, info->db, info->args, info->fill_with_zeroes);
toku_free(info);
}
static int fill_tables(DB_ENV *env, DB **dbs, struct cli_args *args, bool fill_with_zeroes) {
const int num_cores = toku_os_get_number_processors(); const int num_cores = toku_os_get_number_processors();
KIBBUTZ kibbutz = toku_kibbutz_create(num_cores); // Use at most cores / 2 worker threads, since we want some other cores to
for (int i = 0; i < num_DBs; i++) { // be used for internal engine work (ie: flushes, loader threads, etc).
assert(key_size >= sizeof(int)); const int max_num_workers = (num_cores + 1) / 2;
assert(val_size >= sizeof(int)); const int num_workers = args->num_DBs < max_num_workers ? args->num_DBs : max_num_workers;
KIBBUTZ kibbutz = toku_kibbutz_create(num_workers);
for (int i = 0; i < args->num_DBs; i++) {
struct fill_table_worker_info *XCALLOC(info); struct fill_table_worker_info *XCALLOC(info);
info->env = env; info->env = env;
info->db = dbs[i]; info->db = dbs[i];
info->num_elements = num_elements; info->args = args;
info->key_size = key_size; info->fill_with_zeroes = fill_with_zeroes;
info->val_size = val_size;
info->disperse_keys = disperse_keys;
info->progress_cb = report_overall_fill_table_progress;
toku_kibbutz_enq(kibbutz, fill_table_worker, info); toku_kibbutz_enq(kibbutz, fill_table_worker, info);
} }
toku_kibbutz_destroy(kibbutz); toku_kibbutz_destroy(kibbutz);
...@@ -1865,7 +1827,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1865,7 +1827,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
int r; int r;
struct env_args env_args = cli_args->env_args; struct env_args env_args = cli_args->env_args;
/* create the dup database file */
DB_ENV *env; DB_ENV *env;
db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes); db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes);
r = db_env_create(&env, 0); assert(r == 0); r = db_env_create(&env, 0); assert(r == 0);
...@@ -1873,7 +1834,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1873,7 +1834,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = env->set_default_bt_compare(env, bt_compare); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r); r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r);
env->set_update(env, env_args.update_function); env->set_update(env, env_args.update_function);
// set the cache size to 10MB
r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r); r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r);
if (env_args.generate_put_callback) { if (env_args.generate_put_callback) {
...@@ -1904,7 +1864,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1904,7 +1864,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = db_create(&db, env, 0); r = db_create(&db, env, 0);
CKERR(r); CKERR(r);
const int flags = cli_args->blackhole ? DB_BLACKHOLE : 0; const int flags = cli_args->blackhole ? DB_BLACKHOLE : 0;
r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666); r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666);
CKERR(r); CKERR(r);
db_res[i] = db; db_res[i] = db;
} }
...@@ -1932,8 +1892,8 @@ static const struct env_args DEFAULT_ENV_ARGS = { ...@@ -1932,8 +1892,8 @@ static const struct env_args DEFAULT_ENV_ARGS = {
.num_bucket_mutexes = 1024, .num_bucket_mutexes = 1024,
.envdir = ENVDIR, .envdir = ENVDIR,
.update_function = update_op_callback, .update_function = update_op_callback,
.generate_put_callback = NULL, .generate_put_callback = nullptr,
.generate_del_callback = NULL, .generate_del_callback = nullptr,
}; };
static const struct env_args DEFAULT_PERF_ENV_ARGS = { static const struct env_args DEFAULT_PERF_ENV_ARGS = {
...@@ -1947,15 +1907,11 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = { ...@@ -1947,15 +1907,11 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = {
.cachetable_size = 1<<30, .cachetable_size = 1<<30,
.num_bucket_mutexes = 1024 * 1024, .num_bucket_mutexes = 1024 * 1024,
.envdir = ENVDIR, .envdir = ENVDIR,
.update_function = NULL, .update_function = nullptr,
.generate_put_callback = NULL, .generate_put_callback = nullptr,
.generate_del_callback = NULL, .generate_del_callback = nullptr,
}; };
#define MIN_VAL_SIZE sizeof(int)
#define MIN_KEY_SIZE sizeof(int)
#define MIN_COMPRESSIBILITY (0.0)
#define MAX_COMPRESSIBILITY (1.0)
static struct cli_args UU() get_default_args(void) { static struct cli_args UU() get_default_args(void) {
struct cli_args DEFAULT_ARGS = { struct cli_args DEFAULT_ARGS = {
.num_elements = 150000, .num_elements = 150000,
...@@ -1979,8 +1935,8 @@ static struct cli_args UU() get_default_args(void) { ...@@ -1979,8 +1935,8 @@ static struct cli_args UU() get_default_args(void) {
.perf_output_format = HUMAN, .perf_output_format = HUMAN,
.performance_period = 1, .performance_period = 1,
.txn_size = 1000, .txn_size = 1000,
.key_size = MIN_KEY_SIZE, .key_size = min_key_size,
.val_size = MIN_VAL_SIZE, .val_size = min_val_size,
.compressibility = 1.0, .compressibility = 1.0,
.env_args = DEFAULT_ENV_ARGS, .env_args = DEFAULT_ENV_ARGS,
.single_txn = false, .single_txn = false,
...@@ -2002,6 +1958,8 @@ static struct cli_args UU() get_default_args_for_perf(void) { ...@@ -2002,6 +1958,8 @@ static struct cli_args UU() get_default_args_for_perf(void) {
args.num_elements = 1000000; //default of 1M args.num_elements = 1000000; //default of 1M
//args.print_performance = true; //args.print_performance = true;
args.env_args = DEFAULT_PERF_ENV_ARGS; args.env_args = DEFAULT_PERF_ENV_ARGS;
args.key_size = sizeof(uint64_t);
args.val_size = sizeof(uint64_t);
return args; return args;
} }
...@@ -2019,7 +1977,6 @@ struct arg_type; ...@@ -2019,7 +1977,6 @@ struct arg_type;
typedef bool (*match_fun)(struct arg_type *type, char *const argv[]); typedef bool (*match_fun)(struct arg_type *type, char *const argv[]);
typedef int (*parse_fun)(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]); typedef int (*parse_fun)(struct arg_type *type, int *extra_args_consumed, int argc, char *const argv[]);
//TODO fix
typedef void (*help_fun)(struct arg_type *type, int width_name, int width_type); typedef void (*help_fun)(struct arg_type *type, int width_name, int width_type);
struct type_description { struct type_description {
...@@ -2271,8 +2228,6 @@ struct type_description type_bool = { ...@@ -2271,8 +2228,6 @@ struct type_description type_bool = {
static inline void static inline void
do_usage(const char *argv0, int n, struct arg_type types[/*n*/]) { do_usage(const char *argv0, int n, struct arg_type types[/*n*/]) {
// fprintf(stderr, "\t--compressibility DOUBLE (default %.2f, minimum %.2f, maximum %.2f)\n",
// default_args.compressibility, MIN_COMPRESSIBILITY, MAX_COMPRESSIBILITY);
fprintf(stderr, "Usage:\n"); fprintf(stderr, "Usage:\n");
fprintf(stderr, "\t%s [-h|--help]\n", argv0); fprintf(stderr, "\t%s [-h|--help]\n", argv0);
fprintf(stderr, "\t%s [OPTIONS]\n", argv0); fprintf(stderr, "\t%s [OPTIONS]\n", argv0);
...@@ -2327,7 +2282,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -2327,7 +2282,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
#define LOCAL_STRING_ARG(name_string, variable, default) \ #define LOCAL_STRING_ARG(name_string, variable, default) \
MAKE_LOCAL_ARG(name_string, type_string, s, default, variable, "", "", "") MAKE_LOCAL_ARG(name_string, type_string, s, default, variable, "", "", "")
const char *perf_format_s = NULL; const char *perf_format_s = nullptr;
struct arg_type arg_types[] = { struct arg_type arg_types[] = {
INT32_ARG_NONNEG("--num_elements", num_elements, ""), INT32_ARG_NONNEG("--num_elements", num_elements, ""),
INT32_ARG_NONNEG("--num_DBs", num_DBs, ""), INT32_ARG_NONNEG("--num_DBs", num_DBs, ""),
...@@ -2358,8 +2313,8 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -2358,8 +2313,8 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
DOUBLE_ARG_R("--compressibility", compressibility, "", 0.0, 1.0), DOUBLE_ARG_R("--compressibility", compressibility, "", 0.0, 1.0),
//TODO: when outputting help.. skip min/max that is min/max of data range. //TODO: when outputting help.. skip min/max that is min/max of data range.
UINT32_ARG_R("--key_size", key_size, " bytes", MIN_KEY_SIZE, UINT32_MAX), UINT32_ARG_R("--key_size", key_size, " bytes", min_key_size, UINT32_MAX),
UINT32_ARG_R("--val_size", val_size, " bytes", MIN_VAL_SIZE, UINT32_MAX), UINT32_ARG_R("--val_size", val_size, " bytes", min_val_size, UINT32_MAX),
BOOL_ARG("serial_insert", serial_insert), BOOL_ARG("serial_insert", serial_insert),
BOOL_ARG("interleave", interleave), BOOL_ARG("interleave", interleave),
...@@ -2439,20 +2394,15 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -2439,20 +2394,15 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
} }
} }
} }
if (perf_format_s != NULL) { if (perf_format_s != nullptr) {
if (!strcmp(perf_format_s, "human")) { if (!strcmp(perf_format_s, "human")) {
args->perf_output_format = HUMAN; args->perf_output_format = HUMAN;
} else if (!strcmp(perf_format_s, "csv")) { } else if (!strcmp(perf_format_s, "csv")) {
args->perf_output_format = CSV; args->perf_output_format = CSV;
} else if (!strcmp(perf_format_s, "tsv")) { } else if (!strcmp(perf_format_s, "tsv")) {
args->perf_output_format = TSV; args->perf_output_format = TSV;
#if 0
} else if (!strcmp(perf_format_s, "gnuplot")) {
args->perf_output_format = GNUPLOT;
#endif
} else { } else {
fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", and \"tsv\"\n"); fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", and \"tsv\"\n");
//fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", \"tsv\", and \"gnuplot\"\n");
do_usage(argv0, num_arg_types, arg_types); do_usage(argv0, num_arg_types, arg_types);
exit(EINVAL); exit(EINVAL);
} }
...@@ -2467,39 +2417,49 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -2467,39 +2417,49 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
static void static void
stress_table(DB_ENV *, DB **, struct cli_args *); stress_table(DB_ENV *, DB **, struct cli_args *);
static int template<typename integer_t>
UU() stress_int_dbt_cmp (DB *db, const DBT *a, const DBT *b) { static int int_cmp(integer_t x, integer_t y) {
assert(db && a && b); if (x < y) {
assert(a->size >= sizeof(int)); return -1;
assert(b->size >= sizeof(int)); } else if (x > y) {
return +1;
} else {
return 0;
}
}
static int
stress_dbt_cmp_legacy(const DBT *a, const DBT *b) {
int x = *(int *) a->data; int x = *(int *) a->data;
int y = *(int *) b->data; int y = *(int *) b->data;
return int_cmp(x, y);
if (x<y) return -1;
if (x>y) return 1;
return 0;
} }
static int static int
UU() stress_uint64_dbt_cmp(DB *db, const DBT *a, const DBT *b) { stress_dbt_cmp(const DBT *a, const DBT *b) {
assert(db && a && b); // Keys are only compared by their first 8 bytes,
assert(a->size >= sizeof(uint64_t)); // interpreted as a little endian 64 bit integers.
assert(b->size >= sizeof(uint64_t)); // The rest of the key is just padding.
uint64_t x = *(uint64_t *) a->data; uint64_t x = *(uint64_t *) a->data;
uint64_t y = *(uint64_t *) b->data; uint64_t y = *(uint64_t *) b->data;
return int_cmp(x, y);
}
if (x < y) { static int
return -1; stress_cmp(DB *db, const DBT *a, const DBT *b) {
} assert(db && a && b);
if (x > y) { assert(a->size == b->size);
return +1;
if (a->size == sizeof(int)) {
// Legacy comparison: keys must be >= 4 bytes
return stress_dbt_cmp_legacy(a, b);
} else {
// Modern comparison: keys must be >= 8 bytes
invariant(a->size >= sizeof(uint64_t));
return stress_dbt_cmp(a, b);
} }
return 0;
} }
static void static void
do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args) do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args)
{ {
...@@ -2512,21 +2472,50 @@ do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args) ...@@ -2512,21 +2472,50 @@ do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args)
scan_arg.operation_extra = &soe; scan_arg.operation_extra = &soe;
scan_arg.operation = scan_op_no_check; scan_arg.operation = scan_op_no_check;
scan_arg.lock_type = STRESS_LOCK_NONE; scan_arg.lock_type = STRESS_LOCK_NONE;
DB_TXN* txn = NULL; DB_TXN* txn = nullptr;
// don't take serializable read locks when scanning. // don't take serializable read locks when scanning.
int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r); int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
// make sure the scan doesn't terminate early // make sure the scan doesn't terminate early
run_test = true; run_test = true;
// warm up each DB in parallel // warm up each DB in parallel
scan_op_no_check_parallel(txn, &scan_arg, &soe, NULL); scan_op_no_check_parallel(txn, &scan_arg, &soe, nullptr);
r = txn->commit(txn,0); CKERR(r); r = txn->commit(txn,0); CKERR(r);
} }
static void static void
UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, const DBT *, const DBT *)) UU() stress_recover(struct cli_args *args) {
DB_ENV* env = nullptr;
DB* dbs[args->num_DBs];
memset(dbs, 0, sizeof(dbs));
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_cmp,
args); CKERR(chk_r); }
DB_TXN* txn = nullptr;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r);
struct scan_op_extra soe = {
.fast = true,
.fwd = true,
.prefetch = false
};
// make sure the scan doesn't terminate early
run_test = true;
r = scan_op(txn, &recover_args, &soe, nullptr);
CKERR(r);
{ int chk_r = txn->commit(txn,0); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
}
static void
test_main(struct cli_args *args, bool fill_with_zeroes)
{ {
{ char *loc = setlocale(LC_NUMERIC, "en_US.UTF-8"); assert(loc); } { char *loc = setlocale(LC_NUMERIC, "en_US.UTF-8"); assert(loc); }
DB_ENV* env = NULL; DB_ENV* env = nullptr;
DB* dbs[args->num_DBs]; DB* dbs[args->num_DBs];
memset(dbs, 0, sizeof(dbs)); memset(dbs, 0, sizeof(dbs));
db_env_enable_engine_status(args->nocrashstatus ? false : true); db_env_enable_engine_status(args->nocrashstatus ? false : true);
...@@ -2535,17 +2524,17 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co ...@@ -2535,17 +2524,17 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co
&env, &env,
dbs, dbs,
args->num_DBs, args->num_DBs,
bt_compare, stress_cmp,
args args
); );
{ int chk_r = fill_tables_with_zeroes(env, dbs, args->num_DBs, args->num_elements, args->key_size, args->val_size, args->disperse_keys); CKERR(chk_r); } { int chk_r = fill_tables(env, dbs, args, fill_with_zeroes); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); } { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
} }
if (!args->only_create) { if (!args->only_create) {
{ int chk_r = open_tables(&env, { int chk_r = open_tables(&env,
dbs, dbs,
args->num_DBs, args->num_DBs,
bt_compare, stress_cmp,
args); CKERR(chk_r); } args); CKERR(chk_r); }
if (args->warm_cache) { if (args->warm_cache) {
do_warm_cache(env, dbs, args); do_warm_cache(env, dbs, args);
...@@ -2556,37 +2545,17 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co ...@@ -2556,37 +2545,17 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co
} }
static void static void
UU() stress_test_main(struct cli_args *args) UU() stress_test_main(struct cli_args *args) {
{ // Begin the test with fixed size values equal to zero.
stress_test_main_with_cmp(args, stress_int_dbt_cmp); // This is important for correctness testing.
test_main(args, true);
} }
static void static void
UU() stress_recover(struct cli_args *args) { UU() perf_test_main(struct cli_args *args) {
DB_ENV* env = NULL; // Do not begin the test by creating a table of all zeroes.
DB* dbs[args->num_DBs]; // We want to control the row size and its compressibility.
memset(dbs, 0, sizeof(dbs)); test_main(args, false);
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_int_dbt_cmp,
args); CKERR(chk_r); }
DB_TXN* txn = NULL;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r);
struct scan_op_extra soe;
soe.fast = true;
soe.fwd = true;
soe.prefetch = false;
// make sure the scan doesn't terminate early
run_test = true;
r = scan_op(txn, &recover_args, &soe, NULL);
CKERR(r);
{ int chk_r = txn->commit(txn,0); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
} }
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment