Commit cba16a6d authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

[t:4045] {{{svn merge -r35667:35683 ../tokudb.4045b}}} Refs #4045.

Add Makefile rules for the threaded_stress_tests.
Fix the race conditions found by drd (mostly of which are benign).
drd for test_stress1 now runs clean (for valgrind 3.6.1)


git-svn-id: file:///svn/toku/tokudb@35690 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5a66e8cc
...@@ -720,9 +720,9 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename ...@@ -720,9 +720,9 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM nodename
if (!keep_me) { if (!keep_me) {
uint64_t bytes_freed = brtnode_memory_size(brtnode); uint64_t bytes_freed = brtnode_memory_size(brtnode);
if (brtnode->height == 0) if (brtnode->height == 0)
brt_status.bytes_leaf -= bytes_freed; __sync_fetch_and_sub(&brt_status.bytes_leaf, bytes_freed);
else else
brt_status.bytes_nonleaf -= bytes_freed; __sync_fetch_and_sub(&brt_status.bytes_nonleaf, bytes_freed);
toku_brtnode_free(&brtnode); toku_brtnode_free(&brtnode);
} }
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced); //printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
...@@ -745,9 +745,9 @@ int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM noden ...@@ -745,9 +745,9 @@ int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM noden
} }
// printf("fetch node %"PRIu64"\n", nodename.b); // printf("fetch node %"PRIu64"\n", nodename.b);
if ((*result)->height == 0) if ((*result)->height == 0)
brt_status.bytes_leaf += *sizep; __sync_fetch_and_add(&brt_status.bytes_leaf, *sizep);
else else
brt_status.bytes_nonleaf += *sizep; __sync_fetch_and_add(&brt_status.bytes_nonleaf, *sizep);
return r; return r;
} }
...@@ -890,10 +890,10 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long UU(bytes_to_free), long* by ...@@ -890,10 +890,10 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long UU(bytes_to_free), long* by
*bytes_freed = brtnode_memory_size(node); *bytes_freed = brtnode_memory_size(node);
// TODO: #3874, fix this // TODO: #3874, fix this
if (node->height == 0) { if (node->height == 0) {
brt_status.bytes_leaf -= *bytes_freed; __sync_fetch_and_sub(&brt_status.bytes_leaf, *bytes_freed);
} }
else { else {
brt_status.bytes_nonleaf -= *bytes_freed; __sync_fetch_and_sub(&brt_status.bytes_nonleaf, *bytes_freed);
} }
return 0; return 0;
} }
...@@ -1030,9 +1030,9 @@ int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, lon ...@@ -1030,9 +1030,9 @@ int toku_brtnode_pf_callback(void* brtnode_pv, void* read_extraargs, int fd, lon
*sizep = brtnode_memory_size(node); *sizep = brtnode_memory_size(node);
uint64_t bytes_added = *sizep - orig_size; uint64_t bytes_added = *sizep - orig_size;
if (node->height == 0) if (node->height == 0)
brt_status.bytes_leaf += bytes_added; __sync_fetch_and_add(&brt_status.bytes_leaf, bytes_added);
else else
brt_status.bytes_nonleaf += bytes_added; __sync_fetch_and_add(&brt_status.bytes_nonleaf, bytes_added);
return 0; return 0;
} }
......
...@@ -63,8 +63,8 @@ toku_thread_run(struct toku_thread *thread, void *(*f)(void *arg), void *arg) { ...@@ -63,8 +63,8 @@ toku_thread_run(struct toku_thread *thread, void *(*f)(void *arg), void *arg) {
toku_thread_pool_lock(thread->pool); toku_thread_pool_lock(thread->pool);
thread->f = f; thread->f = f;
thread->arg = arg; thread->arg = arg;
toku_thread_pool_unlock(thread->pool);
r = toku_pthread_cond_signal(&thread->wait); resource_assert_zero(r); r = toku_pthread_cond_signal(&thread->wait); resource_assert_zero(r);
toku_thread_pool_unlock(thread->pool);
} }
static void static void
......
...@@ -111,9 +111,9 @@ static void workqueue_set_closed(WORKQUEUE wq, int dolock) { ...@@ -111,9 +111,9 @@ static void workqueue_set_closed(WORKQUEUE wq, int dolock) {
int r; int r;
if (dolock) workqueue_lock(wq); if (dolock) workqueue_lock(wq);
wq->closed = 1; wq->closed = 1;
if (dolock) workqueue_unlock(wq);
r = toku_pthread_cond_broadcast(&wq->wait_read); assert(r == 0); r = toku_pthread_cond_broadcast(&wq->wait_read); assert(r == 0);
r = toku_pthread_cond_broadcast(&wq->wait_write); assert(r == 0); r = toku_pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
if (dolock) workqueue_unlock(wq);
} }
// Determine whether or not the work queue is empty // Determine whether or not the work queue is empty
......
...@@ -310,6 +310,27 @@ BDB_DONTRUN_TESTS = \ ...@@ -310,6 +310,27 @@ BDB_DONTRUN_TESTS = \
checkpoint_1.tdb$(BINSUF) checkpoint_stress.tdb$(BINSUF): checkpoint_test.h checkpoint_1.tdb$(BINSUF) checkpoint_stress.tdb$(BINSUF): checkpoint_test.h
# dependency only, not a rule (do not insert a tab at this line) # dependency only, not a rule (do not insert a tab at this line)
DEPENDS_ON_STRESS_HELPERS = \
recover-ft1 \
recover-ft2 \
recover-ft3 \
recover-ft4 \
recover-ft5 \
recover-ft6 \
recover-ft7 \
recover-ft8 \
recover-test_stress1 \
recover-test_stress2 \
test_stress1 \
test_stress2 \
test_stress3 \
test_stress4 \
test_stress5 \
test_stress6 \
#blank
$(patsubst %,%.tdb,$(DEPENDS_ON_STRESS_HELPERS)): threaded_stress_test_helpers.h
BDB_TESTS_THAT_SHOULD_FAIL= \ BDB_TESTS_THAT_SHOULD_FAIL= \
#\ ends prev line #\ ends prev line
ifeq ($(OS_CHOICE),windows) ifeq ($(OS_CHOICE),windows)
......
{
write_problem
drd:ConflictingAccess
fun:write
}
{
pread_problem
drd:ConflictingAccess
fun:pread
}
{
pwrite_problem
drd:ConflictingAccess
fun:pwrite
}
{
pwrite_problem
drd:ConflictingAccess
fun:fsync
}
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include <db.h> #include <db.h>
bool run_test; volatile bool run_test; // should be volatile since we are communicating through this variable.
typedef struct arg *ARG; typedef struct arg *ARG;
typedef int (*operation_t)(DB_ENV *env, DB** dbp, DB_TXN *txn, ARG arg); typedef int (*operation_t)(DB_ENV *env, DB** dbp, DB_TXN *txn, ARG arg);
...@@ -504,9 +504,10 @@ static void *test_time(void *arg) { ...@@ -504,9 +504,10 @@ static void *test_time(void *arg) {
// if num_Seconds is set to 0, run indefinitely // if num_Seconds is set to 0, run indefinitely
// //
if (num_seconds != 0) { if (num_seconds != 0) {
if (verbose) printf("Sleeping for %d seconds\n", num_seconds);
usleep(num_seconds*1000*1000); usleep(num_seconds*1000*1000);
if (verbose) printf("should now end test\n"); if (verbose) printf("should now end test\n");
run_test = false; __sync_bool_compare_and_swap(&run_test, true, false); // make this atomic to make valgrind --tool=drd happy.
if (tte->crash_at_end) { if (tte->crash_at_end) {
toku_hard_crash_on_purpose(); toku_hard_crash_on_purpose();
} }
...@@ -540,6 +541,7 @@ static int run_workers(struct arg *thread_args, int num_threads, u_int32_t num_s ...@@ -540,6 +541,7 @@ static int run_workers(struct arg *thread_args, int num_threads, u_int32_t num_s
} }
rwlock_destroy(&rwlock); rwlock_destroy(&rwlock);
if (verbose) printf("ending test, pthreads have joined\n"); if (verbose) printf("ending test, pthreads have joined\n");
toku_pthread_mutex_destroy(&mutex);
return r; return r;
} }
...@@ -558,6 +560,7 @@ static int create_table(DB_ENV **env_res, DB **db_res, ...@@ -558,6 +560,7 @@ static int create_table(DB_ENV **env_res, DB **db_res,
DB_ENV *env; DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0); r = db_env_create(&env, 0); assert(r == 0);
r = env->set_redzone(env, 0); CKERR(r);
r=env->set_default_bt_compare(env, bt_compare); CKERR(r); r=env->set_default_bt_compare(env, bt_compare); CKERR(r);
r = env->set_cachesize(env, 0, cachesize, 1); CKERR(r); r = env->set_cachesize(env, 0, cachesize, 1); CKERR(r);
r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r); r = env->set_generate_row_callback_for_put(env, generate_row_for_put); CKERR(r);
...@@ -646,6 +649,7 @@ static int open_table(DB_ENV **env_res, DB **db_res, ...@@ -646,6 +649,7 @@ static int open_table(DB_ENV **env_res, DB **db_res,
/* create the dup database file */ /* create the dup database file */
DB_ENV *env; DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0); r = db_env_create(&env, 0); assert(r == 0);
r = env->set_redzone(env, 0); CKERR(r);
r=env->set_default_bt_compare(env, bt_compare); CKERR(r); r=env->set_default_bt_compare(env, bt_compare); CKERR(r);
env->set_update(env, f); env->set_update(env, f);
// set the cache size to 10MB // set the cache size to 10MB
...@@ -719,14 +723,14 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct ...@@ -719,14 +723,14 @@ static inline void parse_stress_test_args (int argc, char *const argv[], struct
do_usage: do_usage:
fprintf(stderr, "Usage:\n%s [-h|-v|-q] [OPTIONS] [--only_create|--only_stress]\n", argv0); fprintf(stderr, "Usage:\n%s [-h|-v|-q] [OPTIONS] [--only_create|--only_stress]\n", argv0);
fprintf(stderr, "OPTIONS are among:\n"); fprintf(stderr, "OPTIONS are among:\n");
fprintf(stderr, "\t--num_elements INT\n"); fprintf(stderr, "\t--num_elements INT (default %d)\n", DEFAULT_ARGS.num_elements);
fprintf(stderr, "\t--num_seconds INT\n"); fprintf(stderr, "\t--num_seconds INT (default %ds)\n", DEFAULT_ARGS.time_of_test);
fprintf(stderr, "\t--node_size INT\n"); fprintf(stderr, "\t--node_size INT (default %d bytes)\n", DEFAULT_ARGS.node_size);
fprintf(stderr, "\t--basement_node_size INT\n"); fprintf(stderr, "\t--basement_node_size INT (default %d bytes)\n", DEFAULT_ARGS.basement_node_size);
fprintf(stderr, "\t--cachetable_size INT\n"); fprintf(stderr, "\t--cachetable_size INT (default %ld bytes)\n", DEFAULT_ARGS.cachetable_size);
fprintf(stderr, "\t--checkpointing_period INT\n"); fprintf(stderr, "\t--checkpointing_period INT (default %ds)\n", DEFAULT_ARGS.checkpointing_period);
fprintf(stderr, "\t--update_broadcast_period INT\n"); fprintf(stderr, "\t--update_broadcast_period INT (default %dms)\n", DEFAULT_ARGS.update_broadcast_period_ms);
fprintf(stderr, "\t--num_ptquery_threads INT\n"); fprintf(stderr, "\t--num_ptquery_threads INT (default %d threads)\n", DEFAULT_ARGS.num_ptquery_threads);
exit(resultcode); exit(resultcode);
} }
else if (strcmp(argv[1], "--num_elements") == 0) { else if (strcmp(argv[1], "--num_elements") == 0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment