Commit a3221f86 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

closes #5419 account for actual time slept, not just how long we wanted to...

closes #5419 account for actual time slept, not just how long we wanted to sleep, and set an alarm to make sure it doesn't take too long to end the test


git-svn-id: file:///svn/toku/tokudb@49986 c7de825b-a66e-492c-adef-691d508d4ae1
parent 04b309f8
...@@ -1073,7 +1073,7 @@ struct update_op_args { ...@@ -1073,7 +1073,7 @@ struct update_op_args {
int update_pad_frequency; int update_pad_frequency;
}; };
static struct update_op_args UU() get_update_op_args(struct cli_args* cli_args, int* update_history_buffer) { static struct update_op_args UU() get_update_op_args(struct cli_args* cli_args, int* update_history_buffer) {
struct update_op_args uoe; struct update_op_args uoe;
uoe.update_history_buffer = update_history_buffer; uoe.update_history_buffer = update_history_buffer;
uoe.update_pad_frequency = cli_args->num_elements/100; // arbitrary uoe.update_pad_frequency = cli_args->num_elements/100; // arbitrary
...@@ -1398,14 +1398,14 @@ static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operat ...@@ -1398,14 +1398,14 @@ static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operat
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs; int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index]; DB* db = arg->dbp[db_index];
r = (db)->close(db, 0); CKERR(r); r = (db)->close(db, 0); CKERR(r);
char name[30]; char name[30];
ZERO_ARRAY(name); ZERO_ARRAY(name);
get_ith_table_name(name, sizeof(name), db_index); get_ith_table_name(name, sizeof(name), db_index);
r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0); r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0);
CKERR(r); CKERR(r);
r = db_create(&(arg->dbp[db_index]), arg->env, 0); r = db_create(&(arg->dbp[db_index]), arg->env, 0);
assert(r == 0); assert(r == 0);
r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666); r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666);
...@@ -1451,9 +1451,23 @@ static void *test_time(void *arg) { ...@@ -1451,9 +1451,23 @@ static void *test_time(void *arg) {
printf("Sleeping for %d seconds\n", num_seconds); printf("Sleeping for %d seconds\n", num_seconds);
} }
for (int i = 0; i < num_seconds; ) { for (int i = 0; i < num_seconds; ) {
struct timeval tv[2];
const int sleeptime = intmin(tte->cli_args->performance_period, num_seconds - i); const int sleeptime = intmin(tte->cli_args->performance_period, num_seconds - i);
int r = gettimeofday(&tv[0], nullptr);
assert_zero(r);
usleep(sleeptime*1000*1000); usleep(sleeptime*1000*1000);
i += sleeptime; r = gettimeofday(&tv[1], nullptr);
assert_zero(r);
int actual_sleeptime = tv[1].tv_sec - tv[0].tv_sec;
if (abs(actual_sleeptime - sleeptime) <= 1) {
// Close enough, no need to alarm the user, and we didn't check nsec.
i += sleeptime;
} else {
if (verbose) {
printf("tried to sleep %d secs, actually slept %d secs\n", sleeptime, actual_sleeptime);
}
i += actual_sleeptime;
}
if (tte->cli_args->print_performance && tte->cli_args->print_iteration_performance) { if (tte->cli_args->print_performance && tte->cli_args->print_iteration_performance) {
perf_formatter->iteration(tte->cli_args, i, last_counter_values, counters, tte->num_wes); perf_formatter->iteration(tte->cli_args, i, last_counter_values, counters, tte->num_wes);
} }
...@@ -1473,12 +1487,12 @@ static void *test_time(void *arg) { ...@@ -1473,12 +1487,12 @@ static void *test_time(void *arg) {
} }
static int run_workers( static int run_workers(
struct arg *thread_args, struct arg *thread_args,
int num_threads, int num_threads,
uint32_t num_seconds, uint32_t num_seconds,
bool crash_at_end, bool crash_at_end,
struct cli_args* cli_args struct cli_args* cli_args
) )
{ {
int r; int r;
const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format]; const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format];
...@@ -1509,21 +1523,28 @@ static int run_workers( ...@@ -1509,21 +1523,28 @@ static int run_workers(
XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters); XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters);
TOKU_DRD_IGNORE_VAR(worker_extra[i].counters); TOKU_DRD_IGNORE_VAR(worker_extra[i].counters);
{ int chk_r = toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]); CKERR(chk_r); } { int chk_r = toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]); CKERR(chk_r); }
if (verbose) if (verbose)
printf("%lu created\n", (unsigned long) tids[i]); printf("%lu created\n", (unsigned long) tids[i]);
} }
{ int chk_r = toku_pthread_create(&time_tid, NULL, test_time, &tte); CKERR(chk_r); } { int chk_r = toku_pthread_create(&time_tid, NULL, test_time, &tte); CKERR(chk_r); }
if (verbose) if (verbose)
printf("%lu created\n", (unsigned long) time_tid); printf("%lu created\n", (unsigned long) time_tid);
void *ret; void *ret;
r = toku_pthread_join(time_tid, &ret); assert_zero(r); r = toku_pthread_join(time_tid, &ret); assert_zero(r);
if (verbose) printf("%lu joined\n", (unsigned long) time_tid); if (verbose) printf("%lu joined\n", (unsigned long) time_tid);
// Set an alarm that will kill us if it takes too long to join all the
// threads (i.e. there is some runaway thread).
unsigned int remaining = alarm((num_seconds / 10 < 30) ? 30 : num_seconds / 10);
assert_zero(remaining);
for (int i = 0; i < num_threads; ++i) { for (int i = 0; i < num_threads; ++i) {
r = toku_pthread_join(tids[i], &ret); assert_zero(r); r = toku_pthread_join(tids[i], &ret); assert_zero(r);
if (verbose) if (verbose)
printf("%lu joined\n", (unsigned long) tids[i]); printf("%lu joined\n", (unsigned long) tids[i]);
} }
// All threads joined, deschedule the alarm.
remaining = alarm(0);
assert(remaining > 0);
if (cli_args->print_performance) { if (cli_args->print_performance) {
uint64_t *counters[num_threads]; uint64_t *counters[num_threads];
...@@ -1536,7 +1557,7 @@ static int run_workers( ...@@ -1536,7 +1557,7 @@ static int run_workers(
for (int i = 0; i < num_threads; ++i) { for (int i = 0; i < num_threads; ++i) {
toku_free(worker_extra[i].counters); toku_free(worker_extra[i].counters);
} }
if (verbose) if (verbose)
printf("ending test, pthreads have joined\n"); printf("ending test, pthreads have joined\n");
rwlock_destroy(&rwlock); rwlock_destroy(&rwlock);
toku_mutex_destroy(&mutex); toku_mutex_destroy(&mutex);
...@@ -1631,10 +1652,10 @@ static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bu ...@@ -1631,10 +1652,10 @@ static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bu
// key_bufsz and val_bufsz, which were passed into this // key_bufsz and val_bufsz, which were passed into this
// function, not what was stored by the callback // function, not what was stored by the callback
r = db->put( r = db->put(
db, db,
txn, txn,
dbt_init(&key, keybuf, key_bufsz), dbt_init(&key, keybuf, key_bufsz),
dbt_init(&val, valbuf, val_bufsz), dbt_init(&val, valbuf, val_bufsz),
// don't bother taking locks in the locktree // don't bother taking locks in the locktree
DB_PRELOCKED_WRITE DB_PRELOCKED_WRITE
); );
...@@ -1644,7 +1665,9 @@ static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bu ...@@ -1644,7 +1665,9 @@ static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bu
// the caller can checkpoint if they want. // the caller can checkpoint if they want.
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r); r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
txn = nullptr; txn = nullptr;
progress_cb(puts_per_txn); if (verbose) {
progress_cb(puts_per_txn);
}
} }
} }
if (txn) { if (txn) {
...@@ -1699,11 +1722,11 @@ static void fill_table_worker(void *arg) { ...@@ -1699,11 +1722,11 @@ static void fill_table_worker(void *arg) {
struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg); struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
int r = fill_table_from_fun( int r = fill_table_from_fun(
info->env, info->env,
info->db, info->db,
info->num_elements, info->num_elements,
info->key_size, info->key_size,
info->val_size, info->val_size,
zero_element_callback, zero_element_callback,
&info->disperse_keys, &info->disperse_keys,
info->progress_cb info->progress_cb
); );
...@@ -1740,7 +1763,7 @@ static void report_overall_fill_table_progress(int num_rows) { ...@@ -1740,7 +1763,7 @@ static void report_overall_fill_table_progress(int num_rows) {
if (t1 > last_report + minimum_report_period if (t1 > last_report + minimum_report_period
&& toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) { && toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) {
double inserts_per_sec = (rows_so_far*1000000) / ((t1 - t0) * 1.0); double inserts_per_sec = (rows_so_far*1000000) / ((t1 - t0) * 1.0);
printf("fill tables: %ldpct complete, %.2lf rows/sec\n", printf("fill tables: %ld%% complete, %.2lf rows/sec\n",
(long)(progress * 100), inserts_per_sec); (long)(progress * 100), inserts_per_sec);
last_progress = progress; last_progress = progress;
last_report = t1; last_report = t1;
...@@ -1773,7 +1796,7 @@ static int fill_tables_with_zeroes(DB_ENV *env, DB **dbs, int num_DBs, int num_e ...@@ -1773,7 +1796,7 @@ static int fill_tables_with_zeroes(DB_ENV *env, DB **dbs, int num_DBs, int num_e
return 0; return 0;
} }
static void do_xa_recovery(DB_ENV* env) { static void do_xa_recovery(DB_ENV* env) {
DB_PREPLIST preplist[1]; DB_PREPLIST preplist[1];
long num_recovered= 0; long num_recovered= 0;
int r = 0; int r = 0;
...@@ -1813,15 +1836,15 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1813,15 +1836,15 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r); r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r);
if (env_args.generate_put_callback) { if (env_args.generate_put_callback) {
r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback); r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback);
CKERR(r); CKERR(r);
} }
else { else {
r = env->set_generate_row_callback_for_put(env, generate_row_for_put); r = env->set_generate_row_callback_for_put(env, generate_row_for_put);
CKERR(r); CKERR(r);
} }
if (env_args.generate_del_callback) { if (env_args.generate_del_callback) {
r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback); r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback);
CKERR(r); CKERR(r);
} }
int env_flags = get_env_open_flags(cli_args); int env_flags = get_env_open_flags(cli_args);
...@@ -1832,7 +1855,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1832,7 +1855,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r); r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
*env_res = env; *env_res = env;
for (int i = 0; i < num_DBs; i++) { for (int i = 0; i < num_DBs; i++) {
DB *db; DB *db;
char name[30]; char name[30];
...@@ -2507,7 +2529,7 @@ UU() stress_recover(struct cli_args *args) { ...@@ -2507,7 +2529,7 @@ UU() stress_recover(struct cli_args *args) {
stress_int_dbt_cmp, stress_int_dbt_cmp,
args); CKERR(chk_r); } args); CKERR(chk_r); }
DB_TXN* txn = NULL; DB_TXN* txn = NULL;
struct arg recover_args; struct arg recover_args;
arg_init(&recover_args, dbs, env, args); arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type); int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment