Commit 05eeb46a authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

fixes #5801 perf framework has been improved in a few ways:

- minimum key/val size is up to 8 bytes. now perf tests and stress tests have a consistent mechanism for generating keys and values, reducing headache.
- perf tests use a "main" function that generates tables based on the provided key/val size and compressibility, while stress (correctness) tests use a main function that generates tables with a provided (default in all cases, I think) key/val size and fully zero'd values, so the table is in the "correct" zero-sum state. previously this was causing perf tests to generate zer-valued rows always, no matter what row size you wanted or how much compressibility was requested, which clearly skewed performance results (ie: your 100 byte rows are only 8 bytes)
- renamed legacy NULL pointers to nullptr
- moved some "special" operations to the perf/stress tests that use them instead of the global header, to keep things more tidy.



git-svn-id: file:///svn/toku/tokudb@51258 c7de825b-a66e-492c-adef-691d508d4ae1
parent 39099003
...@@ -42,6 +42,6 @@ int ...@@ -42,6 +42,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -42,17 +42,17 @@ static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void ...@@ -42,17 +42,17 @@ static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
} }
int r = 0; int r = 0;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size]; uint8_t valbuf[arg->cli->val_size];
ZERO_ARRAY(valbuf); dbt_init(&mult_key_dbt[0], keybuf, sizeof keybuf);
dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf);
uint64_t puts_to_increment = 0; uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) { for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
fill_zeroed_array(valbuf, arg->cli->val_size,
arg->random_data, arg->cli->compressibility);
struct iibench_op_extra *CAST_FROM_VOIDP(info, operation_extra); struct iibench_op_extra *CAST_FROM_VOIDP(info, operation_extra);
uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1); uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1);
dbt_init(&mult_key_dbt[0], &pk, sizeof pk); fill_key_buf(pk, keybuf, arg->cli);
dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf); fill_val_buf_random(arg->random_data, valbuf, arg->cli);
r = env->put_multiple( r = env->put_multiple(
env, env,
dbs[0], // source db. dbs[0], // source db.
...@@ -128,6 +128,6 @@ test_main(int argc, char *const argv[]) { ...@@ -128,6 +128,6 @@ test_main(int argc, char *const argv[]) {
args.crash_on_operation_failure = false; args.crash_on_operation_failure = false;
} }
args.env_args.generate_put_callback = iibench_generate_row_for_put; args.env_args.generate_put_callback = iibench_generate_row_for_put;
stress_test_main_with_cmp(&args, stress_uint64_dbt_cmp); perf_test_main(&args);
return 0; return 0;
} }
...@@ -51,6 +51,6 @@ test_main(int argc, char *const argv[]) { ...@@ -51,6 +51,6 @@ test_main(int argc, char *const argv[]) {
if (args.num_put_threads > 1) { if (args.num_put_threads > 1) {
args.crash_on_operation_failure = false; args.crash_on_operation_failure = false;
} }
stress_test_main_with_cmp(&args, stress_uint64_dbt_cmp); perf_test_main(&args);
return 0; return 0;
} }
...@@ -20,6 +20,13 @@ ...@@ -20,6 +20,13 @@
// The intent of this test is to measure the throughput of malloc and free // The intent of this test is to measure the throughput of malloc and free
// with multiple threads. // with multiple threads.
static int xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
size_t s = 256;
void *p = toku_xmalloc(s);
toku_free(p);
return 0;
}
static void static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
...@@ -27,7 +34,7 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { ...@@ -27,7 +34,7 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
struct arg myargs[num_threads]; struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) { for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args); arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = malloc_free_op; myargs[i].operation = xmalloc_free_op;
} }
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args); run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
} }
...@@ -36,6 +43,6 @@ int ...@@ -36,6 +43,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
// The intent of this test is to measure the throughput of the test infrastructure executing a nop // The intent of this test is to measure the throughput of the test infrastructure executing a nop
// on multiple threads. // on multiple threads.
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
return 0;
}
static void static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) { stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n"); if (verbose) printf("starting creation of pthreads\n");
...@@ -34,6 +38,6 @@ int ...@@ -34,6 +38,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -62,6 +62,6 @@ int ...@@ -62,6 +62,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -75,6 +75,6 @@ int ...@@ -75,6 +75,6 @@ int
test_main(int argc, char *const argv[]) { test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf(); struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -78,6 +78,6 @@ test_main(int argc, char *const argv[]) { ...@@ -78,6 +78,6 @@ test_main(int argc, char *const argv[]) {
args.num_update_threads = 1; args.num_update_threads = 1;
args.crash_on_operation_failure = false; args.crash_on_operation_failure = false;
parse_stress_test_args(argc, argv, &args); parse_stress_test_args(argc, argv, &args);
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
...@@ -71,6 +71,6 @@ test_main(int argc, char *const argv[]) { ...@@ -71,6 +71,6 @@ test_main(int argc, char *const argv[]) {
// this test is all about transactions, make the DB small // this test is all about transactions, make the DB small
args.num_elements = 1; args.num_elements = 1;
args.num_DBs= 1; args.num_DBs= 1;
stress_test_main(&args); perf_test_main(&args);
return 0; return 0;
} }
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
// The intent of this test is to measure the throughput of toku_malloc and toku_free
// with multiple threads.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = xmalloc_free_op;
}
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
}
int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
stress_test_main(&args);
return 0;
}
...@@ -22,6 +22,14 @@ ...@@ -22,6 +22,14 @@
// This test is targetted at stressing the locktree, hence the small table and many update threads. // This test is targetted at stressing the locktree, hence the small table and many update threads.
// //
static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_extra, void *UU(stats_extra)) {
invariant_null(operation_extra);
if (!arg->cli->nolocktree) {
toku_env_run_lock_escalation_for_test(arg->env);
}
return 0;
}
static void static void
stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
...@@ -40,13 +48,8 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) { ...@@ -40,13 +48,8 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[0].operation_extra = &soe[0]; myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op; myargs[0].operation = scan_op;
// make the lock escalation thread. myargs[1].sleep_ms = 15L * 1000;
// it should sleep somewhere between 10 and 20 myargs[1].operation_extra = nullptr;
// seconds between each escalation.
struct lock_escalation_op_extra eoe;
eoe.min_sleep_time_micros = 10UL * (1000 * 1000);
eoe.max_sleep_time_micros = 20UL * (1000 * 1000);
myargs[1].operation_extra = &eoe;
myargs[1].operation = lock_escalation_op; myargs[1].operation = lock_escalation_op;
// make the threads that update the db // make the threads that update the db
......
...@@ -69,9 +69,15 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void ...@@ -69,9 +69,15 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void
DBT dest_vals[2]; DBT dest_vals[2];
memset(dest_keys, 0, sizeof(dest_keys)); memset(dest_keys, 0, sizeof(dest_keys));
memset(dest_vals, 0, sizeof(dest_vals)); memset(dest_vals, 0, sizeof(dest_vals));
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size];
dbt_init(&key, keybuf, sizeof keybuf),
dbt_init(&val, valbuf, sizeof valbuf),
r = env->txn_begin(env, NULL, &hi_txn, 0); CKERR(r);
int i; int i;
r = env->txn_begin(env, NULL, &hi_txn, 0);
CKERR(r);
for (i = 0; i < 1000; i++) { for (i = 0; i < 1000; i++) {
DB* dbs[2]; DB* dbs[2];
toku_mutex_lock(&hi_lock); toku_mutex_lock(&hi_lock);
...@@ -79,11 +85,8 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void ...@@ -79,11 +85,8 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void
dbs[1] = hot_db; dbs[1] = hot_db;
int num_dbs = hot_db ? 2 : 1; int num_dbs = hot_db ? 2 : 1;
// do a random insertion // do a random insertion
int rand_key = random() % arg->cli->num_elements; fill_key_buf_random(arg->random_data, keybuf, arg);
int rand_val = random(); fill_val_buf_random(arg->random_data, valbuf, arg->cli);
DBT key, val;
dbt_init(&key, &rand_key, sizeof(rand_key)),
dbt_init(&val, &rand_val, sizeof(rand_val)),
r = env->put_multiple( r = env->put_multiple(
env, env,
db, db,
......
...@@ -52,6 +52,11 @@ memalign(size_t UU(alignment), size_t size) ...@@ -52,6 +52,11 @@ memalign(size_t UU(alignment), size_t size)
# endif # endif
#endif #endif
#define MIN_VAL_SIZE sizeof(int64_t)
#define MIN_KEY_SIZE sizeof(int64_t)
#define MIN_COMPRESSIBILITY (0.0)
#define MAX_COMPRESSIBILITY (1.0)
volatile bool run_test; // should be volatile since we are communicating through this variable. volatile bool run_test; // should be volatile since we are communicating through this variable.
typedef struct arg *ARG; typedef struct arg *ARG;
...@@ -87,9 +92,6 @@ enum perf_output_format { ...@@ -87,9 +92,6 @@ enum perf_output_format {
HUMAN = 0, HUMAN = 0,
CSV, CSV,
TSV, TSV,
#if 0
GNUPLOT,
#endif
NUM_OUTPUT_FORMATS NUM_OUTPUT_FORMATS
}; };
...@@ -153,8 +155,6 @@ struct arg { ...@@ -153,8 +155,6 @@ struct arg {
bool prelock_updates; bool prelock_updates;
}; };
DB_TXN * const null_txn = 0;
static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) { static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
arg->cli = cli_args; arg->cli = cli_args;
arg->dbp = dbp; arg->dbp = dbp;
...@@ -163,7 +163,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl ...@@ -163,7 +163,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl
arg->sleep_ms = 0; arg->sleep_ms = 0;
arg->lock_type = STRESS_LOCK_NONE; arg->lock_type = STRESS_LOCK_NONE;
arg->txn_type = DB_TXN_SNAPSHOT; arg->txn_type = DB_TXN_SNAPSHOT;
arg->operation_extra = NULL; arg->operation_extra = nullptr;
arg->do_prepare = false; arg->do_prepare = false;
arg->prelock_updates = false; arg->prelock_updates = false;
} }
...@@ -174,12 +174,14 @@ enum operation_type { ...@@ -174,12 +174,14 @@ enum operation_type {
PTQUERIES, PTQUERIES,
NUM_OPERATION_TYPES NUM_OPERATION_TYPES
}; };
const char *operation_names[] = { const char *operation_names[] = {
"ops", "ops",
"puts", "puts",
"ptqueries", "ptqueries",
NULL nullptr
}; };
static void increment_counter(void *extra, enum operation_type type, uint64_t inc) { static void increment_counter(void *extra, enum operation_type type, uint64_t inc) {
invariant(type != OPERATION); invariant(type != OPERATION);
int t = (int) type; int t = (int) type;
...@@ -399,45 +401,6 @@ tsv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], con ...@@ -399,45 +401,6 @@ tsv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], con
printf("\n"); printf("\n");
} }
#if 0
static void
gnuplot_print_perf_header(const struct cli_args *cli_args, const int num_threads)
{
printf("set terminal postscript solid color\n");
printf("set output \"foo.eps\"\n");
printf("set xlabel \"seconds\"\n");
printf("set xrange [0:*]\n");
printf("set ylabel \"X/s\"\n");
printf("plot ");
if (cli_args->print_thread_performance) {
for (int t = 1; t <= num_threads; ++t) {
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
const int col = (2 * ((t - 1) * (int) NUM_OPERATION_TYPES + op)) + 2;
//printf("'-' u 1:%d w lines t \"Thread %d %s\", ", col, t, operation_names[op]);
printf("'-' u 1:%d w lines t \"Thread %d %s/s\", ", col + 1, t, operation_names[op]);
}
}
}
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
const int col = (2 * (num_threads * (int) NUM_OPERATION_TYPES + op)) + 2;
//printf("'-' u 1:%d w lines t \"Total %s\", ", col);
printf("'-' u 1:%d w lines t \"Total %s/s\"%s", col + 1, operation_names[op], op == ((int) NUM_OPERATION_TYPES - 1) ? "\n" : ", ");
}
}
static void
gnuplot_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads)
{
tsv_print_perf_iteration(cli_args, current_time, last_counters, counters, num_threads);
}
static void
gnuplot_print_perf_totals(const struct cli_args *UU(cli_args), uint64_t *UU(counters[]), const int UU(num_threads))
{
printf("e\n");
}
#endif
const struct perf_formatter perf_formatters[] = { const struct perf_formatter perf_formatters[] = {
[HUMAN] = { [HUMAN] = {
.header = human_print_perf_header, .header = human_print_perf_header,
...@@ -454,13 +417,6 @@ const struct perf_formatter perf_formatters[] = { ...@@ -454,13 +417,6 @@ const struct perf_formatter perf_formatters[] = {
.iteration = tsv_print_perf_iteration, .iteration = tsv_print_perf_iteration,
.totals = tsv_print_perf_totals .totals = tsv_print_perf_totals
}, },
#if 0
[GNUPLOT] = {
.header = gnuplot_print_perf_header,
.iteration = gnuplot_print_perf_iteration,
.totals = gnuplot_print_perf_totals
}
#endif
}; };
static int get_env_open_flags(struct cli_args *args) { static int get_env_open_flags(struct cli_args *args) {
...@@ -531,7 +487,7 @@ static void *worker(void *arg_v) { ...@@ -531,7 +487,7 @@ static void *worker(void *arg_v) {
assert_zero(r); assert_zero(r);
arg->random_data = &random_data; arg->random_data = &random_data;
DB_ENV *env = arg->env; DB_ENV *env = arg->env;
DB_TXN *txn = NULL; DB_TXN *txn = nullptr;
if (verbose) { if (verbose) {
toku_pthread_t self = toku_pthread_self(); toku_pthread_t self = toku_pthread_self();
uintptr_t intself = (uintptr_t) self; uintptr_t intself = (uintptr_t) self;
...@@ -588,7 +544,6 @@ static void *worker(void *arg_v) { ...@@ -588,7 +544,6 @@ static void *worker(void *arg_v) {
return arg; return arg;
} }
typedef struct scan_cb_extra *SCAN_CB_EXTRA;
struct scan_cb_extra { struct scan_cb_extra {
bool fast; bool fast;
int64_t curr_sum; int64_t curr_sum;
...@@ -602,13 +557,13 @@ struct scan_op_extra { ...@@ -602,13 +557,13 @@ struct scan_op_extra {
}; };
static int static int
scan_cb(const DBT *a, const DBT *b, void *arg_v) { scan_cb(const DBT *key, const DBT *val, void *arg_v) {
SCAN_CB_EXTRA CAST_FROM_VOIDP(cb_extra, arg_v); struct scan_cb_extra *CAST_FROM_VOIDP(cb_extra, arg_v);
assert(a); assert(key);
assert(b); assert(val);
assert(cb_extra); assert(cb_extra);
assert(b->size >= sizeof(int)); assert(val->size >= sizeof(int64_t));
cb_extra->curr_sum += *(int *)b->data; cb_extra->curr_sum += *(int64_t *) val->data;
cb_extra->num_elements++; cb_extra->num_elements++;
return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0; return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0;
} }
...@@ -621,12 +576,13 @@ static int scan_op_and_maybe_check_sum( ...@@ -621,12 +576,13 @@ static int scan_op_and_maybe_check_sum(
) )
{ {
int r = 0; int r = 0;
DBC* cursor = NULL; DBC* cursor = nullptr;
struct scan_cb_extra e; struct scan_cb_extra e = {
e.fast = sce->fast; e.fast = sce->fast,
e.curr_sum = 0; e.curr_sum = 0,
e.num_elements = 0; e.num_elements = 0,
};
{ int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); } { int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); }
if (sce->prefetch) { if (sce->prefetch) {
...@@ -677,37 +633,68 @@ static int generate_row_for_put( ...@@ -677,37 +633,68 @@ static int generate_row_for_put(
return 0; return 0;
} }
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { static uint64_t breverse(uint64_t v)
return 0; // Effect: return the bits in i, reversed
// Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
// Rationale: just a hack to spread out the keys during loading, doesn't need to be fast but does need to be correct.
{
uint64_t r = v; // r will be reversed bits of v; first get LSB of v
int s = sizeof(v) * CHAR_BIT - 1; // extra shift needed at end
for (v >>= 1; v; v >>= 1) {
r <<= 1;
r |= v & 1;
s--;
}
r <<= s; // shift when v's highest bits are zero
return r;
} }
static int UU() xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { static void
size_t s = 256; fill_key_buf(uint64_t key, uint8_t *data, struct cli_args *args) {
void *p = toku_xmalloc(s); invariant(args->key_size >= MIN_KEY_SIZE);
toku_free(p); uint64_t *k = reinterpret_cast<uint64_t *>(data);
return 0; if (args->disperse_keys) {
*k = static_cast<uint64_t>(breverse(key));
} else {
*k = key;
}
if (args->key_size > sizeof(uint64_t)) {
memset(data + sizeof(uint64_t), 0, args->key_size - sizeof(uint64_t));
}
} }
#if DONT_DEPRECATE_MALLOC static void
static int UU() malloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) { fill_key_buf_random(struct random_data *random_data, uint8_t *data, ARG arg) {
size_t s = 256; uint64_t key = randu64(random_data);
void *p = malloc(s); if (arg->bounded_element_range && arg->cli->num_elements > 0) {
free(p); key = key % arg->cli->num_elements;
return 0; }
fill_key_buf(key, data, arg->cli);
}
static void
fill_val_buf(int64_t val, uint8_t *data, uint32_t val_size) {
invariant(val_size >= MIN_VAL_SIZE);
int64_t *v = reinterpret_cast<int64_t *>(data);
*v = val;
if (val_size > sizeof(int64_t)) {
memset(data + sizeof(int64_t), 0, val_size - sizeof(int64_t));
}
} }
#endif
// Fill array with compressibility*size 0s. // Fill array with compressibility*size 0s.
// 0.0<=compressibility<=1.0 // 0.0<=compressibility<=1.0
// Compressibility is the fraction of size that will be 0s (e.g. approximate fraction that will be compressed away). // Compressibility is the fraction of size that will be 0s (e.g. approximate fraction that will be compressed away).
// The rest will be random data. // The rest will be random data.
static void static void
fill_zeroed_array(uint8_t *data, uint32_t size, struct random_data *random_data, double compressibility) { fill_val_buf_random(struct random_data *random_data, uint8_t *data, struct cli_args *args) {
invariant(args->val_size >= MIN_VAL_SIZE);
//Requires: The array was zeroed since the last time 'size' was changed. //Requires: The array was zeroed since the last time 'size' was changed.
//Requires: compressibility is in range [0,1] indicating fraction that should be zeros. //Requires: compressibility is in range [0,1] indicating fraction that should be zeros.
uint32_t num_random_bytes = (1 - compressibility) * size; // Fill in the random bytes
uint32_t num_random_bytes = (1 - args->compressibility) * args->val_size;
if (num_random_bytes > 0) { if (num_random_bytes > 0) {
uint32_t filled; uint32_t filled;
for (filled = 0; filled + sizeof(uint64_t) <= num_random_bytes; filled += sizeof(uint64_t)) { for (filled = 0; filled + sizeof(uint64_t) <= num_random_bytes; filled += sizeof(uint64_t)) {
...@@ -718,39 +705,28 @@ fill_zeroed_array(uint8_t *data, uint32_t size, struct random_data *random_data, ...@@ -718,39 +705,28 @@ fill_zeroed_array(uint8_t *data, uint32_t size, struct random_data *random_data,
memcpy(&data[filled], &last8, num_random_bytes - filled); memcpy(&data[filled], &last8, num_random_bytes - filled);
} }
} }
}
static inline size_t // Fill in the zero bytes
size_t_max(size_t a, size_t b) { if (num_random_bytes < args->val_size) {
return (a > b) ? a : b; memset(data + num_random_bytes, 0, args->val_size - num_random_bytes);
}
} }
static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, void *stats_extra) { static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, void *stats_extra) {
int r = 0; int r = 0;
uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))]; uint8_t keybuf[arg->cli->key_size];
uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b;
uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b;
ZERO_ARRAY(rand_key_b);
uint8_t valbuf[arg->cli->val_size]; uint8_t valbuf[arg->cli->val_size];
ZERO_ARRAY(valbuf);
DBT key, val;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf);
const int put_flags = get_put_flags(arg->cli);
uint64_t puts_to_increment = 0; uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) { for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
rand_key_key[0] = randu64(arg->random_data); fill_key_buf_random(arg->random_data, keybuf, arg);
if (arg->cli->interleave) { fill_val_buf_random(arg->random_data, valbuf, arg->cli);
rand_key_i[3] = arg->thread_idx; r = db->put(db, txn, &key, &val, put_flags);
} else {
rand_key_i[0] = arg->thread_idx;
}
if (arg->cli->num_elements > 0 && arg->bounded_element_range) {
rand_key_key[0] = rand_key_key[0] % arg->cli->num_elements;
}
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b);
dbt_init(&val, valbuf, sizeof valbuf);
int flags = get_put_flags(arg->cli);
r = db->put(db, txn, &key, &val, flags);
if (!ignore_errors && r != 0) { if (!ignore_errors && r != 0) {
goto cleanup; goto cleanup;
} }
...@@ -760,6 +736,7 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, vo ...@@ -760,6 +736,7 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, vo
puts_to_increment = 0; puts_to_increment = 0;
} }
} }
cleanup: cleanup:
increment_counter(stats_extra, PUTS, puts_to_increment); increment_counter(stats_extra, PUTS, puts_to_increment);
return r; return r;
...@@ -788,22 +765,19 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void ...@@ -788,22 +765,19 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int r = 0; int r = 0;
uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))]; uint8_t keybuf[arg->cli->key_size];
uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b;
uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b;
ZERO_ARRAY(rand_key_b);
uint8_t valbuf[arg->cli->val_size]; uint8_t valbuf[arg->cli->val_size];
ZERO_ARRAY(valbuf);
DBT key, val;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf);
const int put_flags = get_put_flags(arg->cli);
uint64_t puts_to_increment = 0; uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) { for (uint64_t i = 0; i < arg->cli->txn_size; ++i) {
rand_key_key[0] = extra->current++; fill_key_buf(i, keybuf, arg->cli);
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility); fill_val_buf_random(arg->random_data, valbuf, arg->cli);
DBT key, val; r = db->put(db, txn, &key, &val, put_flags);
dbt_init(&key, &rand_key_b, sizeof rand_key_b);
dbt_init(&val, valbuf, sizeof valbuf);
int flags = get_put_flags(arg->cli);
r = db->put(db, txn, &key, &val, flags);
if (r != 0) { if (r != 0) {
goto cleanup; goto cleanup;
} }
...@@ -813,6 +787,7 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void ...@@ -813,6 +787,7 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
puts_to_increment = 0; puts_to_increment = 0;
} }
} }
cleanup: cleanup:
increment_counter(stats_extra, PUTS, puts_to_increment); increment_counter(stats_extra, PUTS, puts_to_increment);
return r; return r;
...@@ -827,42 +802,44 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v ...@@ -827,42 +802,44 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v
uint32_t dbt_flags = 0; uint32_t dbt_flags = 0;
r = db_create(&db_load, env, 0); r = db_create(&db_load, env, 0);
assert(r == 0); assert(r == 0);
r = db_load->open(db_load, txn, "loader-db", NULL, DB_BTREE, DB_CREATE, 0666); r = db_load->open(db_load, txn, "loader-db", nullptr, DB_BTREE, DB_CREATE, 0666);
assert(r == 0); assert(r == 0);
DB_LOADER *loader; DB_LOADER *loader;
uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES; uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES;
r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags); r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags);
CKERR(r); CKERR(r);
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size];
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf);
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
DBT key, val; fill_key_buf(i, keybuf, arg->cli);
int rand_key = i; fill_val_buf_random(arg->random_data, valbuf, arg->cli);
int rand_val = myrandom_r(arg->random_data);
dbt_init(&key, &rand_key, sizeof(rand_key));
dbt_init(&val, &rand_val, sizeof(rand_val));
r = loader->put(loader, &key, &val); CKERR(r); r = loader->put(loader, &key, &val); CKERR(r);
} }
r = loader->close(loader); CKERR(r); r = loader->close(loader); CKERR(r);
r = db_load->close(db_load, 0); CKERR(r); r = db_load->close(db_load, 0); CKERR(r);
r = env->dbremove(env, txn, "loader-db", NULL, 0); CKERR(r); r = env->dbremove(env, txn, "loader-db", nullptr, 0); CKERR(r);
} }
return 0; return 0;
} }
static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r; // Pick a random DB, do a keyrange operation.
// callback is designed to run on tests with one DB
// no particular reason why, just the way it was
// originally done
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) { int r = 0;
rand_key = rand_key % arg->cli->num_elements; uint8_t keybuf[arg->cli->key_size];
}
DBT key; DBT key;
dbt_init(&key, &rand_key, sizeof rand_key); dbt_init(&key, keybuf, sizeof keybuf);
fill_key_buf_random(arg->random_data, keybuf, arg);
uint64_t less,equal,greater; uint64_t less,equal,greater;
int is_exact; int is_exact;
r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact); r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact);
...@@ -890,27 +867,6 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra ...@@ -890,27 +867,6 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra
return r; return r;
} }
struct lock_escalation_op_extra {
// sleep somewhere between these times before running escalation.
// this will add some chaos into the mix.
uint64_t min_sleep_time_micros;
uint64_t max_sleep_time_micros;
};
static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_extra, void *UU(stats_extra)) {
struct lock_escalation_op_extra *CAST_FROM_VOIDP(extra, operation_extra);
if (extra->max_sleep_time_micros > 0) {
invariant(extra->max_sleep_time_micros >= extra->min_sleep_time_micros);
uint64_t extra_sleep_time = (extra->max_sleep_time_micros - extra->min_sleep_time_micros) + 1;
uint64_t sleep_time = extra->min_sleep_time_micros + (myrandom_r(arg->random_data) % extra_sleep_time);
usleep(sleep_time);
}
if (!arg->cli->nolocktree) {
toku_env_run_lock_escalation_for_test(arg->env);
}
return 0;
}
static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) { static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) {
struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra); struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
...@@ -967,23 +923,24 @@ static int dbt_do_nothing (DBT const *UU(key), DBT const *UU(row), void *UU(con ...@@ -967,23 +923,24 @@ static int dbt_do_nothing (DBT const *UU(key), DBT const *UU(row), void *UU(con
} }
static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) { static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) {
int r; int r = 0;
int rand_key = myrandom_r(arg->random_data); uint8_t keybuf[arg->cli->key_size];
if (arg->bounded_element_range) {
rand_key = rand_key % arg->cli->num_elements;
}
DBT key, val; DBT key, val;
dbt_init(&key, &rand_key, sizeof rand_key); dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, NULL, 0); dbt_init(&val, nullptr, 0);
fill_key_buf_random(arg->random_data, keybuf, arg);
r = db->getf_set( r = db->getf_set(
db, db,
txn, txn,
0, 0,
&key, &key,
dbt_do_nothing, dbt_do_nothing,
NULL nullptr
); );
if (check) assert(r != DB_NOTFOUND); if (check) {
assert(r != DB_NOTFOUND);
}
r = 0; r = 0;
return r; return r;
} }
...@@ -1011,7 +968,7 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext ...@@ -1011,7 +968,7 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext
static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0; int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
DBC* cursor = NULL; DBC* cursor = nullptr;
int r = db->cursor(db, txn, &cursor, 0); assert(r == 0); int r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
r = cursor->c_close(cursor); assert(r == 0); r = cursor->c_close(cursor); assert(r == 0);
return 0; return 0;
...@@ -1060,14 +1017,14 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key), ...@@ -1060,14 +1017,14 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key),
void *set_extra), void *set_extra),
void *set_extra) void *set_extra)
{ {
int old_int_val = 0; int64_t old_int_val = 0;
if (old_val) { if (old_val) {
old_int_val = *(int*)old_val->data; old_int_val = *(int64_t *) old_val->data;
} }
assert(extra->size == sizeof(struct update_op_extra)); assert(extra->size == sizeof(struct update_op_extra));
struct update_op_extra *CAST_FROM_VOIDP(e, extra->data); struct update_op_extra *CAST_FROM_VOIDP(e, extra->data);
int new_int_val; int64_t new_int_val;
switch (e->type) { switch (e->type) {
case UPDATE_ADD_DIFF: case UPDATE_ADD_DIFF:
new_int_val = old_int_val + e->u.d.diff; new_int_val = old_int_val + e->u.d.diff;
...@@ -1083,53 +1040,58 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key), ...@@ -1083,53 +1040,58 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key),
assert(false); assert(false);
} }
uint32_t val_size = sizeof(int64_t) + e->pad_bytes;
uint8_t valbuf[val_size];
fill_val_buf(new_int_val, valbuf, val_size);
DBT new_val; DBT new_val;
uint32_t data_size = sizeof(int) + e->pad_bytes; dbt_init(&new_val, valbuf, val_size);
char* data [data_size]; set_val(&new_val, set_extra);
ZERO_ARRAY(data);
memcpy(data, &new_int_val, sizeof(new_int_val));
set_val(dbt_init(&new_val, data, data_size), set_extra);
return 0; return 0;
} }
static int UU()update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int curr_val_sum = 0;
int r = 0;
DBT key, val; DBT key, val;
int rand_key; uint8_t keybuf[arg->cli->key_size];
int rand_key2;
toku_sync_fetch_and_add(&update_count, 1); toku_sync_fetch_and_add(&update_count, 1);
struct update_op_extra extra; struct update_op_extra extra;
ZERO_STRUCT(extra); ZERO_STRUCT(extra);
extra.type = UPDATE_ADD_DIFF; extra.type = UPDATE_ADD_DIFF;
extra.pad_bytes = 0; extra.pad_bytes = 0;
int64_t curr_val_sum = 0;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
for (uint32_t i = 0; i < arg->cli->txn_size; i++) { for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
rand_key = myrandom_r(arg->random_data); fill_key_buf_random(arg->random_data, keybuf, arg);
if (arg->bounded_element_range) {
rand_key = rand_key % (arg->cli->num_elements/2);
}
rand_key2 = arg->cli->num_elements - rand_key;
assert(rand_key != rand_key2);
extra.u.d.diff = 1; extra.u.d.diff = 1;
curr_val_sum += extra.u.d.diff; curr_val_sum += extra.u.d.diff;
r = db->update( r = db->update(
db, db,
txn, txn,
dbt_init(&key, &rand_key, sizeof rand_key), &key,
dbt_init(&val, &extra, sizeof extra), &val,
0 0
); );
if (r != 0) { if (r != 0) {
return r; return r;
} }
int64_t rand_key = *(int64_t *) keybuf;
invariant(rand_key != (arg->cli->num_elements - rand_key));
rand_key -= arg->cli->num_elements;
fill_key_buf(rand_key, keybuf, arg->cli);
extra.u.d.diff = -1; extra.u.d.diff = -1;
r = db->update( r = db->update(
db, db,
txn, txn,
dbt_init(&key, &rand_key2, sizeof rand_key), &key,
dbt_init(&val, &extra, sizeof extra), &val,
0 0
); );
if (r != 0) { if (r != 0) {
...@@ -1156,10 +1118,6 @@ static int pre_acquire_write_lock(DB *db, DB_TXN *txn, ...@@ -1156,10 +1118,6 @@ static int pre_acquire_write_lock(DB *db, DB_TXN *txn,
// take the given db and do an update on it // take the given db and do an update on it
static int static int
UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) { UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
int r = 0;
int curr_val_sum = 0;
DBT key, val;
int update_key;
uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1); uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1);
struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra); struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
struct update_op_extra extra; struct update_op_extra extra;
...@@ -1171,7 +1129,14 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1171,7 +1129,14 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
extra.pad_bytes = 100; extra.pad_bytes = 100;
} }
} }
int r = 0;
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
int64_t update_key;
int64_t curr_val_sum = 0;
const int update_flags = arg->cli->prelock_updates ? DB_PRELOCKED_WRITE : 0; const int update_flags = arg->cli->prelock_updates ? DB_PRELOCKED_WRITE : 0;
for (uint32_t i = 0; i < arg->cli->txn_size; i++) { for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
if (arg->prelock_updates) { if (arg->prelock_updates) {
if (i == 0) { if (i == 0) {
...@@ -1180,9 +1145,9 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1180,9 +1145,9 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
update_key = update_key % arg->cli->num_elements; update_key = update_key % arg->cli->num_elements;
} }
const uint32_t max_key_in_table = arg->cli->num_elements - 1; const int64_t max_key_in_table = arg->cli->num_elements - 1;
const bool range_wraps = (update_key + arg->cli->txn_size - 1) > max_key_in_table; const bool range_wraps = (update_key + arg->cli->txn_size - 1) > max_key_in_table;
int left_key, right_key; int64_t left_key, right_key;
DBT left_key_dbt, right_key_dbt; DBT left_key_dbt, right_key_dbt;
// acquire the range starting at the random key, plus txn_size - 1 // acquire the range starting at the random key, plus txn_size - 1
...@@ -1218,15 +1183,16 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1218,15 +1183,16 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
} }
} else { } else {
update_key++; update_key++;
if (arg->bounded_element_range) {
update_key = update_key % arg->cli->num_elements;
}
} }
fill_key_buf(update_key, keybuf, arg->cli);
} else { } else {
// just do a usual, random point update without locking first // just do a usual, random point update without locking first
update_key = myrandom_r(arg->random_data); fill_key_buf_random(arg->random_data, keybuf, arg);
} }
if (arg->bounded_element_range) {
update_key = update_key % arg->cli->num_elements;
}
// the last update keeps the table's sum as zero // the last update keeps the table's sum as zero
// every other update except the last applies a random delta // every other update except the last applies a random delta
...@@ -1241,12 +1207,15 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU( ...@@ -1241,12 +1207,15 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
curr_val_sum += extra.u.d.diff; curr_val_sum += extra.u.d.diff;
} }
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
// do the update // do the update
r = db->update( r = db->update(
db, db,
txn, txn,
dbt_init(&key, &update_key, sizeof update_key), &key,
dbt_init(&val, &extra, sizeof extra), &val,
update_flags update_flags
); );
if (r != 0) { if (r != 0) {
...@@ -1269,12 +1238,11 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext ...@@ -1269,12 +1238,11 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra); struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
assert(arg->bounded_element_range); assert(arg->bounded_element_range);
assert(op_args->update_history_buffer); assert(op_args->update_history_buffer);
int r;
int r = 0;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
int curr_val_sum = 0;
DBT key, val;
int rand_key;
struct update_op_extra extra; struct update_op_extra extra;
ZERO_STRUCT(extra); ZERO_STRUCT(extra);
extra.type = UPDATE_WITH_HISTORY; extra.type = UPDATE_WITH_HISTORY;
...@@ -1285,47 +1253,43 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext ...@@ -1285,47 +1253,43 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
extra.pad_bytes = 500; extra.pad_bytes = 500;
} }
} }
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
int64_t rand_key;
int64_t curr_val_sum = 0;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
for (uint32_t i = 0; i < arg->cli->txn_size; i++) { for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
rand_key = myrandom_r(arg->random_data) % arg->cli->num_elements; fill_key_buf_random(arg->random_data, keybuf, arg);
extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL; rand_key = *(int64_t *) keybuf;
// just make every other value random invariant(rand_key < arg->cli->num_elements);
if (i%2 == 0) { if (i < arg->cli->txn_size - 1) {
extra.u.h.new_val = -extra.u.h.new_val; extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
// just make every other value random
if (i % 2 == 0) {
extra.u.h.new_val = -extra.u.h.new_val;
}
curr_val_sum += extra.u.h.new_val;
} else {
// the last update should ensure the sum stays zero
extra.u.h.new_val = -curr_val_sum;
} }
curr_val_sum += extra.u.h.new_val;
extra.u.h.expected = op_args->update_history_buffer[rand_key]; extra.u.h.expected = op_args->update_history_buffer[rand_key];
op_args->update_history_buffer[rand_key] = extra.u.h.new_val; op_args->update_history_buffer[rand_key] = extra.u.h.new_val;
r = db->update( r = db->update(
db, db,
txn, txn,
dbt_init(&key, &rand_key, sizeof rand_key), &key,
dbt_init(&val, &extra, sizeof extra), &val,
0 0
); );
if (r != 0) { if (r != 0) {
return r; return r;
} }
} }
//
// now put in one more to ensure that the sum stays 0
//
extra.u.h.new_val = -curr_val_sum;
rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % arg->cli->num_elements;
}
extra.u.h.expected = op_args->update_history_buffer[rand_key];
op_args->update_history_buffer[rand_key] = extra.u.h.new_val;
r = db->update(
db,
txn,
dbt_init(&key, &rand_key, sizeof rand_key),
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0) {
return r;
}
return r; return r;
} }
...@@ -1351,7 +1315,7 @@ static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra), ...@@ -1351,7 +1315,7 @@ static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra),
int r; int r;
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) { for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
DB* db = arg->dbp[i]; DB* db = arg->dbp[i];
r = db->hot_optimize(db, hot_progress_callback, NULL); r = db->hot_optimize(db, hot_progress_callback, nullptr);
if (run_test) { if (run_test) {
CKERR(r); CKERR(r);
} }
...@@ -1364,6 +1328,8 @@ get_ith_table_name(char *buf, size_t len, int i) { ...@@ -1364,6 +1328,8 @@ get_ith_table_name(char *buf, size_t len, int i) {
snprintf(buf, len, "main%d", i); snprintf(buf, len, "main%d", i);
} }
DB_TXN * const null_txn = 0;
static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) { static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r; int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
...@@ -1374,12 +1340,12 @@ static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operat ...@@ -1374,12 +1340,12 @@ static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operat
ZERO_ARRAY(name); ZERO_ARRAY(name);
get_ith_table_name(name, sizeof(name), db_index); get_ith_table_name(name, sizeof(name), db_index);
r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0); r = arg->env->dbremove(arg->env, null_txn, name, nullptr, 0);
CKERR(r); CKERR(r);
r = db_create(&(arg->dbp[db_index]), arg->env, 0); r = db_create(&(arg->dbp[db_index]), arg->env, 0);
assert(r == 0); assert(r == 0);
r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666); r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, nullptr, DB_BTREE, DB_CREATE, 0666);
assert(r == 0); assert(r == 0);
return 0; return 0;
} }
...@@ -1464,6 +1430,7 @@ struct sleep_and_crash_extra { ...@@ -1464,6 +1430,7 @@ struct sleep_and_crash_extra {
bool is_setup; bool is_setup;
bool threads_have_joined; bool threads_have_joined;
}; };
static void *sleep_and_crash(void *extra) { static void *sleep_and_crash(void *extra) {
sleep_and_crash_extra *e = static_cast<sleep_and_crash_extra *>(extra); sleep_and_crash_extra *e = static_cast<sleep_and_crash_extra *>(extra);
toku_mutex_lock(&e->mutex); toku_mutex_lock(&e->mutex);
...@@ -1508,7 +1475,7 @@ static int run_workers( ...@@ -1508,7 +1475,7 @@ static int run_workers(
int r; int r;
const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format]; const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format];
toku_mutex_t mutex = ZERO_MUTEX_INITIALIZER; toku_mutex_t mutex = ZERO_MUTEX_INITIALIZER;
toku_mutex_init(&mutex, NULL); toku_mutex_init(&mutex, nullptr);
struct rwlock rwlock; struct rwlock rwlock;
rwlock_init(&rwlock); rwlock_init(&rwlock);
toku_pthread_t tids[num_threads]; toku_pthread_t tids[num_threads];
...@@ -1533,11 +1500,11 @@ static int run_workers( ...@@ -1533,11 +1500,11 @@ static int run_workers(
worker_extra[i].operation_lock_mutex = &mutex; worker_extra[i].operation_lock_mutex = &mutex;
XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters); XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters);
TOKU_DRD_IGNORE_VAR(worker_extra[i].counters); TOKU_DRD_IGNORE_VAR(worker_extra[i].counters);
{ int chk_r = toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]); CKERR(chk_r); } { int chk_r = toku_pthread_create(&tids[i], nullptr, worker, &worker_extra[i]); CKERR(chk_r); }
if (verbose) if (verbose)
printf("%lu created\n", (unsigned long) tids[i]); printf("%lu created\n", (unsigned long) tids[i]);
} }
{ int chk_r = toku_pthread_create(&time_tid, NULL, test_time, &tte); CKERR(chk_r); } { int chk_r = toku_pthread_create(&time_tid, nullptr, test_time, &tte); CKERR(chk_r); }
if (verbose) if (verbose)
printf("%lu created\n", (unsigned long) time_tid); printf("%lu created\n", (unsigned long) time_tid);
...@@ -1660,131 +1627,22 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1660,131 +1627,22 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = db->set_readpagesize(db, env_args.basement_node_size); r = db->set_readpagesize(db, env_args.basement_node_size);
CKERR(r); CKERR(r);
const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0); const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0);
r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666); r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666);
CKERR(r); CKERR(r);
db_res[i] = db; db_res[i] = db;
} }
return r; return r;
} }
static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bufsz, int val_bufsz, static void report_overall_fill_table_progress(struct cli_args *args, int num_rows) {
void (*callback)(int idx, void *extra,
void *key, int *keysz,
void *val, int *valsz),
void *extra, void (*progress_cb)(int num_rows)) {
DB_TXN *txn = nullptr;
const int puts_per_txn = 100000;
int r = 0;
for (long i = 0; i < num_elements; ++i) {
if (txn == nullptr) {
r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
}
char keybuf[key_bufsz], valbuf[val_bufsz];
memset(keybuf, 0, sizeof(keybuf));
memset(valbuf, 0, sizeof(valbuf));
int keysz, valsz;
callback(i, extra, keybuf, &keysz, valbuf, &valsz);
// let's make sure the data stored fits in the buffers we passed in
assert(keysz <= key_bufsz);
assert(valsz <= val_bufsz);
DBT key, val;
// make size of data what is specified w/input parameters
// note that key and val have sizes of
// key_bufsz and val_bufsz, which were passed into this
// function, not what was stored by the callback
r = db->put(
db,
txn,
dbt_init(&key, keybuf, key_bufsz),
dbt_init(&val, valbuf, val_bufsz),
// don't bother taking locks in the locktree
DB_PRELOCKED_WRITE
);
assert(r == 0);
if (i > 0 && i % puts_per_txn == 0) {
// don't bother fsyncing to disk.
// the caller can checkpoint if they want.
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
txn = nullptr;
if (verbose) {
progress_cb(puts_per_txn);
}
}
}
if (txn) {
r = txn->commit(txn, DB_TXN_NOSYNC);
invariant_zero(r);
}
return r;
}
static uint32_t breverse(uint32_t v)
// Effect: return the bits in i, reversed
// Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
// Rationale: just a hack to spread out the keys during loading, doesn't need to be fast but does need to be correct.
{
uint32_t r = v; // r will be reversed bits of v; first get LSB of v
int s = sizeof(v) * CHAR_BIT - 1; // extra shift needed at end
for (v >>= 1; v; v >>= 1) {
r <<= 1;
r |= v & 1;
s--;
}
r <<= s; // shift when v's highest bits are zero
return r;
}
static void zero_element_callback(int idx, void *extra, void *keyv, int *keysz, void *valv, int *valsz) {
const bool *disperse_keys = static_cast<bool *>(extra);
int *CAST_FROM_VOIDP(key, keyv);
int *CAST_FROM_VOIDP(val, valv);
if (*disperse_keys) {
*key = static_cast<int>(breverse(idx));
} else {
*key = idx;
}
*val = 0;
*keysz = sizeof(int);
*valsz = sizeof(int);
}
struct fill_table_worker_info {
DB_ENV *env;
DB *db;
int num_elements;
uint32_t key_size;
uint32_t val_size;
bool disperse_keys;
void (*progress_cb)(int num_rows);
};
static void fill_table_worker(void *arg) {
struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
int r = fill_table_from_fun(
info->env,
info->db,
info->num_elements,
info->key_size,
info->val_size,
zero_element_callback,
&info->disperse_keys,
info->progress_cb
);
invariant_zero(r);
toku_free(info);
}
static int num_tables_to_fill = 1;
static int rows_per_table = 1;
static void report_overall_fill_table_progress(int num_rows) {
// for sanitary reasons we'd like to prevent two threads // for sanitary reasons we'd like to prevent two threads
// from printing the same performance report twice. // from printing the same performance report twice.
static bool reporting; static bool reporting;
// when was the first time measurement taken? // when was the first time measurement taken?
static uint64_t t0; static uint64_t t0;
static int rows_inserted; static int rows_inserted;
// when was the last report? what was its progress? // when was the last report? what was its progress?
static uint64_t last_report; static uint64_t last_report;
static double last_progress; static double last_progress;
...@@ -1794,12 +1652,9 @@ static void report_overall_fill_table_progress(int num_rows) { ...@@ -1794,12 +1652,9 @@ static void report_overall_fill_table_progress(int num_rows) {
} }
uint64_t rows_so_far = toku_sync_add_and_fetch(&rows_inserted, num_rows); uint64_t rows_so_far = toku_sync_add_and_fetch(&rows_inserted, num_rows);
double progress = rows_so_far / double progress = rows_so_far / (args->num_elements * args->num_DBs * 1.0);
(rows_per_table * num_tables_to_fill * 1.0);
if (progress > (last_progress + .01)) { if (progress > (last_progress + .01)) {
uint64_t t1 = toku_current_time_microsec(); uint64_t t1 = toku_current_time_microsec();
// report no more often than once every 5 seconds, for less output.
// there is a race condition. it is probably harmless.
const uint64_t minimum_report_period = 5 * 1000000; const uint64_t minimum_report_period = 5 * 1000000;
if (t1 > last_report + minimum_report_period if (t1 > last_report + minimum_report_period
&& toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) { && toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) {
...@@ -1813,24 +1668,68 @@ static void report_overall_fill_table_progress(int num_rows) { ...@@ -1813,24 +1668,68 @@ static void report_overall_fill_table_progress(int num_rows) {
} }
} }
static int fill_tables_with_zeroes(DB_ENV *env, DB **dbs, int num_DBs, int num_elements, uint32_t key_size, uint32_t val_size, bool disperse_keys) { static void fill_single_table(DB_ENV *env, DB *db, struct cli_args *args, bool fill_with_zeroes) {
// set the static globals that the progress reporter uses const int puts_per_txn = 100000;
num_tables_to_fill = num_DBs;
rows_per_table = num_elements; int r = 0;
DB_TXN *txn = nullptr;
struct random_data random_data;
char random_buf[8];
r = myinitstate_r(random(), random_buf, 8, &random_data); CKERR(r);
uint8_t keybuf[args->key_size], valbuf[args->val_size];
memset(keybuf, 0, sizeof keybuf);
memset(valbuf, 0, sizeof valbuf);
DBT key, val;
dbt_init(&key, keybuf, args->key_size);
dbt_init(&val, valbuf, args->val_size);
for (int i = 0; i < args->num_elements; i++) {
if (txn == nullptr) {
r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
}
fill_key_buf(i, keybuf, args);
if (!fill_with_zeroes) {
fill_val_buf_random(&random_data, valbuf, args);
}
r = db->put(db, txn, &key, &val, DB_PRELOCKED_WRITE); CKERR(r);
if (i > 0 && i % puts_per_txn == 0) {
// don't bother fsyncing to disk.
// the caller can checkpoint if they want.
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
txn = nullptr;
if (verbose) {
report_overall_fill_table_progress(args, puts_per_txn);
}
}
}
if (txn) {
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
}
}
struct fill_table_worker_info {
struct cli_args *args;
DB_ENV *env;
DB *db;
bool fill_with_zeroes;
};
static void fill_table_worker(void *arg) {
struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
fill_single_table(info->env, info->db, info->args, info->fill_with_zeroes);
toku_free(info);
}
static int fill_tables(DB_ENV *env, DB **dbs, struct cli_args *args, bool fill_with_zeroes) {
const int num_cores = toku_os_get_number_processors(); const int num_cores = toku_os_get_number_processors();
KIBBUTZ kibbutz = toku_kibbutz_create(num_cores); KIBBUTZ kibbutz = toku_kibbutz_create(num_cores);
for (int i = 0; i < num_DBs; i++) { for (int i = 0; i < args->num_DBs; i++) {
assert(key_size >= sizeof(int));
assert(val_size >= sizeof(int));
struct fill_table_worker_info *XCALLOC(info); struct fill_table_worker_info *XCALLOC(info);
info->env = env; info->env = env;
info->db = dbs[i]; info->db = dbs[i];
info->num_elements = num_elements; info->args = args;
info->key_size = key_size; info->fill_with_zeroes = fill_with_zeroes;
info->val_size = val_size;
info->disperse_keys = disperse_keys;
info->progress_cb = report_overall_fill_table_progress;
toku_kibbutz_enq(kibbutz, fill_table_worker, info); toku_kibbutz_enq(kibbutz, fill_table_worker, info);
} }
toku_kibbutz_destroy(kibbutz); toku_kibbutz_destroy(kibbutz);
...@@ -1865,7 +1764,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1865,7 +1764,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
int r; int r;
struct env_args env_args = cli_args->env_args; struct env_args env_args = cli_args->env_args;
/* create the dup database file */
DB_ENV *env; DB_ENV *env;
db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes); db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes);
r = db_env_create(&env, 0); assert(r == 0); r = db_env_create(&env, 0); assert(r == 0);
...@@ -1873,7 +1771,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1873,7 +1771,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = env->set_default_bt_compare(env, bt_compare); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r); r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r);
env->set_update(env, env_args.update_function); env->set_update(env, env_args.update_function);
// set the cache size to 10MB
r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r); r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r);
if (env_args.generate_put_callback) { if (env_args.generate_put_callback) {
...@@ -1904,7 +1801,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1904,7 +1801,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = db_create(&db, env, 0); r = db_create(&db, env, 0);
CKERR(r); CKERR(r);
const int flags = cli_args->blackhole ? DB_BLACKHOLE : 0; const int flags = cli_args->blackhole ? DB_BLACKHOLE : 0;
r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666); r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666);
CKERR(r); CKERR(r);
db_res[i] = db; db_res[i] = db;
} }
...@@ -1932,8 +1829,8 @@ static const struct env_args DEFAULT_ENV_ARGS = { ...@@ -1932,8 +1829,8 @@ static const struct env_args DEFAULT_ENV_ARGS = {
.num_bucket_mutexes = 1024, .num_bucket_mutexes = 1024,
.envdir = ENVDIR, .envdir = ENVDIR,
.update_function = update_op_callback, .update_function = update_op_callback,
.generate_put_callback = NULL, .generate_put_callback = nullptr,
.generate_del_callback = NULL, .generate_del_callback = nullptr,
}; };
static const struct env_args DEFAULT_PERF_ENV_ARGS = { static const struct env_args DEFAULT_PERF_ENV_ARGS = {
...@@ -1947,15 +1844,11 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = { ...@@ -1947,15 +1844,11 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = {
.cachetable_size = 1<<30, .cachetable_size = 1<<30,
.num_bucket_mutexes = 1024 * 1024, .num_bucket_mutexes = 1024 * 1024,
.envdir = ENVDIR, .envdir = ENVDIR,
.update_function = NULL, .update_function = nullptr,
.generate_put_callback = NULL, .generate_put_callback = nullptr,
.generate_del_callback = NULL, .generate_del_callback = nullptr,
}; };
#define MIN_VAL_SIZE sizeof(int)
#define MIN_KEY_SIZE sizeof(int)
#define MIN_COMPRESSIBILITY (0.0)
#define MAX_COMPRESSIBILITY (1.0)
static struct cli_args UU() get_default_args(void) { static struct cli_args UU() get_default_args(void) {
struct cli_args DEFAULT_ARGS = { struct cli_args DEFAULT_ARGS = {
.num_elements = 150000, .num_elements = 150000,
...@@ -2327,7 +2220,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -2327,7 +2220,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
#define LOCAL_STRING_ARG(name_string, variable, default) \ #define LOCAL_STRING_ARG(name_string, variable, default) \
MAKE_LOCAL_ARG(name_string, type_string, s, default, variable, "", "", "") MAKE_LOCAL_ARG(name_string, type_string, s, default, variable, "", "", "")
const char *perf_format_s = NULL; const char *perf_format_s = nullptr;
struct arg_type arg_types[] = { struct arg_type arg_types[] = {
INT32_ARG_NONNEG("--num_elements", num_elements, ""), INT32_ARG_NONNEG("--num_elements", num_elements, ""),
INT32_ARG_NONNEG("--num_DBs", num_DBs, ""), INT32_ARG_NONNEG("--num_DBs", num_DBs, ""),
...@@ -2439,20 +2332,15 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -2439,20 +2332,15 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
} }
} }
} }
if (perf_format_s != NULL) { if (perf_format_s != nullptr) {
if (!strcmp(perf_format_s, "human")) { if (!strcmp(perf_format_s, "human")) {
args->perf_output_format = HUMAN; args->perf_output_format = HUMAN;
} else if (!strcmp(perf_format_s, "csv")) { } else if (!strcmp(perf_format_s, "csv")) {
args->perf_output_format = CSV; args->perf_output_format = CSV;
} else if (!strcmp(perf_format_s, "tsv")) { } else if (!strcmp(perf_format_s, "tsv")) {
args->perf_output_format = TSV; args->perf_output_format = TSV;
#if 0
} else if (!strcmp(perf_format_s, "gnuplot")) {
args->perf_output_format = GNUPLOT;
#endif
} else { } else {
fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", and \"tsv\"\n"); fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", and \"tsv\"\n");
//fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", \"tsv\", and \"gnuplot\"\n");
do_usage(argv0, num_arg_types, arg_types); do_usage(argv0, num_arg_types, arg_types);
exit(EINVAL); exit(EINVAL);
} }
...@@ -2468,35 +2356,24 @@ static void ...@@ -2468,35 +2356,24 @@ static void
stress_table(DB_ENV *, DB **, struct cli_args *); stress_table(DB_ENV *, DB **, struct cli_args *);
static int static int
UU() stress_int_dbt_cmp (DB *db, const DBT *a, const DBT *b) { stress_dbt_cmp(DB *db, const DBT *a, const DBT *b) {
assert(db && a && b); assert(db && a && b);
assert(a->size >= sizeof(int));
assert(b->size >= sizeof(int));
int x = *(int *) a->data;
int y = *(int *) b->data;
if (x<y) return -1;
if (x>y) return 1;
return 0;
}
static int // Keys are only compared by their first 8 bytes,
UU() stress_uint64_dbt_cmp(DB *db, const DBT *a, const DBT *b) { // interpreted as a little endian 64 bit integer..
assert(db && a && b); assert(a->size >= sizeof(int64_t));
assert(a->size >= sizeof(uint64_t)); assert(b->size >= sizeof(int64_t));
assert(b->size >= sizeof(uint64_t));
uint64_t x = *(uint64_t *) a->data; int64_t x = *(int64_t *) a->data;
uint64_t y = *(uint64_t *) b->data; int64_t y = *(int64_t *) b->data;
if (x < y) { if (x < y) {
return -1; return -1;
} } else if (x > y) {
if (x > y) {
return +1; return +1;
} else {
return 0;
} }
return 0;
} }
...@@ -2512,21 +2389,49 @@ do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args) ...@@ -2512,21 +2389,49 @@ do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args)
scan_arg.operation_extra = &soe; scan_arg.operation_extra = &soe;
scan_arg.operation = scan_op_no_check; scan_arg.operation = scan_op_no_check;
scan_arg.lock_type = STRESS_LOCK_NONE; scan_arg.lock_type = STRESS_LOCK_NONE;
DB_TXN* txn = NULL; DB_TXN* txn = nullptr;
// don't take serializable read locks when scanning. // don't take serializable read locks when scanning.
int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r); int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
// make sure the scan doesn't terminate early // make sure the scan doesn't terminate early
run_test = true; run_test = true;
// warm up each DB in parallel // warm up each DB in parallel
scan_op_no_check_parallel(txn, &scan_arg, &soe, NULL); scan_op_no_check_parallel(txn, &scan_arg, &soe, nullptr);
r = txn->commit(txn,0); CKERR(r); r = txn->commit(txn,0); CKERR(r);
} }
static void static void
UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, const DBT *, const DBT *)) UU() stress_recover(struct cli_args *args) {
DB_ENV* env = nullptr;
DB* dbs[args->num_DBs];
memset(dbs, 0, sizeof(dbs));
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_dbt_cmp,
args); CKERR(chk_r); }
DB_TXN* txn = nullptr;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r);
struct scan_op_extra soe;
soe.fast = true;
soe.fwd = true;
soe.prefetch = false;
// make sure the scan doesn't terminate early
run_test = true;
r = scan_op(txn, &recover_args, &soe, nullptr);
CKERR(r);
{ int chk_r = txn->commit(txn,0); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
}
static void
test_main(struct cli_args *args, bool fill_with_zeroes)
{ {
{ char *loc = setlocale(LC_NUMERIC, "en_US.UTF-8"); assert(loc); } { char *loc = setlocale(LC_NUMERIC, "en_US.UTF-8"); assert(loc); }
DB_ENV* env = NULL; DB_ENV* env = nullptr;
DB* dbs[args->num_DBs]; DB* dbs[args->num_DBs];
memset(dbs, 0, sizeof(dbs)); memset(dbs, 0, sizeof(dbs));
db_env_enable_engine_status(args->nocrashstatus ? false : true); db_env_enable_engine_status(args->nocrashstatus ? false : true);
...@@ -2535,17 +2440,17 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co ...@@ -2535,17 +2440,17 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co
&env, &env,
dbs, dbs,
args->num_DBs, args->num_DBs,
bt_compare, stress_dbt_cmp,
args args
); );
{ int chk_r = fill_tables_with_zeroes(env, dbs, args->num_DBs, args->num_elements, args->key_size, args->val_size, args->disperse_keys); CKERR(chk_r); } { int chk_r = fill_tables(env, dbs, args, fill_with_zeroes); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); } { int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
} }
if (!args->only_create) { if (!args->only_create) {
{ int chk_r = open_tables(&env, { int chk_r = open_tables(&env,
dbs, dbs,
args->num_DBs, args->num_DBs,
bt_compare, stress_dbt_cmp,
args); CKERR(chk_r); } args); CKERR(chk_r); }
if (args->warm_cache) { if (args->warm_cache) {
do_warm_cache(env, dbs, args); do_warm_cache(env, dbs, args);
...@@ -2556,37 +2461,17 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co ...@@ -2556,37 +2461,17 @@ UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, co
} }
static void static void
UU() stress_test_main(struct cli_args *args) UU() stress_test_main(struct cli_args *args) {
{ // Begin the test with fixed size values equal to zero.
stress_test_main_with_cmp(args, stress_int_dbt_cmp); // This is important for correctness testing.
test_main(args, true);
} }
static void static void
UU() stress_recover(struct cli_args *args) { UU() perf_test_main(struct cli_args *args) {
DB_ENV* env = NULL; // Do not begin the test by creating a table of all zeroes.
DB* dbs[args->num_DBs]; // We want to control the row size and its compressibility.
memset(dbs, 0, sizeof(dbs)); test_main(args, false);
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_int_dbt_cmp,
args); CKERR(chk_r); }
DB_TXN* txn = NULL;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r);
struct scan_op_extra soe;
soe.fast = true;
soe.fwd = true;
soe.prefetch = false;
// make sure the scan doesn't terminate early
run_test = true;
r = scan_op(txn, &recover_args, &soe, NULL);
CKERR(r);
{ int chk_r = txn->commit(txn,0); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
} }
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment