Commit 50ceb080 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:4144] add command line flags --num_update_threads, --update_txn_size,

and --no-crash_on_update_failure


git-svn-id: file:///svn/toku/tokudb@36985 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2aec9642
......@@ -50,10 +50,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 5 + cli_args->num_ptquery_threads;
const int num_threads = 4 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env);
arg_init(&myargs[i], n, dbp, env, cli_args);
}
// make the forward fast scanner
......@@ -77,10 +77,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].operation = scan_op;
// make the guy that updates the db
myargs[4].operation = update_op;
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation = update_op;
}
// make the guy that does point queries
for (int i = 5; i < num_threads; i++) {
for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op;
}
......
......@@ -20,14 +20,14 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
int n = cli_args->num_elements;
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 1;
const int num_threads = cli_args->num_update_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env);
// make the guy that updates the db
myargs[i].operation = update_op;
}
// make the guy that updates the db
myargs[0].operation = update_op;
int num_seconds = random() % cli_args->time_of_test;
run_workers(myargs, num_threads, num_seconds, true);
......
......@@ -43,17 +43,17 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
//
// the threads that we want:
// - one thread constantly updating random values
// - some threads constantly updating random values
// - one thread doing table scan with bulk fetch
// - one thread doing table scan without bulk fetch
// - one thread doing random point queries
// - some threads doing random point queries
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 5 + cli_args->num_ptquery_threads;
const int num_threads = 4 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env);
arg_init(&myargs[i], n, dbp, env, cli_args);
}
// make the forward fast scanner
......@@ -77,10 +77,12 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].operation = scan_op;
// make the guy that updates the db
myargs[4].operation = update_op;
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation = update_op;
}
// make the guy that does point queries
for (int i = 5; i < num_threads; i++) {
for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op;
}
......
......@@ -44,10 +44,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 5 + cli_args->num_ptquery_threads;
const int num_threads = 4 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env);
arg_init(&myargs[i], n, dbp, env, cli_args);
}
// make the forward fast scanner
......@@ -71,11 +71,13 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].operation = scan_op;
// make the guy that updates the db
myargs[4].bounded_update_range = false;
myargs[4].operation = update_op;
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].bounded_update_range = false;
myargs[i].operation = update_op;
}
// make the guy that does point queries
for (int i = 5; i < num_threads; i++) {
for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].bounded_update_range = false;
myargs[i].operation = ptquery_op_no_check;
}
......
......@@ -42,10 +42,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// - one thread doing random point queries
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 6 + cli_args->num_ptquery_threads;
const int num_threads = 5 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env);
arg_init(&myargs[i], n, dbp, env, cli_args);
}
// make the forward fast scanner
......@@ -69,16 +69,18 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].operation = scan_op;
// make the guy that updates the db
myargs[4].lock_type = STRESS_LOCK_SHARED;
myargs[4].operation = update_op;
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].lock_type = STRESS_LOCK_SHARED;
myargs[i].operation = update_op;
}
// make the guy that sends update broadcasts
myargs[5].lock_type = STRESS_LOCK_EXCL;
myargs[5].sleep_ms = cli_args->update_broadcast_period_ms;
myargs[5].operation = update_broadcast_op;
myargs[4 + cli_args->num_update_threads].lock_type = STRESS_LOCK_EXCL;
myargs[4 + cli_args->num_update_threads].sleep_ms = cli_args->update_broadcast_period_ms;
myargs[4 + cli_args->num_update_threads].operation = update_broadcast_op;
// make the guys that do point queries
for (int i = 6; i < num_threads; i++) {
for (int i = 5 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op;
}
......
......@@ -42,10 +42,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
// - one thread doing random point queries
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 5 + cli_args->num_ptquery_threads;
const int num_threads = 4 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env);
arg_init(&myargs[i], n, dbp, env, cli_args);
}
// make the forward fast scanner
......@@ -69,18 +69,22 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].operation = scan_op_no_check;
// make the guy that updates the db
myargs[4].update_history_buffer = toku_xmalloc(n * (sizeof myargs[4].update_history_buffer[0]));
memset(myargs[4].update_history_buffer, 0, n * (sizeof myargs[4].update_history_buffer[0]));
myargs[4].operation = update_with_history_op;
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].update_history_buffer = toku_xmalloc(n * (sizeof myargs[i].update_history_buffer[0]));
memset(myargs[i].update_history_buffer, 0, n * (sizeof myargs[i].update_history_buffer[0]));
myargs[i].operation = update_with_history_op;
}
// make the guys that do point queries
for (int i = 5; i < num_threads; i++) {
for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op;
}
run_workers(myargs, num_threads, cli_args->time_of_test, false);
toku_free(myargs[4].update_history_buffer);
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
toku_free(myargs[i].update_history_buffer);
}
}
int
......
......@@ -24,10 +24,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 5 + cli_args->num_ptquery_threads;
const int num_threads = 4 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env);
arg_init(&myargs[i], n, dbp, env, cli_args);
}
// make the forward fast scanner
myargs[0].fast = TRUE;
......@@ -40,12 +40,15 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[1].operation = scan_op;
// make the guy that updates the db
myargs[2].operation = update_op;
myargs[3].operation = loader_op;
myargs[4].operation = keyrange_op;
myargs[2].operation = loader_op;
myargs[3].operation = keyrange_op;
for (int i = 4; i < 4 + cli_args->num_update_threads; ++i) {
myargs[i].operation = update_op;
}
// make the guy that does point queries
for (int i = 5; i < num_threads; i++) {
for (int i = 4 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].operation = ptquery_op;
}
run_workers(myargs, num_threads, cli_args->time_of_test, false);
......
......@@ -44,10 +44,10 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
//
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 7 + cli_args->num_ptquery_threads;
const int num_threads = 6 + cli_args->num_update_threads + cli_args->num_ptquery_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], n, dbp, env);
arg_init(&myargs[i], n, dbp, env, cli_args);
}
// make the forward fast scanner
......@@ -74,22 +74,24 @@ stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
myargs[3].lock_type = STRESS_LOCK_SHARED;
myargs[3].operation = scan_op;
// make the guy that updates the db
myargs[4].bounded_update_range = false;
myargs[4].lock_type = STRESS_LOCK_SHARED;
myargs[4].operation = update_op;
// make the guy that removes and recreates the db
myargs[4].lock_type = STRESS_LOCK_EXCL;
myargs[4].sleep_ms = 2000; // maybe make this a runtime param at some point
myargs[4].operation = remove_and_recreate_me;
myargs[5].lock_type = STRESS_LOCK_EXCL;
myargs[5].sleep_ms = 2000; // maybe make this a runtime param at some point
myargs[5].operation = remove_and_recreate_me;
myargs[5].operation = truncate_me;
myargs[6].lock_type = STRESS_LOCK_EXCL;
myargs[6].sleep_ms = 2000; // maybe make this a runtime param at some point
myargs[6].operation = truncate_me;
// make the guy that updates the db
for (int i = 6; i < 6 + cli_args->num_update_threads; ++i) {
myargs[i].bounded_update_range = false;
myargs[i].lock_type = STRESS_LOCK_SHARED;
myargs[i].operation = update_op;
}
// make the guy that does point queries
for (int i = 7; i < num_threads; i++) {
for (int i = 6 + cli_args->num_update_threads; i < num_threads; i++) {
myargs[i].lock_type = STRESS_LOCK_SHARED;
myargs[i].bounded_update_range = false;
myargs[i].operation = ptquery_op_no_check;
......
......@@ -49,11 +49,35 @@ struct arg {
toku_pthread_mutex_t *broadcast_lock_mutex;
struct rwlock *broadcast_lock;
int update_pad_frequency;
bool crash_on_update_failure;
u_int32_t update_txn_size;
};
struct cli_args {
int num_elements;
int time_of_test;
int node_size;
int basement_node_size;
u_int64_t cachetable_size;
bool only_create;
bool only_stress;
int checkpointing_period;
int cleaner_period;
int cleaner_iterations;
int update_broadcast_period_ms;
int num_ptquery_threads;
test_update_callback_f update_function;
bool do_test_and_crash;
bool do_recover;
char *envdir;
int num_update_threads;
bool crash_on_update_failure;
u_int32_t update_txn_size;
};
DB_TXN * const null_txn = 0;
static void arg_init(struct arg *arg, int n, DB **dbp, DB_ENV *env) {
static void arg_init(struct arg *arg, int n, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
arg->n = n;
arg->dbp = dbp;
arg->env = env;
......@@ -66,6 +90,8 @@ static void arg_init(struct arg *arg, int n, DB **dbp, DB_ENV *env) {
arg->txn_type = DB_TXN_SNAPSHOT;
arg->update_history_buffer = NULL;
arg->update_pad_frequency = n/100; // bit arbitrary. Just want dictionary to grow and shrink so splits and merges occur
arg->crash_on_update_failure = cli_args->crash_on_update_failure;
arg->update_txn_size = cli_args->update_txn_size;
}
static void *worker(void *arg_v) {
......@@ -89,8 +115,16 @@ static void *worker(void *arg_v) {
}
int r = env->txn_begin(env, 0, &txn, arg->txn_type); CKERR(r);
r = arg->operation(env, dbp, txn, arg); CKERR(r);
r = arg->operation(env, dbp, txn, arg);
if (r == 0) {
CHK(txn->commit(txn,0));
} else {
if (arg->crash_on_update_failure) {
CKERR(r);
} else {
CHK(txn->abort(txn));
}
}
toku_pthread_mutex_lock(arg->broadcast_lock_mutex);
if (arg->lock_type == STRESS_LOCK_SHARED) {
......@@ -337,7 +371,7 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
memset(&extra, 0, sizeof(extra));
extra.type = UPDATE_ADD_DIFF;
extra.pad_bytes = 0;
for (u_int32_t i = 0; i < 500; i++) {
for (u_int32_t i = 0; i < arg->update_txn_size; i++) {
rand_key = random();
if (arg->bounded_update_range) {
rand_key = rand_key % (arg->n/2);
......@@ -353,6 +387,9 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r);
extra.u.d.diff = -1;
r = db->update(
......@@ -362,6 +399,9 @@ static int UU()update_op2(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r);
}
return r;
......@@ -382,9 +422,8 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
if (update_count % (2*arg->update_pad_frequency) == update_count%arg->update_pad_frequency) {
extra.pad_bytes = 100;
}
}
for (u_int32_t i = 0; i < 1000; i++) {
for (u_int32_t i = 0; i < arg->update_txn_size; i++) {
rand_key = random();
if (arg->bounded_update_range) {
rand_key = rand_key % arg->n;
......@@ -402,6 +441,9 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r);
}
//
......@@ -419,6 +461,9 @@ static int UU()update_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, ARG arg) {
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r);
return r;
......@@ -443,7 +488,7 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A
}
}
for (u_int32_t i = 0; i < 1000; i++) {
for (u_int32_t i = 0; i < arg->update_txn_size; i++) {
rand_key = random() % arg->n;
extra.u.h.new = random() % MAX_RANDOM_VAL;
// just make every other value random
......@@ -460,6 +505,9 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r);
}
//
......@@ -479,6 +527,9 @@ static int UU() update_with_history_op(DB_ENV *UU(env), DB **dbp, DB_TXN *txn, A
dbt_init(&val, &extra, sizeof extra),
0
);
if (r != 0 && !arg->crash_on_update_failure) {
return r;
}
CKERR(r);
return r;
......@@ -727,25 +778,6 @@ static int close_table(DB_ENV *env, DB *db) {
return r;
}
struct cli_args {
int num_elements;
int time_of_test;
int node_size;
int basement_node_size;
u_int64_t cachetable_size;
bool only_create;
bool only_stress;
int checkpointing_period;
int cleaner_period;
int cleaner_iterations;
int update_broadcast_period_ms;
int num_ptquery_threads;
test_update_callback_f update_function;
bool do_test_and_crash;
bool do_recover;
char *envdir;
};
static const struct cli_args DEFAULT_ARGS = {
.num_elements = 150000,
.time_of_test = 180,
......@@ -763,6 +795,9 @@ static const struct cli_args DEFAULT_ARGS = {
.do_test_and_crash = false,
.do_recover = false,
.envdir = ENVDIR,
.num_update_threads = 1,
.crash_on_update_failure = true,
.update_txn_size = 1000,
};
static inline void parse_stress_test_args (int argc, char *const argv[], struct cli_args *args) {
......@@ -789,6 +824,9 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
fprintf(stderr, "\t--cleaner_iterations INT (default %ds)\n", DEFAULT_ARGS.cleaner_iterations);
fprintf(stderr, "\t--update_broadcast_period INT (default %dms)\n", DEFAULT_ARGS.update_broadcast_period_ms);
fprintf(stderr, "\t--num_ptquery_threads INT (default %d threads)\n", DEFAULT_ARGS.num_ptquery_threads);
fprintf(stderr, "\t--num_update_threads INT (default %d threads)\n", DEFAULT_ARGS.num_update_threads);
fprintf(stderr, "\t--update_txn_size INT (default %d rows)\n", DEFAULT_ARGS.update_txn_size);
fprintf(stderr, "\t--[no-]crash_on_update_failure BOOL (default %s)\n", DEFAULT_ARGS.crash_on_update_failure ? "yes" : "no");
exit(resultcode);
}
else if (strcmp(argv[1], "--num_elements") == 0) {
......@@ -831,6 +869,20 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
argc--; argv++;
args->num_ptquery_threads = atoi(argv[1]);
}
else if (strcmp(argv[1], "--num_update_threads") == 0) {
argc--; argv++;
args->num_update_threads = atoi(argv[1]);
}
else if (strcmp(argv[1], "--crash_on_update_failure") == 0) {
args->crash_on_update_failure = true;
}
else if (strcmp(argv[1], "--no-crash_on_update_failure") == 0) {
args->crash_on_update_failure = false;
}
else if (strcmp(argv[1], "--update_txn_size") == 0) {
argc--; argv++;
args->update_txn_size = atoi(argv[1]);
}
else if (strcmp(argv[1], "--only_create") == 0) {
args->only_create = true;
}
......@@ -913,7 +965,7 @@ UU() stress_recover(struct cli_args *args) {
DB_TXN* txn = NULL;
struct arg recover_args;
arg_init(&recover_args, args->num_elements, &db, env);
arg_init(&recover_args, args->num_elements, &db, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
CKERR(r);
r = scan_op_and_maybe_check_sum(env, &db, txn, &recover_args, true);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment