Commit 8be87854 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

back out change


git-svn-id: file:///svn/toku/tokudb@51266 c7de825b-a66e-492c-adef-691d508d4ae1
parent d48fef04
......@@ -42,6 +42,6 @@ int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
perf_test_main(&args);
stress_test_main(&args);
return 0;
}
......@@ -42,17 +42,17 @@ static int UU() iibench_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
}
int r = 0;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size];
dbt_init(&mult_key_dbt[0], keybuf, sizeof keybuf);
dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf);
ZERO_ARRAY(valbuf);
uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
fill_zeroed_array(valbuf, arg->cli->val_size,
arg->random_data, arg->cli->compressibility);
struct iibench_op_extra *CAST_FROM_VOIDP(info, operation_extra);
uint64_t pk = toku_sync_fetch_and_add(&info->autoincrement, 1);
fill_key_buf(pk, keybuf, arg->cli);
fill_val_buf_random(arg->random_data, valbuf, arg->cli);
dbt_init(&mult_key_dbt[0], &pk, sizeof pk);
dbt_init(&mult_val_dbt[0], valbuf, sizeof valbuf);
r = env->put_multiple(
env,
dbs[0], // source db.
......@@ -128,6 +128,6 @@ test_main(int argc, char *const argv[]) {
args.crash_on_operation_failure = false;
}
args.env_args.generate_put_callback = iibench_generate_row_for_put;
perf_test_main(&args);
stress_test_main_with_cmp(&args, stress_uint64_dbt_cmp);
return 0;
}
......@@ -51,6 +51,6 @@ test_main(int argc, char *const argv[]) {
if (args.num_put_threads > 1) {
args.crash_on_operation_failure = false;
}
perf_test_main(&args);
stress_test_main_with_cmp(&args, stress_uint64_dbt_cmp);
return 0;
}
......@@ -20,13 +20,6 @@
// The intent of this test is to measure the throughput of malloc and free
// with multiple threads.
static int xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
size_t s = 256;
void *p = toku_xmalloc(s);
toku_free(p);
return 0;
}
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
......@@ -34,7 +27,7 @@ stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = xmalloc_free_op;
myargs[i].operation = malloc_free_op;
}
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
}
......@@ -43,6 +36,6 @@ int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
perf_test_main(&args);
stress_test_main(&args);
return 0;
}
......@@ -18,10 +18,6 @@
// The intent of this test is to measure the throughput of the test infrastructure executing a nop
// on multiple threads.
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
return 0;
}
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
......@@ -38,6 +34,6 @@ int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
perf_test_main(&args);
stress_test_main(&args);
return 0;
}
......@@ -62,6 +62,6 @@ int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
perf_test_main(&args);
stress_test_main(&args);
return 0;
}
......@@ -75,6 +75,6 @@ int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
perf_test_main(&args);
stress_test_main(&args);
return 0;
}
......@@ -78,6 +78,6 @@ test_main(int argc, char *const argv[]) {
args.num_update_threads = 1;
args.crash_on_operation_failure = false;
parse_stress_test_args(argc, argv, &args);
perf_test_main(&args);
stress_test_main(&args);
return 0;
}
......@@ -71,6 +71,6 @@ test_main(int argc, char *const argv[]) {
// this test is all about transactions, make the DB small
args.num_elements = 1;
args.num_DBs= 1;
perf_test_main(&args);
stress_test_main(&args);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
// The intent of this test is to measure the throughput of toku_malloc and toku_free
// with multiple threads.
static void
stress_table(DB_ENV* env, DB** dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
myargs[i].operation = xmalloc_free_op;
}
run_workers(myargs, num_threads, cli_args->num_seconds, false, cli_args);
}
int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args_for_perf();
parse_stress_test_args(argc, argv, &args);
stress_test_main(&args);
return 0;
}
......@@ -22,14 +22,6 @@
// This test is targetted at stressing the locktree, hence the small table and many update threads.
//
static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_extra, void *UU(stats_extra)) {
invariant_null(operation_extra);
if (!arg->cli->nolocktree) {
toku_env_run_lock_escalation_for_test(arg->env);
}
return 0;
}
static void
stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
......@@ -48,8 +40,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[0].operation_extra = &soe[0];
myargs[0].operation = scan_op;
myargs[1].sleep_ms = 15L * 1000;
myargs[1].operation_extra = nullptr;
// make the lock escalation thread.
// it should sleep somewhere between 10 and 20
// seconds between each escalation.
struct lock_escalation_op_extra eoe;
eoe.min_sleep_time_micros = 10UL * (1000 * 1000);
eoe.max_sleep_time_micros = 20UL * (1000 * 1000);
myargs[1].operation_extra = &eoe;
myargs[1].operation = lock_escalation_op;
// make the threads that update the db
......
......@@ -69,15 +69,9 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void
DBT dest_vals[2];
memset(dest_keys, 0, sizeof(dest_keys));
memset(dest_vals, 0, sizeof(dest_vals));
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size];
dbt_init(&key, keybuf, sizeof keybuf),
dbt_init(&val, valbuf, sizeof valbuf),
r = env->txn_begin(env, NULL, &hi_txn, 0); CKERR(r);
int i;
r = env->txn_begin(env, NULL, &hi_txn, 0);
CKERR(r);
for (i = 0; i < 1000; i++) {
DB* dbs[2];
toku_mutex_lock(&hi_lock);
......@@ -85,8 +79,11 @@ static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void
dbs[1] = hot_db;
int num_dbs = hot_db ? 2 : 1;
// do a random insertion
fill_key_buf_random(arg->random_data, keybuf, arg);
fill_val_buf_random(arg->random_data, valbuf, arg->cli);
int rand_key = random() % arg->cli->num_elements;
int rand_val = random();
DBT key, val;
dbt_init(&key, &rand_key, sizeof(rand_key)),
dbt_init(&val, &rand_val, sizeof(rand_val)),
r = env->put_multiple(
env,
db,
......
......@@ -52,11 +52,6 @@ memalign(size_t UU(alignment), size_t size)
# endif
#endif
#define MIN_VAL_SIZE sizeof(int64_t)
#define MIN_KEY_SIZE sizeof(int64_t)
#define MIN_COMPRESSIBILITY (0.0)
#define MAX_COMPRESSIBILITY (1.0)
volatile bool run_test; // should be volatile since we are communicating through this variable.
typedef struct arg *ARG;
......@@ -92,6 +87,9 @@ enum perf_output_format {
HUMAN = 0,
CSV,
TSV,
#if 0
GNUPLOT,
#endif
NUM_OUTPUT_FORMATS
};
......@@ -155,6 +153,8 @@ struct arg {
bool prelock_updates;
};
DB_TXN * const null_txn = 0;
static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
arg->cli = cli_args;
arg->dbp = dbp;
......@@ -163,7 +163,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl
arg->sleep_ms = 0;
arg->lock_type = STRESS_LOCK_NONE;
arg->txn_type = DB_TXN_SNAPSHOT;
arg->operation_extra = nullptr;
arg->operation_extra = NULL;
arg->do_prepare = false;
arg->prelock_updates = false;
}
......@@ -174,14 +174,12 @@ enum operation_type {
PTQUERIES,
NUM_OPERATION_TYPES
};
const char *operation_names[] = {
"ops",
"puts",
"ptqueries",
nullptr
NULL
};
static void increment_counter(void *extra, enum operation_type type, uint64_t inc) {
invariant(type != OPERATION);
int t = (int) type;
......@@ -401,6 +399,45 @@ tsv_print_perf_totals(const struct cli_args *cli_args, uint64_t *counters[], con
printf("\n");
}
#if 0
static void
gnuplot_print_perf_header(const struct cli_args *cli_args, const int num_threads)
{
printf("set terminal postscript solid color\n");
printf("set output \"foo.eps\"\n");
printf("set xlabel \"seconds\"\n");
printf("set xrange [0:*]\n");
printf("set ylabel \"X/s\"\n");
printf("plot ");
if (cli_args->print_thread_performance) {
for (int t = 1; t <= num_threads; ++t) {
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
const int col = (2 * ((t - 1) * (int) NUM_OPERATION_TYPES + op)) + 2;
//printf("'-' u 1:%d w lines t \"Thread %d %s\", ", col, t, operation_names[op]);
printf("'-' u 1:%d w lines t \"Thread %d %s/s\", ", col + 1, t, operation_names[op]);
}
}
}
for (int op = 0; op < (int) NUM_OPERATION_TYPES; ++op) {
const int col = (2 * (num_threads * (int) NUM_OPERATION_TYPES + op)) + 2;
//printf("'-' u 1:%d w lines t \"Total %s\", ", col);
printf("'-' u 1:%d w lines t \"Total %s/s\"%s", col + 1, operation_names[op], op == ((int) NUM_OPERATION_TYPES - 1) ? "\n" : ", ");
}
}
static void
gnuplot_print_perf_iteration(const struct cli_args *cli_args, const int current_time, uint64_t last_counters[][(int) NUM_OPERATION_TYPES], uint64_t *counters[], const int num_threads)
{
tsv_print_perf_iteration(cli_args, current_time, last_counters, counters, num_threads);
}
static void
gnuplot_print_perf_totals(const struct cli_args *UU(cli_args), uint64_t *UU(counters[]), const int UU(num_threads))
{
printf("e\n");
}
#endif
const struct perf_formatter perf_formatters[] = {
[HUMAN] = {
.header = human_print_perf_header,
......@@ -417,6 +454,13 @@ const struct perf_formatter perf_formatters[] = {
.iteration = tsv_print_perf_iteration,
.totals = tsv_print_perf_totals
},
#if 0
[GNUPLOT] = {
.header = gnuplot_print_perf_header,
.iteration = gnuplot_print_perf_iteration,
.totals = gnuplot_print_perf_totals
}
#endif
};
static int get_env_open_flags(struct cli_args *args) {
......@@ -487,7 +531,7 @@ static void *worker(void *arg_v) {
assert_zero(r);
arg->random_data = &random_data;
DB_ENV *env = arg->env;
DB_TXN *txn = nullptr;
DB_TXN *txn = NULL;
if (verbose) {
toku_pthread_t self = toku_pthread_self();
uintptr_t intself = (uintptr_t) self;
......@@ -544,6 +588,7 @@ static void *worker(void *arg_v) {
return arg;
}
typedef struct scan_cb_extra *SCAN_CB_EXTRA;
struct scan_cb_extra {
bool fast;
int64_t curr_sum;
......@@ -557,13 +602,13 @@ struct scan_op_extra {
};
static int
scan_cb(const DBT *key, const DBT *val, void *arg_v) {
struct scan_cb_extra *CAST_FROM_VOIDP(cb_extra, arg_v);
assert(key);
assert(val);
scan_cb(const DBT *a, const DBT *b, void *arg_v) {
SCAN_CB_EXTRA CAST_FROM_VOIDP(cb_extra, arg_v);
assert(a);
assert(b);
assert(cb_extra);
assert(val->size >= sizeof(int64_t));
cb_extra->curr_sum += *(int64_t *) val->data;
assert(b->size >= sizeof(int));
cb_extra->curr_sum += *(int *)b->data;
cb_extra->num_elements++;
return cb_extra->fast ? TOKUDB_CURSOR_CONTINUE : 0;
}
......@@ -576,13 +621,12 @@ static int scan_op_and_maybe_check_sum(
)
{
int r = 0;
DBC* cursor = nullptr;
DBC* cursor = NULL;
struct scan_cb_extra e = {
e.fast = sce->fast,
e.curr_sum = 0,
e.num_elements = 0,
};
struct scan_cb_extra e;
e.fast = sce->fast;
e.curr_sum = 0;
e.num_elements = 0;
{ int chk_r = db->cursor(db, txn, &cursor, 0); CKERR(chk_r); }
if (sce->prefetch) {
......@@ -633,68 +677,37 @@ static int generate_row_for_put(
return 0;
}
static uint64_t breverse(uint64_t v)
// Effect: return the bits in i, reversed
// Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
// Rationale: just a hack to spread out the keys during loading, doesn't need to be fast but does need to be correct.
{
uint64_t r = v; // r will be reversed bits of v; first get LSB of v
int s = sizeof(v) * CHAR_BIT - 1; // extra shift needed at end
for (v >>= 1; v; v >>= 1) {
r <<= 1;
r |= v & 1;
s--;
}
r <<= s; // shift when v's highest bits are zero
return r;
}
static void
fill_key_buf(uint64_t key, uint8_t *data, struct cli_args *args) {
invariant(args->key_size >= MIN_KEY_SIZE);
uint64_t *k = reinterpret_cast<uint64_t *>(data);
if (args->disperse_keys) {
*k = static_cast<uint64_t>(breverse(key));
} else {
*k = key;
}
if (args->key_size > sizeof(uint64_t)) {
memset(data + sizeof(uint64_t), 0, args->key_size - sizeof(uint64_t));
}
static int UU() nop(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
return 0;
}
static void
fill_key_buf_random(struct random_data *random_data, uint8_t *data, ARG arg) {
uint64_t key = randu64(random_data);
if (arg->bounded_element_range && arg->cli->num_elements > 0) {
key = key % arg->cli->num_elements;
}
fill_key_buf(key, data, arg->cli);
static int UU() xmalloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
size_t s = 256;
void *p = toku_xmalloc(s);
toku_free(p);
return 0;
}
static void
fill_val_buf(int64_t val, uint8_t *data, uint32_t val_size) {
invariant(val_size >= MIN_VAL_SIZE);
int64_t *v = reinterpret_cast<int64_t *>(data);
*v = val;
if (val_size > sizeof(int64_t)) {
memset(data + sizeof(int64_t), 0, val_size - sizeof(int64_t));
}
#if DONT_DEPRECATE_MALLOC
static int UU() malloc_free_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra), void *UU(stats_extra)) {
size_t s = 256;
void *p = malloc(s);
free(p);
return 0;
}
#endif
// Fill array with compressibility*size 0s.
// 0.0<=compressibility<=1.0
// Compressibility is the fraction of size that will be 0s (e.g. approximate fraction that will be compressed away).
// The rest will be random data.
static void
fill_val_buf_random(struct random_data *random_data, uint8_t *data, struct cli_args *args) {
invariant(args->val_size >= MIN_VAL_SIZE);
fill_zeroed_array(uint8_t *data, uint32_t size, struct random_data *random_data, double compressibility) {
//Requires: The array was zeroed since the last time 'size' was changed.
//Requires: compressibility is in range [0,1] indicating fraction that should be zeros.
// Fill in the random bytes
uint32_t num_random_bytes = (1 - args->compressibility) * args->val_size;
uint32_t num_random_bytes = (1 - compressibility) * size;
if (num_random_bytes > 0) {
uint32_t filled;
for (filled = 0; filled + sizeof(uint64_t) <= num_random_bytes; filled += sizeof(uint64_t)) {
......@@ -705,28 +718,39 @@ fill_val_buf_random(struct random_data *random_data, uint8_t *data, struct cli_a
memcpy(&data[filled], &last8, num_random_bytes - filled);
}
}
}
// Fill in the zero bytes
if (num_random_bytes < args->val_size) {
memset(data + num_random_bytes, 0, args->val_size - num_random_bytes);
}
static inline size_t
size_t_max(size_t a, size_t b) {
return (a > b) ? a : b;
}
static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, void *stats_extra) {
int r = 0;
uint8_t keybuf[arg->cli->key_size];
uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))];
uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b;
uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b;
ZERO_ARRAY(rand_key_b);
uint8_t valbuf[arg->cli->val_size];
DBT key, val;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf);
const int put_flags = get_put_flags(arg->cli);
ZERO_ARRAY(valbuf);
uint64_t puts_to_increment = 0;
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
fill_key_buf_random(arg->random_data, keybuf, arg);
fill_val_buf_random(arg->random_data, valbuf, arg->cli);
r = db->put(db, txn, &key, &val, put_flags);
rand_key_key[0] = randu64(arg->random_data);
if (arg->cli->interleave) {
rand_key_i[3] = arg->thread_idx;
} else {
rand_key_i[0] = arg->thread_idx;
}
if (arg->cli->num_elements > 0 && arg->bounded_element_range) {
rand_key_key[0] = rand_key_key[0] % arg->cli->num_elements;
}
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b);
dbt_init(&val, valbuf, sizeof valbuf);
int flags = get_put_flags(arg->cli);
r = db->put(db, txn, &key, &val, flags);
if (!ignore_errors && r != 0) {
goto cleanup;
}
......@@ -736,7 +760,6 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, vo
puts_to_increment = 0;
}
}
cleanup:
increment_counter(stats_extra, PUTS, puts_to_increment);
return r;
......@@ -765,19 +788,22 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
DB* db = arg->dbp[db_index];
int r = 0;
uint8_t keybuf[arg->cli->key_size];
uint8_t rand_key_b[size_t_max(arg->cli->key_size, sizeof(uint64_t))];
uint64_t *rand_key_key = cast_to_typeof(rand_key_key) rand_key_b;
uint16_t *rand_key_i = cast_to_typeof(rand_key_i) rand_key_b;
ZERO_ARRAY(rand_key_b);
uint8_t valbuf[arg->cli->val_size];
DBT key, val;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf);
const int put_flags = get_put_flags(arg->cli);
ZERO_ARRAY(valbuf);
uint64_t puts_to_increment = 0;
for (uint64_t i = 0; i < arg->cli->txn_size; ++i) {
fill_key_buf(i, keybuf, arg->cli);
fill_val_buf_random(arg->random_data, valbuf, arg->cli);
r = db->put(db, txn, &key, &val, put_flags);
for (uint32_t i = 0; i < arg->cli->txn_size; ++i) {
rand_key_key[0] = extra->current++;
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b);
dbt_init(&val, valbuf, sizeof valbuf);
int flags = get_put_flags(arg->cli);
r = db->put(db, txn, &key, &val, flags);
if (r != 0) {
goto cleanup;
}
......@@ -787,7 +813,6 @@ static int UU() serial_put_op(DB_TXN *txn, ARG arg, void *operation_extra, void
puts_to_increment = 0;
}
}
cleanup:
increment_counter(stats_extra, PUTS, puts_to_increment);
return r;
......@@ -802,44 +827,42 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v
uint32_t dbt_flags = 0;
r = db_create(&db_load, env, 0);
assert(r == 0);
r = db_load->open(db_load, txn, "loader-db", nullptr, DB_BTREE, DB_CREATE, 0666);
r = db_load->open(db_load, txn, "loader-db", NULL, DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
DB_LOADER *loader;
uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES;
r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags);
CKERR(r);
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
uint8_t valbuf[arg->cli->val_size];
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, valbuf, sizeof valbuf);
for (int i = 0; i < 1000; i++) {
fill_key_buf(i, keybuf, arg->cli);
fill_val_buf_random(arg->random_data, valbuf, arg->cli);
DBT key, val;
int rand_key = i;
int rand_val = myrandom_r(arg->random_data);
dbt_init(&key, &rand_key, sizeof(rand_key));
dbt_init(&val, &rand_val, sizeof(rand_val));
r = loader->put(loader, &key, &val); CKERR(r);
}
r = loader->close(loader); CKERR(r);
r = db_load->close(db_load, 0); CKERR(r);
r = env->dbremove(env, txn, "loader-db", nullptr, 0); CKERR(r);
r = env->dbremove(env, txn, "loader-db", NULL, 0); CKERR(r);
}
return 0;
}
static int UU() keyrange_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
// Pick a random DB, do a keyrange operation.
int r;
// callback is designed to run on tests with one DB
// no particular reason why, just the way it was
// originally done
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index];
int r = 0;
uint8_t keybuf[arg->cli->key_size];
int rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % arg->cli->num_elements;
}
DBT key;
dbt_init(&key, keybuf, sizeof keybuf);
fill_key_buf_random(arg->random_data, keybuf, arg);
dbt_init(&key, &rand_key, sizeof rand_key);
uint64_t less,equal,greater;
int is_exact;
r = db->key_range64(db, txn, &key, &less, &equal, &greater, &is_exact);
......@@ -867,6 +890,27 @@ static int UU() verify_op(DB_TXN* UU(txn), ARG UU(arg), void* UU(operation_extra
return r;
}
struct lock_escalation_op_extra {
// sleep somewhere between these times before running escalation.
// this will add some chaos into the mix.
uint64_t min_sleep_time_micros;
uint64_t max_sleep_time_micros;
};
static int UU() lock_escalation_op(DB_TXN *UU(txn), ARG arg, void* operation_extra, void *UU(stats_extra)) {
struct lock_escalation_op_extra *CAST_FROM_VOIDP(extra, operation_extra);
if (extra->max_sleep_time_micros > 0) {
invariant(extra->max_sleep_time_micros >= extra->min_sleep_time_micros);
uint64_t extra_sleep_time = (extra->max_sleep_time_micros - extra->min_sleep_time_micros) + 1;
uint64_t sleep_time = extra->min_sleep_time_micros + (myrandom_r(arg->random_data) % extra_sleep_time);
usleep(sleep_time);
}
if (!arg->cli->nolocktree) {
toku_env_run_lock_escalation_for_test(arg->env);
}
return 0;
}
static int UU() scan_op(DB_TXN *txn, ARG UU(arg), void* operation_extra, void *UU(stats_extra)) {
struct scan_op_extra* CAST_FROM_VOIDP(extra, operation_extra);
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
......@@ -923,24 +967,23 @@ static int dbt_do_nothing (DBT const *UU(key), DBT const *UU(row), void *UU(con
}
static int UU() ptquery_and_maybe_check_op(DB* db, DB_TXN *txn, ARG arg, bool check) {
int r = 0;
uint8_t keybuf[arg->cli->key_size];
int r;
int rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % arg->cli->num_elements;
}
DBT key, val;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, nullptr, 0);
fill_key_buf_random(arg->random_data, keybuf, arg);
dbt_init(&key, &rand_key, sizeof rand_key);
dbt_init(&val, NULL, 0);
r = db->getf_set(
db,
txn,
0,
&key,
dbt_do_nothing,
nullptr
NULL
);
if (check) {
assert(r != DB_NOTFOUND);
}
if (check) assert(r != DB_NOTFOUND);
r = 0;
return r;
}
......@@ -968,7 +1011,7 @@ static int UU() ptquery_op_no_check(DB_TXN *txn, ARG arg, void* UU(operation_ext
static int UU() cursor_create_close_op(DB_TXN *txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int db_index = arg->cli->num_DBs > 1 ? myrandom_r(arg->random_data)%arg->cli->num_DBs : 0;
DB* db = arg->dbp[db_index];
DBC* cursor = nullptr;
DBC* cursor = NULL;
int r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
r = cursor->c_close(cursor); assert(r == 0);
return 0;
......@@ -1017,14 +1060,14 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key),
void *set_extra),
void *set_extra)
{
int64_t old_int_val = 0;
int old_int_val = 0;
if (old_val) {
old_int_val = *(int64_t *) old_val->data;
old_int_val = *(int*)old_val->data;
}
assert(extra->size == sizeof(struct update_op_extra));
struct update_op_extra *CAST_FROM_VOIDP(e, extra->data);
int64_t new_int_val;
int new_int_val;
switch (e->type) {
case UPDATE_ADD_DIFF:
new_int_val = old_int_val + e->u.d.diff;
......@@ -1040,59 +1083,53 @@ static int update_op_callback(DB *UU(db), const DBT *UU(key),
assert(false);
}
uint32_t val_size = sizeof(int64_t) + e->pad_bytes;
uint8_t valbuf[val_size];
fill_val_buf(new_int_val, valbuf, val_size);
DBT new_val;
dbt_init(&new_val, valbuf, val_size);
set_val(&new_val, set_extra);
uint32_t data_size = sizeof(int) + e->pad_bytes;
char* data [data_size];
ZERO_ARRAY(data);
memcpy(data, &new_int_val, sizeof(new_int_val));
set_val(dbt_init(&new_val, data, data_size), set_extra);
return 0;
}
static int UU() update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
static int UU()update_op2(DB_TXN* txn, ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index];
int r = 0;
int curr_val_sum = 0;
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
int rand_key;
int rand_key2;
toku_sync_fetch_and_add(&update_count, 1);
struct update_op_extra extra;
ZERO_STRUCT(extra);
extra.type = UPDATE_ADD_DIFF;
extra.pad_bytes = 0;
int64_t curr_val_sum = 0;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
fill_key_buf_random(arg->random_data, keybuf, arg);
rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % (arg->cli->num_elements/2);
}
rand_key2 = arg->cli->num_elements - rand_key;
assert(rand_key != rand_key2);
extra.u.d.diff = 1;
curr_val_sum += extra.u.d.diff;
r = db->update(
db,
txn,
&key,
&val,
dbt_init(&key, &rand_key, sizeof rand_key),
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0) {
return r;
}
int64_t *rkp = (int64_t *) keybuf;
int64_t rand_key = *rkp;
invariant(rand_key != (arg->cli->num_elements - rand_key));
rand_key -= arg->cli->num_elements;
fill_key_buf(rand_key, keybuf, arg->cli);
extra.u.d.diff = -1;
r = db->update(
db,
txn,
&key,
&val,
dbt_init(&key, &rand_key2, sizeof rand_key),
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0) {
......@@ -1119,6 +1156,10 @@ static int pre_acquire_write_lock(DB *db, DB_TXN *txn,
// take the given db and do an update on it
static int
UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(stats_extra)) {
int r = 0;
int curr_val_sum = 0;
DBT key, val;
int update_key;
uint64_t old_update_count = toku_sync_fetch_and_add(&update_count, 1);
struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
struct update_op_extra extra;
......@@ -1130,14 +1171,7 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
extra.pad_bytes = 100;
}
}
int r = 0;
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
int64_t update_key;
int64_t curr_val_sum = 0;
const int update_flags = arg->cli->prelock_updates ? DB_PRELOCKED_WRITE : 0;
for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
if (arg->prelock_updates) {
if (i == 0) {
......@@ -1146,9 +1180,9 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
update_key = update_key % arg->cli->num_elements;
}
const int64_t max_key_in_table = arg->cli->num_elements - 1;
const uint32_t max_key_in_table = arg->cli->num_elements - 1;
const bool range_wraps = (update_key + arg->cli->txn_size - 1) > max_key_in_table;
int64_t left_key, right_key;
int left_key, right_key;
DBT left_key_dbt, right_key_dbt;
// acquire the range starting at the random key, plus txn_size - 1
......@@ -1184,16 +1218,15 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
}
} else {
update_key++;
if (arg->bounded_element_range) {
update_key = update_key % arg->cli->num_elements;
}
}
fill_key_buf(update_key, keybuf, arg->cli);
} else {
// just do a usual, random point update without locking first
fill_key_buf_random(arg->random_data, keybuf, arg);
update_key = myrandom_r(arg->random_data);
}
if (arg->bounded_element_range) {
update_key = update_key % arg->cli->num_elements;
}
// the last update keeps the table's sum as zero
// every other update except the last applies a random delta
......@@ -1208,15 +1241,12 @@ UU() update_op_db(DB *db, DB_TXN *txn, ARG arg, void* operation_extra, void *UU(
curr_val_sum += extra.u.d.diff;
}
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
// do the update
r = db->update(
db,
txn,
&key,
&val,
dbt_init(&key, &update_key, sizeof update_key),
dbt_init(&val, &extra, sizeof extra),
update_flags
);
if (r != 0) {
......@@ -1239,11 +1269,12 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
struct update_op_args* CAST_FROM_VOIDP(op_args, operation_extra);
assert(arg->bounded_element_range);
assert(op_args->update_history_buffer);
int r = 0;
int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index];
int curr_val_sum = 0;
DBT key, val;
int rand_key;
struct update_op_extra extra;
ZERO_STRUCT(extra);
extra.type = UPDATE_WITH_HISTORY;
......@@ -1254,44 +1285,47 @@ static int UU() update_with_history_op(DB_TXN *txn, ARG arg, void* operation_ext
extra.pad_bytes = 500;
}
}
DBT key, val;
uint8_t keybuf[arg->cli->key_size];
int64_t rand_key;
int64_t curr_val_sum = 0;
dbt_init(&key, keybuf, sizeof keybuf);
dbt_init(&val, &extra, sizeof extra);
for (uint32_t i = 0; i < arg->cli->txn_size; i++) {
fill_key_buf_random(arg->random_data, keybuf, arg);
int64_t *rkp = (int64_t *) keybuf;
rand_key = *rkp;
invariant(rand_key < arg->cli->num_elements);
if (i < arg->cli->txn_size - 1) {
extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
// just make every other value random
if (i % 2 == 0) {
extra.u.h.new_val = -extra.u.h.new_val;
}
curr_val_sum += extra.u.h.new_val;
} else {
// the last update should ensure the sum stays zero
extra.u.h.new_val = -curr_val_sum;
rand_key = myrandom_r(arg->random_data) % arg->cli->num_elements;
extra.u.h.new_val = myrandom_r(arg->random_data) % MAX_RANDOM_VAL;
// just make every other value random
if (i%2 == 0) {
extra.u.h.new_val = -extra.u.h.new_val;
}
curr_val_sum += extra.u.h.new_val;
extra.u.h.expected = op_args->update_history_buffer[rand_key];
op_args->update_history_buffer[rand_key] = extra.u.h.new_val;
r = db->update(
db,
txn,
&key,
&val,
dbt_init(&key, &rand_key, sizeof rand_key),
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0) {
return r;
}
}
//
// now put in one more to ensure that the sum stays 0
//
extra.u.h.new_val = -curr_val_sum;
rand_key = myrandom_r(arg->random_data);
if (arg->bounded_element_range) {
rand_key = rand_key % arg->cli->num_elements;
}
extra.u.h.expected = op_args->update_history_buffer[rand_key];
op_args->update_history_buffer[rand_key] = extra.u.h.new_val;
r = db->update(
db,
txn,
dbt_init(&key, &rand_key, sizeof rand_key),
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0) {
return r;
}
return r;
}
......@@ -1317,7 +1351,7 @@ static int UU() hot_op(DB_TXN *UU(txn), ARG UU(arg), void* UU(operation_extra),
int r;
for (int i = 0; run_test && i < arg->cli->num_DBs; i++) {
DB* db = arg->dbp[i];
r = db->hot_optimize(db, hot_progress_callback, nullptr);
r = db->hot_optimize(db, hot_progress_callback, NULL);
if (run_test) {
CKERR(r);
}
......@@ -1330,8 +1364,6 @@ get_ith_table_name(char *buf, size_t len, int i) {
snprintf(buf, len, "main%d", i);
}
DB_TXN * const null_txn = 0;
static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operation_extra), void *UU(stats_extra)) {
int r;
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
......@@ -1342,12 +1374,12 @@ static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operat
ZERO_ARRAY(name);
get_ith_table_name(name, sizeof(name), db_index);
r = arg->env->dbremove(arg->env, null_txn, name, nullptr, 0);
r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0);
CKERR(r);
r = db_create(&(arg->dbp[db_index]), arg->env, 0);
assert(r == 0);
r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, nullptr, DB_BTREE, DB_CREATE, 0666);
r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
return 0;
}
......@@ -1432,7 +1464,6 @@ struct sleep_and_crash_extra {
bool is_setup;
bool threads_have_joined;
};
static void *sleep_and_crash(void *extra) {
sleep_and_crash_extra *e = static_cast<sleep_and_crash_extra *>(extra);
toku_mutex_lock(&e->mutex);
......@@ -1477,7 +1508,7 @@ static int run_workers(
int r;
const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format];
toku_mutex_t mutex = ZERO_MUTEX_INITIALIZER;
toku_mutex_init(&mutex, nullptr);
toku_mutex_init(&mutex, NULL);
struct rwlock rwlock;
rwlock_init(&rwlock);
toku_pthread_t tids[num_threads];
......@@ -1502,11 +1533,11 @@ static int run_workers(
worker_extra[i].operation_lock_mutex = &mutex;
XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters);
TOKU_DRD_IGNORE_VAR(worker_extra[i].counters);
{ int chk_r = toku_pthread_create(&tids[i], nullptr, worker, &worker_extra[i]); CKERR(chk_r); }
{ int chk_r = toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]); CKERR(chk_r); }
if (verbose)
printf("%lu created\n", (unsigned long) tids[i]);
}
{ int chk_r = toku_pthread_create(&time_tid, nullptr, test_time, &tte); CKERR(chk_r); }
{ int chk_r = toku_pthread_create(&time_tid, NULL, test_time, &tte); CKERR(chk_r); }
if (verbose)
printf("%lu created\n", (unsigned long) time_tid);
......@@ -1629,22 +1660,131 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = db->set_readpagesize(db, env_args.basement_node_size);
CKERR(r);
const int flags = DB_CREATE | (cli_args->blackhole ? DB_BLACKHOLE : 0);
r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666);
r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666);
CKERR(r);
db_res[i] = db;
}
return r;
}
static void report_overall_fill_table_progress(struct cli_args *args, int num_rows) {
static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bufsz, int val_bufsz,
void (*callback)(int idx, void *extra,
void *key, int *keysz,
void *val, int *valsz),
void *extra, void (*progress_cb)(int num_rows)) {
DB_TXN *txn = nullptr;
const int puts_per_txn = 100000;
int r = 0;
for (long i = 0; i < num_elements; ++i) {
if (txn == nullptr) {
r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
}
char keybuf[key_bufsz], valbuf[val_bufsz];
memset(keybuf, 0, sizeof(keybuf));
memset(valbuf, 0, sizeof(valbuf));
int keysz, valsz;
callback(i, extra, keybuf, &keysz, valbuf, &valsz);
// let's make sure the data stored fits in the buffers we passed in
assert(keysz <= key_bufsz);
assert(valsz <= val_bufsz);
DBT key, val;
// make size of data what is specified w/input parameters
// note that key and val have sizes of
// key_bufsz and val_bufsz, which were passed into this
// function, not what was stored by the callback
r = db->put(
db,
txn,
dbt_init(&key, keybuf, key_bufsz),
dbt_init(&val, valbuf, val_bufsz),
// don't bother taking locks in the locktree
DB_PRELOCKED_WRITE
);
assert(r == 0);
if (i > 0 && i % puts_per_txn == 0) {
// don't bother fsyncing to disk.
// the caller can checkpoint if they want.
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
txn = nullptr;
if (verbose) {
progress_cb(puts_per_txn);
}
}
}
if (txn) {
r = txn->commit(txn, DB_TXN_NOSYNC);
invariant_zero(r);
}
return r;
}
static uint32_t breverse(uint32_t v)
// Effect: return the bits in i, reversed
// Notes: implementation taken from http://graphics.stanford.edu/~seander/bithacks.html#BitReverseObvious
// Rationale: just a hack to spread out the keys during loading, doesn't need to be fast but does need to be correct.
{
uint32_t r = v; // r will be reversed bits of v; first get LSB of v
int s = sizeof(v) * CHAR_BIT - 1; // extra shift needed at end
for (v >>= 1; v; v >>= 1) {
r <<= 1;
r |= v & 1;
s--;
}
r <<= s; // shift when v's highest bits are zero
return r;
}
static void zero_element_callback(int idx, void *extra, void *keyv, int *keysz, void *valv, int *valsz) {
const bool *disperse_keys = static_cast<bool *>(extra);
int *CAST_FROM_VOIDP(key, keyv);
int *CAST_FROM_VOIDP(val, valv);
if (*disperse_keys) {
*key = static_cast<int>(breverse(idx));
} else {
*key = idx;
}
*val = 0;
*keysz = sizeof(int);
*valsz = sizeof(int);
}
struct fill_table_worker_info {
DB_ENV *env;
DB *db;
int num_elements;
uint32_t key_size;
uint32_t val_size;
bool disperse_keys;
void (*progress_cb)(int num_rows);
};
static void fill_table_worker(void *arg) {
struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
int r = fill_table_from_fun(
info->env,
info->db,
info->num_elements,
info->key_size,
info->val_size,
zero_element_callback,
&info->disperse_keys,
info->progress_cb
);
invariant_zero(r);
toku_free(info);
}
static int num_tables_to_fill = 1;
static int rows_per_table = 1;
static void report_overall_fill_table_progress(int num_rows) {
// for sanitary reasons we'd like to prevent two threads
// from printing the same performance report twice.
static bool reporting;
// when was the first time measurement taken?
static uint64_t t0;
static int rows_inserted;
// when was the last report? what was its progress?
static uint64_t last_report;
static double last_progress;
......@@ -1654,9 +1794,12 @@ static void report_overall_fill_table_progress(struct cli_args *args, int num_ro
}
uint64_t rows_so_far = toku_sync_add_and_fetch(&rows_inserted, num_rows);
double progress = rows_so_far / (args->num_elements * args->num_DBs * 1.0);
double progress = rows_so_far /
(rows_per_table * num_tables_to_fill * 1.0);
if (progress > (last_progress + .01)) {
uint64_t t1 = toku_current_time_microsec();
// report no more often than once every 5 seconds, for less output.
// there is a race condition. it is probably harmless.
const uint64_t minimum_report_period = 5 * 1000000;
if (t1 > last_report + minimum_report_period
&& toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) {
......@@ -1670,68 +1813,24 @@ static void report_overall_fill_table_progress(struct cli_args *args, int num_ro
}
}
static void fill_single_table(DB_ENV *env, DB *db, struct cli_args *args, bool fill_with_zeroes) {
const int puts_per_txn = 100000;
int r = 0;
DB_TXN *txn = nullptr;
struct random_data random_data;
char random_buf[8];
r = myinitstate_r(random(), random_buf, 8, &random_data); CKERR(r);
uint8_t keybuf[args->key_size], valbuf[args->val_size];
memset(keybuf, 0, sizeof keybuf);
memset(valbuf, 0, sizeof valbuf);
DBT key, val;
dbt_init(&key, keybuf, args->key_size);
dbt_init(&val, valbuf, args->val_size);
static int fill_tables_with_zeroes(DB_ENV *env, DB **dbs, int num_DBs, int num_elements, uint32_t key_size, uint32_t val_size, bool disperse_keys) {
// set the static globals that the progress reporter uses
num_tables_to_fill = num_DBs;
rows_per_table = num_elements;
for (int i = 0; i < args->num_elements; i++) {
if (txn == nullptr) {
r = env->txn_begin(env, 0, &txn, 0); CKERR(r);
}
fill_key_buf(i, keybuf, args);
if (!fill_with_zeroes) {
fill_val_buf_random(&random_data, valbuf, args);
}
r = db->put(db, txn, &key, &val, DB_PRELOCKED_WRITE); CKERR(r);
if (i > 0 && i % puts_per_txn == 0) {
// don't bother fsyncing to disk.
// the caller can checkpoint if they want.
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
txn = nullptr;
if (verbose) {
report_overall_fill_table_progress(args, puts_per_txn);
}
}
}
if (txn) {
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
}
}
struct fill_table_worker_info {
struct cli_args *args;
DB_ENV *env;
DB *db;
bool fill_with_zeroes;
};
static void fill_table_worker(void *arg) {
struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
fill_single_table(info->env, info->db, info->args, info->fill_with_zeroes);
toku_free(info);
}
static int fill_tables(DB_ENV *env, DB **dbs, struct cli_args *args, bool fill_with_zeroes) {
const int num_cores = toku_os_get_number_processors();
KIBBUTZ kibbutz = toku_kibbutz_create(num_cores);
for (int i = 0; i < args->num_DBs; i++) {
for (int i = 0; i < num_DBs; i++) {
assert(key_size >= sizeof(int));
assert(val_size >= sizeof(int));
struct fill_table_worker_info *XCALLOC(info);
info->env = env;
info->db = dbs[i];
info->args = args;
info->fill_with_zeroes = fill_with_zeroes;
info->num_elements = num_elements;
info->key_size = key_size;
info->val_size = val_size;
info->disperse_keys = disperse_keys;
info->progress_cb = report_overall_fill_table_progress;
toku_kibbutz_enq(kibbutz, fill_table_worker, info);
}
toku_kibbutz_destroy(kibbutz);
......@@ -1766,6 +1865,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
int r;
struct env_args env_args = cli_args->env_args;
/* create the dup database file */
DB_ENV *env;
db_env_set_num_bucket_mutexes(env_args.num_bucket_mutexes);
r = db_env_create(&env, 0); assert(r == 0);
......@@ -1773,6 +1873,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
r = env->set_lk_max_memory(env, env_args.lk_max_memory); CKERR(r);
env->set_update(env, env_args.update_function);
// set the cache size to 10MB
r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r);
if (env_args.generate_put_callback) {
......@@ -1803,7 +1904,7 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = db_create(&db, env, 0);
CKERR(r);
const int flags = cli_args->blackhole ? DB_BLACKHOLE : 0;
r = db->open(db, null_txn, name, nullptr, DB_BTREE, flags, 0666);
r = db->open(db, null_txn, name, NULL, DB_BTREE, flags, 0666);
CKERR(r);
db_res[i] = db;
}
......@@ -1831,8 +1932,8 @@ static const struct env_args DEFAULT_ENV_ARGS = {
.num_bucket_mutexes = 1024,
.envdir = ENVDIR,
.update_function = update_op_callback,
.generate_put_callback = nullptr,
.generate_del_callback = nullptr,
.generate_put_callback = NULL,
.generate_del_callback = NULL,
};
static const struct env_args DEFAULT_PERF_ENV_ARGS = {
......@@ -1846,11 +1947,15 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = {
.cachetable_size = 1<<30,
.num_bucket_mutexes = 1024 * 1024,
.envdir = ENVDIR,
.update_function = nullptr,
.generate_put_callback = nullptr,
.generate_del_callback = nullptr,
.update_function = NULL,
.generate_put_callback = NULL,
.generate_del_callback = NULL,
};
#define MIN_VAL_SIZE sizeof(int)
#define MIN_KEY_SIZE sizeof(int)
#define MIN_COMPRESSIBILITY (0.0)
#define MAX_COMPRESSIBILITY (1.0)
static struct cli_args UU() get_default_args(void) {
struct cli_args DEFAULT_ARGS = {
.num_elements = 150000,
......@@ -2222,7 +2327,7 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
#define LOCAL_STRING_ARG(name_string, variable, default) \
MAKE_LOCAL_ARG(name_string, type_string, s, default, variable, "", "", "")
const char *perf_format_s = nullptr;
const char *perf_format_s = NULL;
struct arg_type arg_types[] = {
INT32_ARG_NONNEG("--num_elements", num_elements, ""),
INT32_ARG_NONNEG("--num_DBs", num_DBs, ""),
......@@ -2334,15 +2439,20 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
}
}
}
if (perf_format_s != nullptr) {
if (perf_format_s != NULL) {
if (!strcmp(perf_format_s, "human")) {
args->perf_output_format = HUMAN;
} else if (!strcmp(perf_format_s, "csv")) {
args->perf_output_format = CSV;
} else if (!strcmp(perf_format_s, "tsv")) {
args->perf_output_format = TSV;
#if 0
} else if (!strcmp(perf_format_s, "gnuplot")) {
args->perf_output_format = GNUPLOT;
#endif
} else {
fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", and \"tsv\"\n");
//fprintf(stderr, "valid values for --perf_format are \"human\", \"csv\", \"tsv\", and \"gnuplot\"\n");
do_usage(argv0, num_arg_types, arg_types);
exit(EINVAL);
}
......@@ -2358,24 +2468,35 @@ static void
stress_table(DB_ENV *, DB **, struct cli_args *);
static int
stress_dbt_cmp(DB *db, const DBT *a, const DBT *b) {
UU() stress_int_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
assert(db && a && b);
assert(a->size >= sizeof(int));
assert(b->size >= sizeof(int));
int x = *(int *) a->data;
int y = *(int *) b->data;
if (x<y) return -1;
if (x>y) return 1;
return 0;
}
// Keys are only compared by their first 8 bytes,
// interpreted as a little endian 64 bit integer..
assert(a->size >= sizeof(int64_t));
assert(b->size >= sizeof(int64_t));
static int
UU() stress_uint64_dbt_cmp(DB *db, const DBT *a, const DBT *b) {
assert(db && a && b);
assert(a->size >= sizeof(uint64_t));
assert(b->size >= sizeof(uint64_t));
int64_t x = *(int64_t *) a->data;
int64_t y = *(int64_t *) b->data;
uint64_t x = *(uint64_t *) a->data;
uint64_t y = *(uint64_t *) b->data;
if (x < y) {
return -1;
} else if (x > y) {
}
if (x > y) {
return +1;
} else {
return 0;
}
return 0;
}
......@@ -2391,49 +2512,21 @@ do_warm_cache(DB_ENV *env, DB **dbs, struct cli_args *args)
scan_arg.operation_extra = &soe;
scan_arg.operation = scan_op_no_check;
scan_arg.lock_type = STRESS_LOCK_NONE;
DB_TXN* txn = nullptr;
DB_TXN* txn = NULL;
// don't take serializable read locks when scanning.
int r = env->txn_begin(env, 0, &txn, DB_TXN_SNAPSHOT); CKERR(r);
// make sure the scan doesn't terminate early
run_test = true;
// warm up each DB in parallel
scan_op_no_check_parallel(txn, &scan_arg, &soe, nullptr);
scan_op_no_check_parallel(txn, &scan_arg, &soe, NULL);
r = txn->commit(txn,0); CKERR(r);
}
static void
UU() stress_recover(struct cli_args *args) {
DB_ENV* env = nullptr;
DB* dbs[args->num_DBs];
memset(dbs, 0, sizeof(dbs));
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_dbt_cmp,
args); CKERR(chk_r); }
DB_TXN* txn = nullptr;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r);
struct scan_op_extra soe;
soe.fast = true;
soe.fwd = true;
soe.prefetch = false;
// make sure the scan doesn't terminate early
run_test = true;
r = scan_op(txn, &recover_args, &soe, nullptr);
CKERR(r);
{ int chk_r = txn->commit(txn,0); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
}
static void
test_main(struct cli_args *args, bool fill_with_zeroes)
UU() stress_test_main_with_cmp(struct cli_args *args, int (*bt_compare)(DB *, const DBT *, const DBT *))
{
{ char *loc = setlocale(LC_NUMERIC, "en_US.UTF-8"); assert(loc); }
DB_ENV* env = nullptr;
DB_ENV* env = NULL;
DB* dbs[args->num_DBs];
memset(dbs, 0, sizeof(dbs));
db_env_enable_engine_status(args->nocrashstatus ? false : true);
......@@ -2442,17 +2535,17 @@ test_main(struct cli_args *args, bool fill_with_zeroes)
&env,
dbs,
args->num_DBs,
stress_dbt_cmp,
bt_compare,
args
);
{ int chk_r = fill_tables(env, dbs, args, fill_with_zeroes); CKERR(chk_r); }
{ int chk_r = fill_tables_with_zeroes(env, dbs, args->num_DBs, args->num_elements, args->key_size, args->val_size, args->disperse_keys); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
}
if (!args->only_create) {
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_dbt_cmp,
bt_compare,
args); CKERR(chk_r); }
if (args->warm_cache) {
do_warm_cache(env, dbs, args);
......@@ -2463,17 +2556,37 @@ test_main(struct cli_args *args, bool fill_with_zeroes)
}
static void
UU() stress_test_main(struct cli_args *args) {
// Begin the test with fixed size values equal to zero.
// This is important for correctness testing.
test_main(args, true);
UU() stress_test_main(struct cli_args *args)
{
stress_test_main_with_cmp(args, stress_int_dbt_cmp);
}
static void
UU() perf_test_main(struct cli_args *args) {
// Do not begin the test by creating a table of all zeroes.
// We want to control the row size and its compressibility.
test_main(args, false);
UU() stress_recover(struct cli_args *args) {
DB_ENV* env = NULL;
DB* dbs[args->num_DBs];
memset(dbs, 0, sizeof(dbs));
{ int chk_r = open_tables(&env,
dbs,
args->num_DBs,
stress_int_dbt_cmp,
args); CKERR(chk_r); }
DB_TXN* txn = NULL;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r);
struct scan_op_extra soe;
soe.fast = true;
soe.fwd = true;
soe.prefetch = false;
// make sure the scan doesn't terminate early
run_test = true;
r = scan_op(txn, &recover_args, &soe, NULL);
CKERR(r);
{ int chk_r = txn->commit(txn,0); CKERR(chk_r); }
{ int chk_r = close_tables(env, dbs, args->num_DBs); CKERR(chk_r); }
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment