Commit 414ad7e5 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4687], a hot indexing stress test, very simple, still needs scans after hot...

[t:4687], a hot indexing stress test, very simple, still needs scans after hot indexing work is done to verify correctness

git-svn-id: file:///svn/toku/tokudb@44551 c7de825b-a66e-492c-adef-691d508d4ae1
parent 38ae050c
...@@ -468,8 +468,12 @@ indexer_generate_hot_key_val(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, UXRH ...@@ -468,8 +468,12 @@ indexer_generate_hot_key_val(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, UXRH
// generate the secondary row // generate the secondary row
DB_ENV *env = indexer->i->env; DB_ENV *env = indexer->i->env;
if (hotval) {
result = env->i->generate_row_for_put(hotdb, indexer->i->src_db, hotkey, hotval, &srckey, &srcval); result = env->i->generate_row_for_put(hotdb, indexer->i->src_db, hotkey, hotval, &srckey, &srcval);
}
else {
result = env->i->generate_row_for_del(hotdb, indexer->i->src_db, hotkey, &srckey, &srcval);
}
toku_destroy_dbt(&srckey); toku_destroy_dbt(&srckey);
toku_destroy_dbt(&srcval); toku_destroy_dbt(&srcval);
......
...@@ -239,6 +239,7 @@ if(BUILD_TESTING) ...@@ -239,6 +239,7 @@ if(BUILD_TESTING)
test_stress6.c test_stress6.c
test_stress7.c test_stress7.c
test_stress_with_verify.c test_stress_with_verify.c
test_stress_hot_indexing.c
test_transactional_descriptor.c test_transactional_descriptor.c
test_trans_desc_during_chkpt.c test_trans_desc_during_chkpt.c
test_trans_desc_during_chkpt2.c test_trans_desc_during_chkpt2.c
...@@ -429,6 +430,7 @@ if(BUILD_TESTING) ...@@ -429,6 +430,7 @@ if(BUILD_TESTING)
configure_file(run_stress_test.sh . COPYONLY) configure_file(run_stress_test.sh . COPYONLY)
foreach(src ${stress_test_srcs}) foreach(src ${stress_test_srcs})
if(NOT ${src} MATCHES hot_index)
string(REGEX REPLACE "\\.c$" ".tdb" test "${src}") string(REGEX REPLACE "\\.c$" ".tdb" test "${src}")
add_test(${test} ${CMAKE_CFG_INTDIR}/run_stress_test.sh $<TARGET_FILE:${test}> 150000 1000) add_test(${test} ${CMAKE_CFG_INTDIR}/run_stress_test.sh $<TARGET_FILE:${test}> 150000 1000)
...@@ -441,6 +443,7 @@ if(BUILD_TESTING) ...@@ -441,6 +443,7 @@ if(BUILD_TESTING)
add_custom_executable(large ${test} ${src}) add_custom_executable(large ${test} ${src})
add_drd_test(drd_large_${test} ${CMAKE_CFG_INTDIR}/run_stress_test.sh $<TARGET_FILE:large_${test}> 150000 1000) add_drd_test(drd_large_${test} ${CMAKE_CFG_INTDIR}/run_stress_test.sh $<TARGET_FILE:large_${test}> 150000 1000)
set_tests_properties(drd_large_${test} PROPERTIES TIMEOUT 28800) set_tests_properties(drd_large_${test} PROPERTIES TIMEOUT 28800)
endif()
endforeach(src) endforeach(src)
## for some reason this rule doesn't run with the makefile and it crashes with this rule, so I'm disabling this special case ## for some reason this rule doesn't run with the makefile and it crashes with this rule, so I'm disabling this special case
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#ident "$Id: test_stress1.c 35109 2011-09-27 18:41:25Z leifwalsh $"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
DB* hot_db;
toku_mutex_t fops_lock;
toku_mutex_t hi_lock;
static int
hi_put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data) {
lazy_assert(src_db != NULL && dest_db != NULL);
if (dest_key->data) {
toku_free(dest_key->data);
dest_key->data = NULL;
}
if (dest_data->data) {
toku_free(dest_data->data);
dest_data->data = NULL;
}
dest_key->data = toku_xmemdup(src_key->data, src_key->size);
dest_key->size = src_key->size;
dest_data->data = toku_xmemdup(src_data->data, src_data->size);
dest_data->size = src_data->size;
return 0;
}
static int
hi_del_callback(DB *dest_db, DB *src_db, DBT *dest_key, const DBT *src_key, const DBT* UU(src_data)) {
lazy_assert(src_db != NULL && dest_db != NULL);
if (dest_key->data) {
toku_free(dest_key->data);
dest_key->data = NULL;
}
dest_key->data = toku_xmemdup(src_key->data, src_key->size);
dest_key->size = src_key->size;
return 0;
}
static int hi_inserts(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void *stats_extra) {
int r;
DB_TXN* hi_txn = NULL;
toku_mutex_lock(&fops_lock);
DB_ENV* env = arg->env;
DB* db = arg->dbp[0];
u_int32_t flags[2];
flags[0] = 0;
flags[1] = 0;
DBT dest_keys[2];
DBT dest_vals[2];
memset(dest_keys, 0, sizeof(dest_keys));
memset(dest_vals, 0, sizeof(dest_vals));
int i;
r = env->txn_begin(env, NULL, &hi_txn, 0);
CKERR(r);
for (i = 0; i < 1000; i++) {
DB* dbs[2];
toku_mutex_lock(&hi_lock);
dbs[0] = db;
dbs[1] = hot_db;
int num_dbs = hot_db ? 2 : 1;
// do a random insertion
int rand_key = random() % arg->cli->num_elements;
int rand_val = random();
DBT key, val;
dbt_init(&key, &rand_key, sizeof(rand_key)),
dbt_init(&val, &rand_val, sizeof(rand_val)),
r = env->put_multiple(
env,
db,
hi_txn,
&key,
&val,
num_dbs,
dbs,
dest_keys,
dest_vals,
flags
);
toku_mutex_unlock(&hi_lock);
if (r != 0) {
goto cleanup;
}
}
cleanup:
if (dest_keys[0].data) {
toku_free(dest_keys[0].data);
}
if (dest_keys[1].data) {
toku_free(dest_keys[1].data);
}
if (dest_vals[0].data) {
toku_free(dest_vals[0].data);
}
if (dest_vals[1].data) {
toku_free(dest_vals[1].data);
}
increment_counter(stats_extra, PUTS, i);
if (r) {
int rr = hi_txn->abort(hi_txn);
CKERR(rr);
}
else {
int rr = hi_txn->commit(hi_txn, 0);
CKERR(rr);
}
toku_mutex_unlock(&fops_lock);
return r;
}
static int hi_create_index(DB_TXN* UU(txn), ARG arg, void* UU(operation_extra), void* UU(stats_extra)) {
int r;
DB_TXN* hi_txn = NULL;
DB_ENV* env = arg->env;
DB_INDEXER* indexer = NULL;
r = env->txn_begin(env, NULL, &hi_txn, 0);
CKERR(r);
toku_mutex_lock(&hi_lock);
assert(hot_db == NULL);
db_create(&hot_db, env, 0);
CKERR(r);
r = hot_db->set_flags(hot_db, 0);
CKERR(r);
r = hot_db->set_pagesize(hot_db, arg->cli->env_args.node_size);
CKERR(r);
r = hot_db->set_readpagesize(hot_db, arg->cli->env_args.basement_node_size);
CKERR(r);
r = hot_db->open(hot_db, NULL, "hotindex_db", NULL, DB_BTREE, DB_CREATE | DB_IS_HOT_INDEX, 0666);
CKERR(r);
u_int32_t db_flags = 0;
u_int32_t indexer_flags = 0;
r = env->create_indexer(
env,
hi_txn,
&indexer,
arg->dbp[0],
1,
&hot_db,
&db_flags,
indexer_flags
);
CKERR(r);
toku_mutex_unlock(&hi_lock);
r = indexer->build(indexer);
CKERR(r);
toku_mutex_lock(&hi_lock);
r = indexer->close(indexer);
CKERR(r);
toku_mutex_unlock(&hi_lock);
r = hi_txn->commit(hi_txn, 0);
CKERR(r);
// now do some scans
// grab lock and close hot_db, set it to NULL
toku_mutex_lock(&hi_lock);
r = hot_db->close(hot_db, 0);
CKERR(r);
hot_db = NULL;
toku_mutex_unlock(&hi_lock);
toku_mutex_lock(&fops_lock);
r = env->dbremove(env, NULL, "hotindex_db", NULL, 0);
toku_mutex_unlock(&fops_lock);
CKERR(r);
return 0;
}
//
// purpose of this stress test is to do a bunch of splitting and merging
// and run db->verify periodically to make sure the db is in a
// a good state
//
static void
stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = 2;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
}
myargs[0].operation = hi_inserts;
myargs[1].operation = hi_create_index;
run_workers(myargs, num_threads, cli_args->time_of_test, false, cli_args);
}
int
test_main(int argc, char *const argv[]) {
toku_mutex_init(&hi_lock, NULL);
toku_mutex_init(&fops_lock, NULL);
hot_db = NULL;
struct cli_args args = get_default_args();
// let's make default checkpointing period really slow
args.num_ptquery_threads = 0;
parse_stress_test_args(argc, argv, &args);
args.num_DBs = 1;
args.crash_on_operation_failure = FALSE;
args.env_args.generate_del_callback = hi_del_callback;
args.env_args.generate_put_callback = hi_put_callback;
stress_test_main(&args);
toku_mutex_destroy(&hi_lock);
toku_mutex_destroy(&fops_lock);
return 0;
}
...@@ -94,6 +94,8 @@ typedef struct arg *ARG; ...@@ -94,6 +94,8 @@ typedef struct arg *ARG;
typedef int (*operation_t)(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra); typedef int (*operation_t)(DB_TXN *txn, ARG arg, void *operation_extra, void *stats_extra);
typedef int (*test_update_callback_f)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra); typedef int (*test_update_callback_f)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra);
typedef int (*test_generate_row_for_put_callback)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *src_key, const DBT *src_data);
typedef int (*test_generate_row_for_del_callback)(DB *dest_db, DB *src_db, DBT *dest_key, const DBT *src_key, const DBT *src_data);
enum stress_lock_type { enum stress_lock_type {
STRESS_LOCK_NONE = 0, STRESS_LOCK_NONE = 0,
...@@ -110,6 +112,8 @@ struct env_args { ...@@ -110,6 +112,8 @@ struct env_args {
u_int64_t cachetable_size; u_int64_t cachetable_size;
char *envdir; char *envdir;
test_update_callback_f update_function; // update callback function test_update_callback_f update_function; // update callback function
test_generate_row_for_put_callback generate_put_callback;
test_generate_row_for_del_callback generate_del_callback;
}; };
enum perf_output_format { enum perf_output_format {
...@@ -288,9 +292,6 @@ static void print_perf_iteration(struct cli_args *cli_args, const int current_ti ...@@ -288,9 +292,6 @@ static void print_perf_iteration(struct cli_args *cli_args, const int current_ti
fmt->print_perf_thread_iterations_header(current_time, operation_names[op]); fmt->print_perf_thread_iterations_header(current_time, operation_names[op]);
} }
for (int t = 0; t < num_threads; ++t) { for (int t = 0; t < num_threads; ++t) {
if (counters[t][op] == 0) {
continue;
}
const uint64_t last = last_counters[t][op]; const uint64_t last = last_counters[t][op];
const uint64_t current = counters[t][op]; const uint64_t current = counters[t][op];
if (cli_args->print_thread_performance) { if (cli_args->print_thread_performance) {
...@@ -1200,7 +1201,18 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1200,7 +1201,18 @@ static int create_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
r = env->set_redzone(env, 0); CKERR(r); r = env->set_redzone(env, 0); CKERR(r);
r = env->set_default_bt_compare(env, bt_compare); CKERR(r); r = env->set_default_bt_compare(env, bt_compare); CKERR(r);
r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r); if (env_args.generate_put_callback) {
r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback);
CKERR(r);
}
else {
r = env->set_generate_row_callback_for_put(env, generate_row_for_put);
CKERR(r);
}
if (env_args.generate_del_callback) {
r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback);
CKERR(r);
}
r = env->open(env, env_args.envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, env_args.envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r); r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r); r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
...@@ -1298,7 +1310,18 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs, ...@@ -1298,7 +1310,18 @@ static int open_tables(DB_ENV **env_res, DB **db_res, int num_DBs,
env->set_update(env, env_args.update_function); env->set_update(env, env_args.update_function);
// set the cache size to 10MB // set the cache size to 10MB
r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r); r = env->set_cachesize(env, env_args.cachetable_size / (1 << 30), env_args.cachetable_size % (1 << 30), 1); CKERR(r);
r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r); if (env_args.generate_put_callback) {
r = env->set_generate_row_callback_for_put(env, env_args.generate_put_callback);
CKERR(r);
}
else {
r = env->set_generate_row_callback_for_put(env, generate_row_for_put);
CKERR(r);
}
if (env_args.generate_del_callback) {
r = env->set_generate_row_callback_for_del(env, env_args.generate_del_callback);
CKERR(r);
}
r = env->open(env, env_args.envdir, DB_RECOVER|DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, env_args.envdir, DB_RECOVER|DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r); r = env->checkpointing_set_period(env, env_args.checkpointing_period); CKERR(r);
r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r); r = env->cleaner_set_period(env, env_args.cleaner_period); CKERR(r);
...@@ -1338,6 +1361,8 @@ static const struct env_args DEFAULT_ENV_ARGS = { ...@@ -1338,6 +1361,8 @@ static const struct env_args DEFAULT_ENV_ARGS = {
.cachetable_size = 300000, .cachetable_size = 300000,
.envdir = ENVDIR, .envdir = ENVDIR,
.update_function = update_op_callback, .update_function = update_op_callback,
.generate_put_callback = NULL,
.generate_del_callback = NULL,
}; };
static const struct env_args DEFAULT_PERF_ENV_ARGS = { static const struct env_args DEFAULT_PERF_ENV_ARGS = {
...@@ -1349,6 +1374,8 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = { ...@@ -1349,6 +1374,8 @@ static const struct env_args DEFAULT_PERF_ENV_ARGS = {
.cachetable_size = 1<<30, .cachetable_size = 1<<30,
.envdir = ENVDIR, .envdir = ENVDIR,
.update_function = NULL, .update_function = NULL,
.generate_put_callback = NULL,
.generate_del_callback = NULL,
}; };
#define MIN_VAL_SIZE sizeof(int) #define MIN_VAL_SIZE sizeof(int)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment