Commit 4b5043ea authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

closes #5419 account for actual time slept, not just how long we wanted to...

closes #5419 account for actual time slept, not just how long we wanted to sleep, and set an alarm to make sure it doesn't take too long to end the test


git-svn-id: file:///svn/toku/tokudb@49986 c7de825b-a66e-492c-adef-691d508d4ae1
parent 44a1de02
......@@ -1073,7 +1073,7 @@ struct update_op_args {
int update_pad_frequency;
};
static struct update_op_args UU() get_update_op_args(struct cli_args* cli_args, int* update_history_buffer) {
static struct update_op_args UU() get_update_op_args(struct cli_args* cli_args, int* update_history_buffer) {
struct update_op_args uoe;
uoe.update_history_buffer = update_history_buffer;
uoe.update_pad_frequency = cli_args->num_elements/100; // arbitrary
......@@ -1398,14 +1398,14 @@ static int UU() remove_and_recreate_me(DB_TXN *UU(txn), ARG arg, void* UU(operat
int db_index = myrandom_r(arg->random_data)%arg->cli->num_DBs;
DB* db = arg->dbp[db_index];
r = (db)->close(db, 0); CKERR(r);
char name[30];
ZERO_ARRAY(name);
get_ith_table_name(name, sizeof(name), db_index);
r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0);
r = arg->env->dbremove(arg->env, null_txn, name, NULL, 0);
CKERR(r);
r = db_create(&(arg->dbp[db_index]), arg->env, 0);
assert(r == 0);
r = arg->dbp[db_index]->open(arg->dbp[db_index], null_txn, name, NULL, DB_BTREE, DB_CREATE, 0666);
......@@ -1451,9 +1451,23 @@ static void *test_time(void *arg) {
printf("Sleeping for %d seconds\n", num_seconds);
}
for (int i = 0; i < num_seconds; ) {
struct timeval tv[2];
const int sleeptime = intmin(tte->cli_args->performance_period, num_seconds - i);
int r = gettimeofday(&tv[0], nullptr);
assert_zero(r);
usleep(sleeptime*1000*1000);
i += sleeptime;
r = gettimeofday(&tv[1], nullptr);
assert_zero(r);
int actual_sleeptime = tv[1].tv_sec - tv[0].tv_sec;
if (abs(actual_sleeptime - sleeptime) <= 1) {
// Close enough, no need to alarm the user, and we didn't check nsec.
i += sleeptime;
} else {
if (verbose) {
printf("tried to sleep %d secs, actually slept %d secs\n", sleeptime, actual_sleeptime);
}
i += actual_sleeptime;
}
if (tte->cli_args->print_performance && tte->cli_args->print_iteration_performance) {
perf_formatter->iteration(tte->cli_args, i, last_counter_values, counters, tte->num_wes);
}
......@@ -1473,12 +1487,12 @@ static void *test_time(void *arg) {
}
static int run_workers(
struct arg *thread_args,
int num_threads,
uint32_t num_seconds,
struct arg *thread_args,
int num_threads,
uint32_t num_seconds,
bool crash_at_end,
struct cli_args* cli_args
)
)
{
int r;
const struct perf_formatter *perf_formatter = &perf_formatters[cli_args->perf_output_format];
......@@ -1509,21 +1523,28 @@ static int run_workers(
XCALLOC_N((int) NUM_OPERATION_TYPES, worker_extra[i].counters);
TOKU_DRD_IGNORE_VAR(worker_extra[i].counters);
{ int chk_r = toku_pthread_create(&tids[i], NULL, worker, &worker_extra[i]); CKERR(chk_r); }
if (verbose)
if (verbose)
printf("%lu created\n", (unsigned long) tids[i]);
}
{ int chk_r = toku_pthread_create(&time_tid, NULL, test_time, &tte); CKERR(chk_r); }
if (verbose)
if (verbose)
printf("%lu created\n", (unsigned long) time_tid);
void *ret;
r = toku_pthread_join(time_tid, &ret); assert_zero(r);
if (verbose) printf("%lu joined\n", (unsigned long) time_tid);
// Set an alarm that will kill us if it takes too long to join all the
// threads (i.e. there is some runaway thread).
unsigned int remaining = alarm((num_seconds / 10 < 30) ? 30 : num_seconds / 10);
assert_zero(remaining);
for (int i = 0; i < num_threads; ++i) {
r = toku_pthread_join(tids[i], &ret); assert_zero(r);
if (verbose)
if (verbose)
printf("%lu joined\n", (unsigned long) tids[i]);
}
// All threads joined, deschedule the alarm.
remaining = alarm(0);
assert(remaining > 0);
if (cli_args->print_performance) {
uint64_t *counters[num_threads];
......@@ -1536,7 +1557,7 @@ static int run_workers(
for (int i = 0; i < num_threads; ++i) {
toku_free(worker_extra[i].counters);
}
if (verbose)
if (verbose)
printf("ending test, pthreads have joined\n");
rwlock_destroy(&rwlock);
toku_mutex_destroy(&mutex);
......@@ -1631,10 +1652,10 @@ static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bu
// key_bufsz and val_bufsz, which were passed into this
// function, not what was stored by the callback
r = db->put(
db,
txn,
db,
txn,
dbt_init(&key, keybuf, key_bufsz),
dbt_init(&val, valbuf, val_bufsz),
dbt_init(&val, valbuf, val_bufsz),
// don't bother taking locks in the locktree
DB_PRELOCKED_WRITE
);
......@@ -1644,7 +1665,9 @@ static int fill_table_from_fun(DB_ENV *env, DB *db, int num_elements, int key_bu
// the caller can checkpoint if they want.
r = txn->commit(txn, DB_TXN_NOSYNC); CKERR(r);
txn = nullptr;
progress_cb(puts_per_txn);
if (verbose) {
progress_cb(puts_per_txn);
}
}
}
if (txn) {
......@@ -1699,11 +1722,11 @@ static void fill_table_worker(void *arg) {
struct fill_table_worker_info *CAST_FROM_VOIDP(info, arg);
int r = fill_table_from_fun(
info->env,
info->db,
info->num_elements,
info->key_size,
info->val_size,
zero_element_callback,
info->db,
info->num_elements,
info->key_size,
info->val_size,
zero_element_callback,
&info->disperse_keys,
info->progress_cb
);
......@@ -1740,7 +1763,7 @@ static void report_overall_fill_table_progress(int num_rows) {
if (t1 > last_report + minimum_report_period
&& toku_sync_bool_compare_and_swap(&reporting, 0, 1) == 0) {
double inserts_per_sec = (rows_so_far*1000000) / ((t1 - t0) * 1.0);
printf("fill tables: %ldpct complete, %.2lf rows/sec\n",
printf("fill tables: %ld%% complete, %.2lf rows/sec\n",
(long)(progress * 100), inserts_per_sec);
last_progress = progress;
last_report = t1;
......@@ -1773,7 +1796,7 @@ static int fill_tables_with_zeroes(DB_ENV *env, DB **dbs, int num_DBs, int num_e
return 0;
}
static void do_xa_recovery(DB_ENV* env) {
static void do_xa_recovery(DB_ENV* env) {
DB_PREPLIST preplist[1];
long num_recovered= 0;
int r = 0;
......@@ -1813,15 +1836,15 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
r = env->set_lg_bsize(env, env_args.rollback_node_size); CKERR(r);
if (env_args.generate_put_callback) {
r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback);
r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback);
CKERR(r);
}
else {
r = env->set_generate_row_callback_for_put(env, generate_row_for_put);
r = env->set_generate_row_callback_for_put(env, generate_row_for_put);
CKERR(r);
}
if (env_args.generate_del_callback) {
r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback);
r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback);
CKERR(r);
}
int env_flags = get_env_open_flags(cli_args);
......@@ -1832,7 +1855,6 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = env->cleaner_set_iterations(env, env_args.cleaner_iterations); CKERR(r);
*env_res = env;
for (int i = 0; i < num_DBs; i++) {
DB *db;
char name[30];
......@@ -2507,7 +2529,7 @@ UU() stress_recover(struct cli_args *args) {
stress_int_dbt_cmp,
args); CKERR(chk_r); }
DB_TXN* txn = NULL;
DB_TXN* txn = NULL;
struct arg recover_args;
arg_init(&recover_args, dbs, env, args);
int r = env->txn_begin(env, 0, &txn, recover_args.txn_type);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment