Commit 5c0dce7c authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#2513 merge the cached threadpool to main refs[t:2513]

git-svn-id: file:///svn/toku/tokudb@24627 c7de825b-a66e-492c-adef-691d508d4ae1
parent 26ca9896
......@@ -58,7 +58,30 @@ toku_os_get_number_processors(void) {
int
toku_os_get_number_active_processors(void) {
return sysconf(_SC_NPROCESSORS_ONLN);
int n = sysconf(_SC_NPROCESSORS_ONLN);
#define DO_AFFINITY 1
#if DO_AFFINITY
#include <sched.h>
cpu_set_t cpuset;
int r = sched_getaffinity(getpid(), sizeof cpuset, &cpuset);
assert(r == 0);
int nn = 0;
for (unsigned i = 0; i < 8 * sizeof cpuset; i++)
if (CPU_ISSET(i, &cpuset))
nn++;
assert(nn <= n);
n = nn;
#endif
#define DO_TOKU_NCPUS 1
#if DO_TOKU_NCPUS
char *toku_ncpus = getenv("TOKU_NCPUS");
if (toku_ncpus) {
int ncpus = atoi(toku_ncpus);
if (ncpus < n)
n = ncpus;
}
#endif
return n;
}
int
......
......@@ -78,8 +78,8 @@ BRT_SOURCES = \
sub_block \
ule \
threadpool \
toku_worker \
txn \
workqueue \
x1764 \
xids \
ybt \
......
......@@ -5,6 +5,7 @@
#include "includes.h"
#include "toku_atomic.h"
#include "threadpool.h"
static BRT_UPGRADE_STATUS_S upgrade_status; // accountability, used in backwards_x.c
......@@ -13,9 +14,6 @@ toku_brt_get_upgrade_status (BRT_UPGRADE_STATUS s) {
*s = upgrade_status;
}
// performance tracing
#define DO_TOKU_TRACE 0
#if DO_TOKU_TRACE
......@@ -30,15 +28,18 @@ static inline void do_toku_trace(const char *cp, int len) {
#endif
static int num_cores = 0; // cache the number of cores for the parallelization
static struct toku_thread_pool *brt_pool = NULL;
int
toku_brt_serialize_init(void) {
num_cores = toku_os_get_number_processors();
num_cores = toku_os_get_number_active_processors();
int r = toku_thread_pool_create(&brt_pool, num_cores); assert(r == 0);
return 0;
}
int
toku_brt_serialize_destroy(void) {
toku_thread_pool_destroy(&brt_pool);
return 0;
}
......@@ -440,7 +441,7 @@ serialize_uncompressed_block_to_memory(char * uncompressed_buf,
// compress all of the sub blocks
char *uncompressed_ptr = uncompressed_buf + node_header_overhead;
char *compressed_ptr = compressed_buf + header_len;
compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores);
compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores, brt_pool);
//if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len);
......@@ -596,6 +597,7 @@ deserialize_child_buffer_worker(void *arg) {
break;
deserialize_child_buffer(dw->node, dw->cnum, &dw->rb, &dw->local_fingerprint);
}
workset_release_ref(ws);
return arg;
}
......@@ -626,11 +628,11 @@ deserialize_all_child_buffers(BRTNODE result, struct rbuf *rbuf, struct sub_bloc
// deserialize the fifos
if (0) printf("%s:%d T=%d N=%d %d\n", __FUNCTION__, __LINE__, T, result->u.n.n_children, n_nonempty_fifos);
toku_pthread_t tids[T];
threadset_create(tids, &T, deserialize_child_buffer_worker, &ws);
toku_thread_pool_run(brt_pool, 0, &T, deserialize_child_buffer_worker, &ws);
workset_add_ref(&ws, T);
deserialize_child_buffer_worker(&ws);
workset_join(&ws);
threadset_join(tids, T);
// combine the fingerprints and update the buffer counts
uint32_t check_local_fingerprint = 0;
for (int i = 0; i < result->u.n.n_children; i++) {
......@@ -915,7 +917,7 @@ decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size,
unsigned char *uncompressed_data = rb->buf + node_header_overhead;
// decompress all the compressed sub blocks into the uncompressed buffer
r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores);
r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores, brt_pool);
assert(r == 0);
toku_trace("decompress done");
......
......@@ -2842,7 +2842,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
// compress and checksum the sub blocks
int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
(char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning),
(char *) (compressed_buf + header_len), 1);
(char *) (compressed_buf + header_len), 1, NULL);
// cppy the uncompressed header to the compressed buffer
memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
......
......@@ -15,7 +15,6 @@
#include "threadpool.h"
#include "cachetable.h"
#include "rwlock.h"
#include "toku_worker.h"
#include "log_header.h"
#include "checkpoint.h"
#include "minicron.h"
......@@ -684,7 +683,7 @@ WORKQUEUE toku_cachetable_get_workqueue(CACHETABLE ct) {
void toku_cachefile_get_workqueue_load (CACHEFILE cf, int *n_in_queue, int *n_threads) {
CACHETABLE ct = cf->cachetable;
*n_in_queue = workqueue_n_in_queue(&ct->wq, 1);
*n_threads = threadpool_get_current_threads(ct->threadpool);
*n_threads = toku_thread_pool_get_current_threads(ct->threadpool);
}
//Test-only function
......
......@@ -10,6 +10,7 @@
#include "toku_assert.h"
#include "x1764.h"
#include "threadpool.h"
#include "sub_block.h"
void
......@@ -147,11 +148,12 @@ compress_worker(void *arg) {
break;
compress_sub_block(w->sub_block);
}
workset_release_ref(ws);
return arg;
}
size_t
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores) {
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores, struct toku_thread_pool *pool) {
char *compressed_base_ptr = compressed_ptr;
size_t compressed_len;
......@@ -186,12 +188,12 @@ compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *un
// compress the sub-blocks
if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks);
toku_pthread_t tids[T];
threadset_create(tids, &T, compress_worker, &ws);
toku_thread_pool_run(pool, 0, &T, compress_worker, &ws);
workset_add_ref(&ws, T);
compress_worker(&ws);
// wait for all of the work to complete
threadset_join(tids, T);
workset_join(&ws);
// squeeze out the holes not used by the compress bound
compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size;
......@@ -246,11 +248,12 @@ decompress_worker(void *arg) {
break;
dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum);
}
workset_release_ref(ws);
return arg;
}
int
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores) {
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores, struct toku_thread_pool *pool) {
int r;
if (n_sub_blocks == 1) {
......@@ -281,12 +284,12 @@ decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsign
// decompress the sub-blocks
if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T);
toku_pthread_t tids[T];
threadset_create(tids, &T, decompress_worker, &ws);
toku_thread_pool_run(pool, 0, &T, decompress_worker, &ws);
workset_add_ref(&ws, T);
decompress_worker(&ws);
// cleanup
threadset_join(tids, T);
workset_join(&ws);
workset_destroy(&ws);
r = 0;
......
......@@ -74,7 +74,7 @@ void *
compress_worker(void *arg);
size_t
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores);
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores, struct toku_thread_pool *pool);
struct decompress_work {
struct work base;
......@@ -104,10 +104,10 @@ decompress_worker(void *arg);
// decompress all sub blocks from the compressed_data buffer to the uncompressed_data buffer
// Returns 0 if success, otherwise an error
int
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores);
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores, struct toku_thread_pool *pool);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
}
#endif
#endif
......@@ -9,9 +9,8 @@
static TOKUTXN const null_txn = 0;
static DB * const null_db = 0;
static void test_flat (void) {
static void test_flat (u_int64_t limit) {
char fname[]= __FILE__ ".brt";
const u_int64_t limit=10000;
u_int64_t permute[limit];
unlink(fname);
CACHETABLE ct;
......@@ -67,7 +66,8 @@ static void test_flat (void) {
int
test_main (int argc , const char *argv[]) {
#define DO_AFFINITY 0
u_int64_t limit = 10000;
#define DO_AFFINITY 1
#if DO_AFFINITY == 0
default_parse_args(argc, argv);
#else
......@@ -83,6 +83,10 @@ test_main (int argc , const char *argv[]) {
ncpus = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--limit") == 0 && i+1 < argc) {
limit = atoi(argv[++i]);
continue;
}
break;
}
......@@ -103,7 +107,7 @@ test_main (int argc , const char *argv[]) {
}
#endif
test_flat();
test_flat(limit);
if (verbose) printf("test ok\n");
return 0;
......
......@@ -23,7 +23,7 @@ set_uint8_at_offset(void *vp, size_t offset, uint8_t newv) {
}
static void
test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_cores) {
test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_cores, struct toku_thread_pool *pool) {
if (verbose)
printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, total_size, my_max_sub_blocks);
......@@ -42,7 +42,7 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
void *cbuf = toku_malloc(cbuf_size_bound);
assert(cbuf);
size_t cbuf_size = compress_all_sub_blocks(n_sub_blocks, sub_blocks, buf, cbuf, n_cores);
size_t cbuf_size = compress_all_sub_blocks(n_sub_blocks, sub_blocks, buf, cbuf, n_cores, pool);
assert(cbuf_size <= cbuf_size_bound);
void *ubuf = toku_malloc(total_size);
......@@ -52,13 +52,13 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
// corrupt a checksum
sub_blocks[xidx].xsum += 1;
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r != 0);
// reset the checksums
sub_blocks[xidx].xsum -= 1;
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r == 0);
assert(memcmp(buf, ubuf, total_size) == 0);
......@@ -67,13 +67,13 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
unsigned char c = get_uint8_at_offset(cbuf, offset);
set_uint8_at_offset(cbuf, offset, c+1);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r != 0);
// reset the data
set_uint8_at_offset(cbuf, offset, c);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r == 0);
assert(memcmp(buf, ubuf, total_size) == 0);
}
......@@ -90,16 +90,16 @@ set_random(void *buf, int total_size) {
}
static void
run_test(int total_size, int n_cores) {
run_test(int total_size, int n_cores, struct toku_thread_pool *pool) {
void *buf = toku_malloc(total_size);
assert(buf);
for (int my_max_sub_blocks = 1; my_max_sub_blocks <= max_sub_blocks; my_max_sub_blocks++) {
memset(buf, 0, total_size);
test_sub_block_checksum(buf, total_size, my_max_sub_blocks, n_cores);
test_sub_block_checksum(buf, total_size, my_max_sub_blocks, n_cores, pool);
set_random(buf, total_size);
test_sub_block_checksum(buf, total_size, my_max_sub_blocks, n_cores);
test_sub_block_checksum(buf, total_size, my_max_sub_blocks, n_cores, pool);
}
toku_free(buf);
......@@ -129,11 +129,16 @@ test_main (int argc, const char *argv[]) {
}
}
struct toku_thread_pool *pool = NULL;
int r = toku_thread_pool_create(&pool, 8); assert(r == 0);
for (int total_size = 256*1024; total_size <= 4*1024*1024; total_size *= 2) {
for (int size = total_size - e; size <= total_size + e; size++) {
run_test(size, n_cores);
run_test(size, n_cores, pool);
}
}
toku_thread_pool_destroy(&pool);
return 0;
}
......@@ -30,13 +30,13 @@ test_sub_block_compression(void *buf, int total_size, int my_max_sub_blocks, int
void *cbuf = toku_malloc(cbuf_size_bound);
assert(cbuf);
size_t cbuf_size = compress_all_sub_blocks(n_sub_blocks, sub_blocks, buf, cbuf, n_cores);
size_t cbuf_size = compress_all_sub_blocks(n_sub_blocks, sub_blocks, buf, cbuf, n_cores, NULL);
assert(cbuf_size <= cbuf_size_bound);
void *ubuf = toku_malloc(total_size);
assert(ubuf);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, NULL);
assert(r == 0);
assert(memcmp(buf, ubuf, total_size) == 0);
......
......@@ -25,7 +25,7 @@ struct my_threadpool {
static void
my_threadpool_init (struct my_threadpool *my_threadpool, int max_threads) {
int r;
r = threadpool_create(&my_threadpool->threadpool, max_threads); assert(r == 0);
r = toku_thread_pool_create(&my_threadpool->threadpool, max_threads); assert(r == 0);
assert(my_threadpool != 0);
r = toku_pthread_mutex_init(&my_threadpool->mutex, 0); assert(r == 0);
r = toku_pthread_cond_init(&my_threadpool->wait, 0); assert(r == 0);
......@@ -41,8 +41,8 @@ my_threadpool_destroy (struct my_threadpool *my_threadpool, int max_threads) {
r = toku_pthread_cond_broadcast(&my_threadpool->wait); assert(r == 0);
r = toku_pthread_mutex_unlock(&my_threadpool->mutex); assert(r == 0);
if (verbose) printf("current %d\n", threadpool_get_current_threads(my_threadpool->threadpool));
threadpool_destroy(&my_threadpool->threadpool); assert(my_threadpool->threadpool == 0);
if (verbose) printf("current %d\n", toku_thread_pool_get_current_threads(my_threadpool->threadpool));
toku_thread_pool_destroy(&my_threadpool->threadpool); assert(my_threadpool->threadpool == 0);
assert(my_threadpool->counter == max_threads);
r = toku_pthread_mutex_destroy(&my_threadpool->mutex); assert(r == 0);
r = toku_pthread_cond_destroy(&my_threadpool->wait); assert(r == 0);
......@@ -117,10 +117,11 @@ test_main (int argc, const char *argv[]) {
threadpool = my_threadpool.threadpool;
if (verbose) printf("test threadpool_set_busy\n");
for (i=0; i<2*max_threads; i++) {
assert(threadpool_get_current_threads(threadpool) == (i >= max_threads ? max_threads : i));
threadpool_maybe_add(threadpool, my_thread_f, &my_threadpool);
assert(toku_thread_pool_get_current_threads(threadpool) == (i >= max_threads ? max_threads : i));
int n = 1;
toku_thread_pool_run(threadpool, 0, &n, my_thread_f, &my_threadpool);
}
assert(threadpool_get_current_threads(threadpool) == max_threads);
assert(toku_thread_pool_get_current_threads(threadpool) == max_threads);
my_threadpool_destroy(&my_threadpool, max_threads);
#if DO_MALLOC_HOOK
......@@ -133,8 +134,8 @@ test_main (int argc, const char *argv[]) {
void *(*orig_malloc_hook) (size_t, const __malloc_ptr_t) = __malloc_hook;
__malloc_hook = my_malloc_always_fails;
int r;
r = threadpool_create(&threadpool, 0); assert(r == ENOMEM);
r = threadpool_create(&threadpool, 1); assert(r == ENOMEM);
r = toku_thread_pool_create(&threadpool, 0); assert(r == ENOMEM);
r = toku_thread_pool_create(&threadpool, 1); assert(r == ENOMEM);
__malloc_hook = orig_malloc_hook;
}
#endif
......
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include "threadpool.h"
int verbose = 0;
static int usage(int ncpus, int poolsize) {
fprintf(stderr, "[-q] [-v] [--verbose] (%d)\n", verbose);
fprintf(stderr, "[--ncpus %d]\n", ncpus);
fprintf(stderr, "[--poolsize %d]\n", poolsize);
return 1;
}
static void *f(void *arg) {
return arg;
}
static void dotest(int poolsize, int nloops) {
int r;
struct toku_thread_pool *pool = NULL;
r = toku_thread_pool_create(&pool, poolsize);
assert(r == 0 && pool != NULL);
int i;
for (i = 0; i < nloops; i++) {
int n = 1;
r = toku_thread_pool_run(pool, 1, &n, f, NULL);
assert(r == 0);
}
if (verbose)
toku_thread_pool_print(pool, stderr);
toku_thread_pool_destroy(&pool);
}
int main(int argc, char *argv[]) {
// defaults
int ncpus = 0;
int poolsize = 1;
int nloops = 100000;
// options
int i;
for (i = 1; i < argc; i++) {
char *arg = argv[i];
if (arg[0] != '-')
break;
if (strcmp(arg, "--ncpus") == 0 && i+1 < argc) {
ncpus = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--poolsize") == 0 && i+1 < argc) {
poolsize = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "-v") == 0 || strcmp(arg, "--verbose") == 0) {
verbose = verbose+1;
continue;
}
if (strcmp(arg, "-q") == 0) {
verbose = verbose > 0 ? verbose-1 : 0;
continue;
}
return usage(ncpus, poolsize);
}
int starti = i;
if (ncpus > 0) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (i = 0; i < ncpus; i++)
CPU_SET(i, &cpuset);
int r;
r = sched_setaffinity(getpid(), sizeof cpuset, &cpuset);
assert(r == 0);
cpu_set_t use_cpuset;
CPU_ZERO(&use_cpuset);
r = sched_getaffinity(getpid(), sizeof use_cpuset, &use_cpuset);
assert(r == 0);
assert(memcmp(&cpuset, &use_cpuset, sizeof cpuset) == 0);
}
if (starti == argc) {
dotest(poolsize, nloops);
} else {
for (i = starti; i < argc; i++) {
nloops = atoi(argv[i]);
dotest(poolsize, nloops);
}
}
return 0;
}
......@@ -161,9 +161,10 @@ test_flow_control (int limit, int n, int maxthreads) {
THREADPOOL tp;
int i;
rwfc_init(rwfc, limit);
threadpool_create(&tp, maxthreads);
for (i=0; i<maxthreads; i++)
threadpool_maybe_add(tp, rwfc_worker, &rwfc->workqueue);
toku_thread_pool_create(&tp, maxthreads);
int T = maxthreads;
toku_thread_pool_run(tp, 0, &T, rwfc_worker, &rwfc->workqueue);
assert(T == maxthreads);
sleep(1); // this is here to block the reader on the first deq
for (i=0; i<n; i++) {
WORKITEM wi = new_workitem();
......@@ -179,7 +180,7 @@ test_flow_control (int limit, int n, int maxthreads) {
// toku_os_usleep(random() % 1);
}
workqueue_set_closed(&rwfc->workqueue, 1);
threadpool_destroy(&tp);
toku_thread_pool_destroy(&tp);
rwfc_destroy(rwfc);
}
......
......@@ -11,49 +11,244 @@
#include "toku_pthread.h"
#include "toku_assert.h"
#include "memory.h"
#include "toku_list.h"
#include "threadpool.h"
struct threadpool {
struct toku_thread {
struct toku_thread_pool *pool;
toku_pthread_t tid;
void *(*f)(void *arg);
void *arg;
int doexit;
struct toku_list free_link;
struct toku_list all_link;
toku_pthread_cond_t wait;
};
struct toku_thread_pool {
int max_threads;
int current_threads;
toku_pthread_t tids[];
int cur_threads;
struct toku_list free_threads;
struct toku_list all_threads;
toku_pthread_mutex_t lock;
toku_pthread_cond_t wait_free;
uint64_t gets, get_blocks;
};
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) {
size_t size = sizeof (struct threadpool) + max_threads*sizeof (toku_pthread_t);
struct threadpool *threadpool = toku_malloc(size);
if (threadpool == 0)
return ENOMEM;
threadpool->max_threads = max_threads;
threadpool->current_threads = 0;
int i;
for (i=0; i<max_threads; i++)
memset(&threadpool->tids[i], 0, sizeof(threadpool->tids[i]));
*threadpoolptr = threadpool;
return 0;
static void *toku_thread_run_internal(void *arg);
static void toku_thread_pool_lock(struct toku_thread_pool *pool);
static void toku_thread_pool_unlock(struct toku_thread_pool *pool);
static int
toku_thread_create(struct toku_thread_pool *pool, struct toku_thread **toku_thread_return) {
int r;
struct toku_thread *thread = (struct toku_thread *) toku_malloc(sizeof *thread);
if (thread == NULL) {
r = errno;
} else {
memset(thread, 0, sizeof *thread);
thread->pool = pool;
r = toku_pthread_cond_init(&thread->wait, NULL); invariant(r == 0);
r = toku_pthread_create(&thread->tid, NULL, toku_thread_run_internal, thread); invariant(r == 0);
*toku_thread_return = thread;
}
return r;
}
void threadpool_destroy(THREADPOOL *threadpoolptr) {
struct threadpool *threadpool = *threadpoolptr;
int i;
for (i=0; i<threadpool->current_threads; i++) {
int r; void *ret;
r = toku_pthread_join(threadpool->tids[i], &ret);
assert(r == 0);
void
toku_thread_run(struct toku_thread *thread, void *(*f)(void *arg), void *arg) {
int r;
toku_thread_pool_lock(thread->pool);
thread->f = f;
thread->arg = arg;
toku_thread_pool_unlock(thread->pool);
r = toku_pthread_cond_signal(&thread->wait); invariant(r == 0);
}
static void
toku_thread_destroy(struct toku_thread *thread) {
int r;
void *ret;
r = toku_pthread_join(thread->tid, &ret); invariant(r == 0 && ret == thread);
struct toku_thread_pool *pool = thread->pool;
toku_thread_pool_lock(pool);
toku_list_remove(&thread->free_link);
toku_thread_pool_unlock(pool);
r = toku_pthread_cond_destroy(&thread->wait); invariant(r == 0);
toku_free(thread);
}
static void
toku_thread_ask_exit(struct toku_thread *thread) {
thread->doexit = 1;
int r = toku_pthread_cond_signal(&thread->wait); invariant(r == 0);
}
static void *
toku_thread_run_internal(void *arg) {
struct toku_thread *thread = (struct toku_thread *) arg;
struct toku_thread_pool *pool = thread->pool;
int r;
toku_thread_pool_lock(pool);
while (1) {
r = toku_pthread_cond_signal(&pool->wait_free); invariant(r == 0);
void *(*thread_f)(void *); void *thread_arg; int doexit;
while (1) {
thread_f = thread->f; thread_arg = thread->arg; doexit = thread->doexit; // make copies of these variables to make helgrind happy
if (thread_f || doexit)
break;
r = toku_pthread_cond_wait(&thread->wait, &pool->lock); invariant(r == 0);
}
toku_thread_pool_unlock(pool);
if (thread_f)
(void) thread_f(thread_arg);
if (doexit)
break;
toku_thread_pool_lock(pool);
thread->f = NULL;
toku_list_push(&pool->free_threads, &thread->free_link);
}
*threadpoolptr = 0;
toku_free(threadpool);
return arg;
}
int
toku_thread_pool_create(struct toku_thread_pool **pool_return, int max_threads) {
int r;
struct toku_thread_pool *pool = (struct toku_thread_pool *) toku_malloc(sizeof *pool);
if (pool == NULL) {
r = errno;
} else {
memset(pool, 0, sizeof *pool);
r = toku_pthread_mutex_init(&pool->lock, NULL); invariant(r == 0);
toku_list_init(&pool->free_threads);
toku_list_init(&pool->all_threads);
r = toku_pthread_cond_init(&pool->wait_free, NULL); invariant(r == 0);
pool->cur_threads = 0;
pool->max_threads = max_threads;
*pool_return = pool;
r = 0;
}
return r;
}
static void
toku_thread_pool_lock(struct toku_thread_pool *pool) {
int r = toku_pthread_mutex_lock(&pool->lock); invariant(r == 0);
}
static void
toku_thread_pool_unlock(struct toku_thread_pool *pool) {
int r = toku_pthread_mutex_unlock(&pool->lock); invariant(r == 0);
}
void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg) {
if (threadpool->current_threads < threadpool->max_threads) {
int r = toku_pthread_create(&threadpool->tids[threadpool->current_threads], 0, f, arg);
if (r == 0) {
threadpool->current_threads++;
void
toku_thread_pool_destroy(struct toku_thread_pool **poolptr) {
struct toku_thread_pool *pool = *poolptr;
*poolptr = NULL;
// ask the threads to exit
toku_thread_pool_lock(pool);
struct toku_list *list;
for (list = pool->all_threads.next; list != &pool->all_threads; list = list->next) {
struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link);
toku_thread_ask_exit(thread);
}
toku_thread_pool_unlock(pool);
// wait for all of the threads to exit
while (!toku_list_empty(&pool->all_threads)) {
list = toku_list_pop_head(&pool->all_threads);
struct toku_thread *thread = toku_list_struct(list, struct toku_thread, all_link);
toku_thread_destroy(thread);
pool->cur_threads -= 1;
}
invariant(pool->cur_threads == 0);
// cleanup
int r;
r = toku_pthread_cond_destroy(&pool->wait_free); invariant(r == 0);
r = toku_pthread_mutex_destroy(&pool->lock); invariant(r == 0);
toku_free(pool);
}
static int
toku_thread_pool_add(struct toku_thread_pool *pool) {
struct toku_thread *thread = NULL;
int r = toku_thread_create(pool, &thread);
if (r == 0) {
pool->cur_threads += 1;
toku_list_push(&pool->all_threads, &thread->all_link);
toku_list_push(&pool->free_threads, &thread->free_link);
r = toku_pthread_cond_signal(&pool->wait_free); invariant(r == 0);
}
return r;
}
// get one thread from the free pool.
static int
toku_thread_pool_get_one(struct toku_thread_pool *pool, int dowait, struct toku_thread **toku_thread_return) {
int r = 0;
toku_thread_pool_lock(pool);
pool->gets++;
while (1) {
if (!toku_list_empty(&pool->free_threads))
break;
if (pool->max_threads == 0 || pool->cur_threads < pool->max_threads)
(void) toku_thread_pool_add(pool);
if (toku_list_empty(&pool->free_threads) && !dowait) {
r = EWOULDBLOCK;
break;
}
pool->get_blocks++;
r = toku_pthread_cond_wait(&pool->wait_free, &pool->lock); invariant(r == 0);
}
if (r == 0) {
struct toku_list *list = toku_list_pop_head(&pool->free_threads);
struct toku_thread *thread = toku_list_struct(list, struct toku_thread, free_link);
*toku_thread_return = thread;
} else
*toku_thread_return = NULL;
toku_thread_pool_unlock(pool);
return r;
}
int
toku_thread_pool_get(struct toku_thread_pool *pool, int dowait, int *nthreads, struct toku_thread **toku_thread_return) {
int r = 0;
int n = *nthreads;
int i;
for (i = 0; i < n; i++) {
r = toku_thread_pool_get_one(pool, dowait, &toku_thread_return[i]);
if (r != 0)
break;
}
*nthreads = i;
return r;
}
int
toku_thread_pool_run(struct toku_thread_pool *pool, int dowait, int *nthreads, void *(*f)(void *arg), void *arg) {
int n = *nthreads;
struct toku_thread *tids[n];
int r = toku_thread_pool_get(pool, dowait, nthreads, tids);
if (r == 0 || r == EWOULDBLOCK) {
n = *nthreads;
for (int i = 0; i < n; i++)
toku_thread_run(tids[i], f, arg);
}
return r;
}
void
toku_thread_pool_print(struct toku_thread_pool *pool, FILE *out) {
fprintf(out, "%s:%d %p %llu %llu\n", __FILE__, __LINE__, pool, (long long unsigned) pool->gets, (long long unsigned) pool->get_blocks);
}
int threadpool_get_current_threads(THREADPOOL threadpool) {
return threadpool->current_threads;
int
toku_thread_pool_get_current_threads(struct toku_thread_pool *pool) {
return pool->cur_threads;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef THREADPOOL_H
#define THREADPOOL_H
#ifndef TOKU_THREADPOOL_H
#define TOKU_THREADPOOL_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
#include "c_dialects.h"
C_BEGIN
// A threadpool is a limited set of threads that can be used to apply a
// function to work contained in a work queue. The work queue is outside
// of the scope of the threadpool; the threadpool merely provides
// mechanisms to grow the number of threads in the threadpool on demand.
// A toku_thread is toku_pthread that can be cached.
struct toku_thread;
typedef struct threadpool *THREADPOOL;
// Run a function f on a thread
// This function setups up the thread to run function f with argument arg and then wakes up
// the thread to run it.
void toku_thread_run(struct toku_thread *thread, void *(*f)(void *arg), void *arg);
// Create a new threadpool
// Effects: a new threadpool is allocated and initialized. the number of
// threads in the threadpool is limited to max_threads. initially, there
// are no threads in the pool.
// Returns: if there are no errors, the threadpool is set and zero is returned.
// Otherwise, an error number is returned.
// A toku_thread_pool is a pool of toku_threads. These threads can be allocated from the pool
// and can run an arbitrary function.
struct toku_thread_pool;
typedef struct toku_thread_pool *THREADPOOL;
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads);
// Create a new threadpool
// Effects: a new threadpool is allocated and initialized. the number of threads in the threadpool is limited to max_threads.
// If max_threads == 0 then there is no limit on the number of threads in the pool.
// Initially, there are no threads in the pool. Threads are allocated by the _get or _run functions.
// Returns: if there are no errors, the threadpool is set and zero is returned. Otherwise, an error number is returned.
int toku_thread_pool_create(struct toku_thread_pool **threadpoolptr, int max_threads);
// Destroy a threadpool
// Effects: the calling thread joins with all of the threads in the threadpool.
// Effects: the threadpool memory is freed.
// Returns: the threadpool is set to null.
void toku_thread_pool_destroy(struct toku_thread_pool **threadpoolptr);
void threadpool_destroy(THREADPOOL *threadpoolptr);
// Maybe add a thread to the threadpool.
// Effects: the number of threads in the threadpool is expanded by 1 as long
// as the current number of threads in the threadpool is less than the max
// and there are no idle threads.
// Effects: if the thread is create, it calls the function f with argument arg
// Expects: external serialization on this function; only one thread may
// execute this function
// Get the current number of threads in the thread pool
int toku_thread_pool_get_current_threads(struct toku_thread_pool *pool);
void threadpool_maybe_add(THREADPOOL theadpool, void *(*f)(void *), void *arg);
// Get one or more threads from the thread pool
// dowait indicates whether or not the caller blocks waiting for threads to free up
// nthreads on input determines the number of threads that are wanted
// nthreads on output indicates the number of threads that were allocated
// toku_thread_return on input supplies an array of thread pointers (all NULL). This function returns the threads
// that were allocated in the array.
int toku_thread_pool_get(struct toku_thread_pool *pool, int dowait, int *nthreads, struct toku_thread **toku_thread_return);
// get the current number of threads
// Run a function f on one or more threads allocated from the thread pool
int toku_thread_pool_run(struct toku_thread_pool *pool, int dowait, int *nthreads, void *(*f)(void *arg), void *arg);
int threadpool_get_current_threads(THREADPOOL);
// Print the state of the thread pool
void toku_thread_pool_print(struct toku_thread_pool *pool, FILE *out);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
C_END
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef _TOKU_WORKER_H
#define _TOKU_WORKER_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// initialize the work queue and worker
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr);
// destroy the work queue and worker
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr);
// this is the thread function for the worker threads in the worker thread
// pool. the arg is a pointer to the work queue that feeds work to the
// workers.
void *toku_worker(void *arg);
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
......@@ -12,7 +12,6 @@
#include "toku_pthread.h"
#include "workqueue.h"
#include "threadpool.h"
#include "toku_worker.h"
// Create fixed number of worker threads, all waiting on a single queue
// of work items (WORKQUEUE).
......@@ -21,15 +20,13 @@ void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr) {
workqueue_init(wq);
int nprocs = toku_os_get_number_active_processors();
int nthreads = nprocs*2;
threadpool_create(tpptr, nthreads);
int i;
for (i=0; i<nthreads; i++)
threadpool_maybe_add(*tpptr, toku_worker, wq);
toku_thread_pool_create(tpptr, nthreads);
toku_thread_pool_run(*tpptr, 0, &nthreads, toku_worker, wq);
}
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr) {
workqueue_set_closed(wq, 1); // close the work queue and [see "A" in toku_worker()]
threadpool_destroy(tpptr); // wait for all of the worker threads to exit
toku_thread_pool_destroy(tpptr); // wait for all of the worker threads to exit
workqueue_destroy(wq);
}
......
......@@ -9,10 +9,9 @@
#include <errno.h>
#include "toku_assert.h"
#include "toku_pthread.h"
#include "c_dialects.h"
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
C_BEGIN
struct workitem;
......@@ -203,9 +202,20 @@ static int workqueue_n_in_queue (WORKQUEUE wq, int dolock) {
return r;
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#include "threadpool.h"
// initialize the work queue and worker
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr);
// destroy the work queue and worker
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr);
// this is the thread function for the worker threads in the worker thread
// pool. the arg is a pointer to the work queue that feeds work to the
// workers.
void *toku_worker(void *arg);
C_END
#endif
......@@ -8,55 +8,67 @@
#include <toku_list.h>
#include <toku_pthread.h>
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
#include "c_dialects.h"
C_BEGIN
// the work struct is the base class for work to be done by some threads
// The work struct is the base class for work to be done by some threads
struct work {
struct toku_list next;
};
// the workset struct contains the set of work to be done by some threads
// the lock protects the work list
// The workset struct contains the set of work to be done by some threads
struct workset {
toku_pthread_mutex_t lock;
struct toku_list worklist;
struct toku_list worklist; // a list of work
int refs; // number of workers that have a reference on the workset
toku_pthread_cond_t worker_wait; // a condition variable used to wait for all of the worker to release their reference on the workset
};
static inline void workset_init(struct workset *ws) {
int r = toku_pthread_mutex_init(&ws->lock, NULL); assert(r == 0);
static inline void
workset_init(struct workset *ws) {
int r;
r = toku_pthread_mutex_init(&ws->lock, NULL); invariant(r == 0);
toku_list_init(&ws->worklist);
ws->refs = 1; // the calling thread gets a reference
r = toku_pthread_cond_init(&ws->worker_wait, NULL); invariant(r == 0);
}
static inline void workset_destroy(struct workset *ws) {
assert(toku_list_empty(&ws->worklist));
int r = toku_pthread_mutex_destroy(&ws->lock); assert(r == 0);
static inline void
workset_destroy(struct workset *ws) {
invariant(toku_list_empty(&ws->worklist));
int r;
r = toku_pthread_cond_destroy(&ws->worker_wait); invariant(r == 0);
r = toku_pthread_mutex_destroy(&ws->lock); invariant(r == 0);
}
static inline void workset_lock(struct workset *ws) {
int r = toku_pthread_mutex_lock(&ws->lock); assert(r == 0);
static inline void
workset_lock(struct workset *ws) {
int r = toku_pthread_mutex_lock(&ws->lock); invariant(r == 0);
}
static inline void workset_unlock(struct workset *ws) {
int r = toku_pthread_mutex_unlock(&ws->lock); assert(r == 0);
static inline void
workset_unlock(struct workset *ws) {
int r = toku_pthread_mutex_unlock(&ws->lock); invariant(r == 0);
}
// put work in the workset
static inline void workset_put(struct workset *ws, struct work *w) {
workset_lock(ws);
// Put work in the workset. Assume the workset is already locked.
static inline void
workset_put_locked(struct workset *ws, struct work *w) {
toku_list_push(&ws->worklist, &w->next);
workset_unlock(ws);
}
// put work in the workset. assume already locked.
static inline void workset_put_locked(struct workset *ws, struct work *w) {
toku_list_push(&ws->worklist, &w->next);
// Put work in the workset
static inline void
workset_put(struct workset *ws, struct work *w) {
workset_lock(ws);
workset_put_locked(ws, w);
workset_unlock(ws);
}
// get work from the workset
static inline struct work *workset_get(struct workset *ws) {
// Get work from the workset
static inline struct work *
workset_get(struct workset *ws) {
workset_lock(ws);
struct work *w = NULL;
if (!toku_list_empty(&ws->worklist)) {
......@@ -67,30 +79,34 @@ static inline struct work *workset_get(struct workset *ws) {
return w;
}
// create a set of threads to run a given function
// tids will contain the thread id's of the created threads
// *ntids on input contains the number of threads requested, on output contains the number of threads created
static inline void threadset_create(toku_pthread_t tids[], int *ntids, void *(*f)(void *arg), void *arg) {
int n = *ntids;
int i;
for (i = 0; i < n; i++) {
int r = toku_pthread_create(&tids[i], NULL, f, arg);
if (r != 0)
break;
// Add references to the workset
static inline void
workset_add_ref(struct workset *ws, int refs) {
workset_lock(ws);
ws->refs += refs;
workset_unlock(ws);
}
// Release a reference on the workset
static inline void
workset_release_ref(struct workset *ws) {
workset_lock(ws);
if (--ws->refs == 0) {
int r = toku_pthread_cond_broadcast(&ws->worker_wait); invariant(r == 0);
}
*ntids = i;
workset_unlock(ws);
}
// join with a set of threads
static inline void threadset_join(toku_pthread_t tids[], int ntids) {
for (int i = 0; i < ntids; i++) {
void *ret;
int r = toku_pthread_join(tids[i], &ret); assert(r == 0);
// Wait until all of the worker threads have released their reference on the workset
static inline void
workset_join(struct workset *ws) {
workset_lock(ws);
while (ws->refs != 0) {
int r = toku_pthread_cond_wait(&ws->worker_wait, &ws->lock); invariant(r == 0);
}
workset_unlock(ws);
}
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
C_END
#endif
......@@ -6,7 +6,7 @@
#if defined(__cplusplus) || defined(__cilkplusplus)
#define C_BEGIN extern "C" {
#define C_END };
#define C_END }
#else
#define C_BEGIN
#define C_END
......@@ -15,7 +15,7 @@
#if defined(__cilkplusplus)
#define CILK_BEGIN extern "Cilk++" {
#define CILK_END };
#define CILK_END }
#else
#define CILK_BEGIN
#define CILK_END
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment