Commit 7ab32cad authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge parallel block compressor from 3.0.4 to main closes[t:2371]

git-svn-id: file:///svn/toku/tokudb@18038 c7de825b-a66e-492c-adef-691d508d4ae1
parent bb33112e
...@@ -493,6 +493,55 @@ enum { uncompressed_magic_len_10 = (8 // tokuleaf or tokunode ...@@ -493,6 +493,55 @@ enum { uncompressed_magic_len_10 = (8 // tokuleaf or tokunode
) )
}; };
#define DO_DECOMPRESS_WORKER 1
struct decompress_work_10 {
toku_pthread_t id;
void *compress_ptr;
void *uncompress_ptr;
u_int32_t compress_size;
u_int32_t uncompress_size;
};
// initialize the decompression work
static void init_decompress_work_10(struct decompress_work_10 *w,
void *compress_ptr, u_int32_t compress_size,
void *uncompress_ptr, u_int32_t uncompress_size) {
memset(&w->id, 0, sizeof(w->id));
w->compress_ptr = compress_ptr; w->compress_size = compress_size;
w->uncompress_ptr = uncompress_ptr; w->uncompress_size = uncompress_size;
}
// do the decompression work
static void do_decompress_work_10(struct decompress_work_10 *w) {
uLongf destlen = w->uncompress_size;
int r = uncompress(w->uncompress_ptr, &destlen,
w->compress_ptr, w->compress_size);
assert(destlen==w->uncompress_size);
assert(r==Z_OK);
}
#if DO_DECOMPRESS_WORKER
static void *decompress_worker_10(void *);
static void start_decompress_work_10(struct decompress_work_10 *w) {
int r = toku_pthread_create(&w->id, NULL, decompress_worker_10, w); assert(r == 0);
}
static void wait_decompress_work_10(struct decompress_work_10 *w) {
void *ret;
int r = toku_pthread_join(w->id, &ret); assert(r == 0);
}
static void *decompress_worker_10(void *arg) {
struct decompress_work_10 *w = (struct decompress_work_10 *) arg;
do_decompress_work_10(w);
return arg;
}
#endif
static int static int
decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) { decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) {
int r; int r;
...@@ -534,24 +583,24 @@ decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf ...@@ -534,24 +583,24 @@ decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf
// decompress the sub blocks // decompress the sub blocks
unsigned char *uncompressed_data = rb->buf+uncompressed_magic_len_10; unsigned char *uncompressed_data = rb->buf+uncompressed_magic_len_10;
struct decompress_work decompress_work[n_sub_blocks]; struct decompress_work_10 decompress_work[n_sub_blocks];
for (i=0; i<n_sub_blocks; i++) { for (i=0; i<n_sub_blocks; i++) {
init_decompress_work(&decompress_work[i], compressed_data, sub_block_sizes[i].compressed_size, uncompressed_data, sub_block_sizes[i].uncompressed_size); init_decompress_work_10(&decompress_work[i], compressed_data, sub_block_sizes[i].compressed_size, uncompressed_data, sub_block_sizes[i].uncompressed_size);
if (i>0) { if (i>0) {
#if DO_DECOMPRESS_WORKER #if DO_DECOMPRESS_WORKER
start_decompress_work(&decompress_work[i]); start_decompress_work_10(&decompress_work[i]);
#else #else
do_decompress_work(&decompress_work[i]); do_decompress_work_10(&decompress_work[i]);
#endif #endif
} }
uncompressed_data += sub_block_sizes[i].uncompressed_size; uncompressed_data += sub_block_sizes[i].uncompressed_size;
compressed_data += sub_block_sizes[i].compressed_size; compressed_data += sub_block_sizes[i].compressed_size;
} }
do_decompress_work(&decompress_work[0]); do_decompress_work_10(&decompress_work[0]);
#if DO_DECOMPRESS_WORKER #if DO_DECOMPRESS_WORKER
for (i=1; i<n_sub_blocks; i++) for (i=1; i<n_sub_blocks; i++)
wait_decompress_work(&decompress_work[i]); wait_decompress_work_10(&decompress_work[i]);
#endif #endif
toku_trace("decompress done"); toku_trace("decompress done");
......
...@@ -6,6 +6,31 @@ ...@@ -6,6 +6,31 @@
#include "includes.h" #include "includes.h"
#include "backwards_10.h" #include "backwards_10.h"
static int num_cores = 0;
int
toku_brt_serialize_init(void) {
num_cores = toku_os_get_number_processors();
return 0;
}
int
toku_brt_serialize_destroy(void) {
return 0;
}
#define DO_TOKU_TRACE 0
#if DO_TOKU_TRACE
static int toku_trace_fd = -1;
static inline void do_toku_trace(const char *cp, int len) {
write(toku_trace_fd, cp, len);
}
#define toku_trace(a) do_toku_trace(a, strlen(a))
#else
#define toku_trace(a)
#endif
// NOTE: The backwards compatability functions are in a file that is included at the END of this file. // NOTE: The backwards compatability functions are in a file that is included at the END of this file.
static int deserialize_brtheader_10 (int fd, struct rbuf *rb, struct brt_header **brth); static int deserialize_brtheader_10 (int fd, struct rbuf *rb, struct brt_header **brth);
static int upgrade_brtheader_10_11 (struct brt_header **brth_10, struct brt_header **brth_11); static int upgrade_brtheader_10_11 (struct brt_header **brth_10, struct brt_header **brth_11);
...@@ -253,8 +278,9 @@ enum { ...@@ -253,8 +278,9 @@ enum {
// compression header sub block sizes // compression header sub block sizes
struct sub_block_sizes { struct sub_block_sizes {
u_int32_t compressed_size;
u_int32_t uncompressed_size; u_int32_t uncompressed_size;
u_int32_t compressed_size; // real compressed size
u_int32_t compressed_size_bound; // estimated compressed size
}; };
// round up n // round up n
...@@ -264,22 +290,22 @@ static inline int roundup2(int n, int alignment) { ...@@ -264,22 +290,22 @@ static inline int roundup2(int n, int alignment) {
// choose the number of sub blocks such that the sub block size // choose the number of sub blocks such that the sub block size
// is around 1 meg. put an upper bound on the number of sub blocks. // is around 1 meg. put an upper bound on the number of sub blocks.
static int get_sub_block_sizes(int totalsize, int maxn, struct sub_block_sizes sizes[]) { static int choose_sub_block_sizes(int total_size, int maxn, struct sub_block_sizes sizes[]) {
const int meg = 1024*1024; const int target_sub_block_size = 512*1024;
const int alignment = 256; const int alignment = 256;
int n, subsize; int n, subsize;
n = totalsize/meg; n = total_size/target_sub_block_size;
if (n == 0) { if (n == 0) {
n = 1; n = 1;
subsize = totalsize; subsize = total_size;
} else { } else {
if (n > maxn) if (n > maxn)
n = maxn; n = maxn;
subsize = roundup2(totalsize/n, alignment); subsize = roundup2(total_size/n, alignment);
while (n < maxn && subsize >= meg + meg/8) { while (n < maxn && subsize >= target_sub_block_size + target_sub_block_size/8) {
n++; n++;
subsize = roundup2(totalsize/n, alignment); subsize = roundup2(total_size/n, alignment);
} }
} }
...@@ -287,12 +313,14 @@ static int get_sub_block_sizes(int totalsize, int maxn, struct sub_block_sizes s ...@@ -287,12 +313,14 @@ static int get_sub_block_sizes(int totalsize, int maxn, struct sub_block_sizes s
int i; int i;
for (i=0; i<n-1; i++) { for (i=0; i<n-1; i++) {
sizes[i].uncompressed_size = subsize; sizes[i].uncompressed_size = subsize;
sizes[i].compressed_size = compressBound(subsize); sizes[i].compressed_size_bound = compressBound(subsize);
totalsize -= subsize; sizes[i].compressed_size = 0;
} total_size -= subsize;
if (i == 0 || totalsize > 0) { }
sizes[i].uncompressed_size = totalsize; if (i == 0 || total_size > 0) {
sizes[i].compressed_size = compressBound(totalsize); sizes[i].uncompressed_size = total_size;
sizes[i].compressed_size_bound = compressBound(total_size);
sizes[i].compressed_size = 0;
i++; i++;
} }
...@@ -301,15 +329,15 @@ static int get_sub_block_sizes(int totalsize, int maxn, struct sub_block_sizes s ...@@ -301,15 +329,15 @@ static int get_sub_block_sizes(int totalsize, int maxn, struct sub_block_sizes s
// get the size of the compression header // get the size of the compression header
static size_t get_compression_header_size(int UU(layout_version), int n) { static size_t get_compression_header_size(int UU(layout_version), int n) {
return sizeof (u_int32_t) + n * sizeof (struct sub_block_sizes); return sizeof (u_int32_t) + (n * 2 * sizeof (u_int32_t));
} }
// get the sum of the sub block compressed sizes // get the sum of the sub block compressed sizes
static size_t get_sum_compressed_size(int n, struct sub_block_sizes sizes[]) { static size_t get_sum_compressed_size_bound(int n, struct sub_block_sizes sizes[]) {
int i; int i;
size_t compressed_size = 0; size_t compressed_size = 0;
for (i=0; i<n; i++) for (i=0; i<n; i++)
compressed_size += sizes[i].compressed_size; compressed_size += sizes[i].compressed_size_bound;
return compressed_size; return compressed_size;
} }
...@@ -323,11 +351,54 @@ static size_t get_sum_uncompressed_size(int n, struct sub_block_sizes sizes[]) { ...@@ -323,11 +351,54 @@ static size_t get_sum_uncompressed_size(int n, struct sub_block_sizes sizes[]) {
return uncompressed_size; return uncompressed_size;
} }
static inline void ignore_int (int UU(ignore_me)) {}
static void serialize_descriptor_contents_to_wbuf(struct wbuf *wb, struct descriptor *desc); static void serialize_descriptor_contents_to_wbuf(struct wbuf *wb, struct descriptor *desc);
int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h, int n_workitems, int n_threads, BOOL for_checkpoint) { #include "workset.h"
struct compress_work {
struct work base;
struct sub_block_sizes *sub_block_sizes;
void *from;
void *to;
};
static void
compress_work_init(struct compress_work *w, void *from, void *to, struct sub_block_sizes *sub_block_sizes) {
assert(from); assert(to);
w->from = from;
w->to = to;
w->sub_block_sizes = sub_block_sizes;
}
static void
compress_sub_block(struct compress_work *w) {
struct sub_block_sizes *sub_block_sizes = w->sub_block_sizes;
Bytef *uncompressed_ptr = (Bytef *) w->from;
Bytef *compressed_ptr = (Bytef *) w->to;
uLongf uncompressed_len = sub_block_sizes->uncompressed_size;
uLongf real_compressed_len = sub_block_sizes->compressed_size_bound;
int compression_level = 5;
int r = compress2((Bytef*)compressed_ptr, &real_compressed_len,
(Bytef*)uncompressed_ptr, uncompressed_len,
compression_level);
assert(r == Z_OK);
sub_block_sizes->compressed_size = real_compressed_len; // replace the compressed size estimate with the real size
}
static void *
compress_worker(void *arg) {
struct workset *ws = (struct workset *) arg;
while (1) {
struct compress_work *w = (struct compress_work *) workset_get(ws);
if (w == NULL)
break;
compress_sub_block(w);
}
return arg;
}
int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h, int UU(n_workitems), int UU(n_threads), BOOL for_checkpoint) {
struct wbuf w; struct wbuf w;
int i; int i;
...@@ -449,7 +520,7 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -449,7 +520,7 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
// impose an upper bound on the number of sub blocks. // impose an upper bound on the number of sub blocks.
int max_sub_blocks = 4; int max_sub_blocks = 4;
struct sub_block_sizes sub_block_sizes[max_sub_blocks]; struct sub_block_sizes sub_block_sizes[max_sub_blocks];
int n_sub_blocks = get_sub_block_sizes(calculated_size-uncompressed_magic_len, max_sub_blocks, sub_block_sizes); int n_sub_blocks = choose_sub_block_sizes(calculated_size-uncompressed_magic_len, max_sub_blocks, sub_block_sizes);
assert(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks); assert(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks);
if (0 && n_sub_blocks != 1) { if (0 && n_sub_blocks != 1) {
printf("%s:%d %d:", __FUNCTION__, __LINE__, n_sub_blocks); printf("%s:%d %d:", __FUNCTION__, __LINE__, n_sub_blocks);
...@@ -458,7 +529,7 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -458,7 +529,7 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
printf("\n"); printf("\n");
} }
size_t compressed_len = get_sum_compressed_size(n_sub_blocks, sub_block_sizes); size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block_sizes);
size_t compression_header_len = get_compression_header_size(node->layout_version, n_sub_blocks); size_t compression_header_len = get_compression_header_size(node->layout_version, n_sub_blocks);
char *MALLOC_N(compressed_len+uncompressed_magic_len+compression_header_len, compressed_buf); char *MALLOC_N(compressed_len+uncompressed_magic_len+compression_header_len, compressed_buf);
memcpy(compressed_buf, buf, uncompressed_magic_len); memcpy(compressed_buf, buf, uncompressed_magic_len);
...@@ -470,34 +541,42 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -470,34 +541,42 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
char *uncompressed_ptr = buf + uncompressed_magic_len; char *uncompressed_ptr = buf + uncompressed_magic_len;
char *compressed_base_ptr = compressed_buf + uncompressed_magic_len + compression_header_len; char *compressed_base_ptr = compressed_buf + uncompressed_magic_len + compression_header_len;
char *compressed_ptr = compressed_base_ptr; char *compressed_ptr = compressed_base_ptr;
for (i=0; i<n_sub_blocks; i++) {
uLongf uncompressed_len = sub_block_sizes[i].uncompressed_size;
uLongf real_compressed_len = sub_block_sizes[i].compressed_size; int T = num_cores; // T = min(num_cores, n_sub_blocks) - 1
if (T > n_sub_blocks)
T = n_sub_blocks;
if (T > 0)
T = T - 1; // threads in addition to the running thread
{ struct workset ws;
#ifdef ADAPTIVE_COMPRESSION workset_init(&ws);
// Marketing has expressed concern that this algorithm will make customers go crazy.
int compression_level; struct compress_work work[n_sub_blocks];
if (n_workitems <= n_threads) compression_level = 5; workset_lock(&ws);
else if (n_workitems <= 2*n_threads) compression_level = 4; for (i = 0; i < n_sub_blocks; i++) {
else if (n_workitems <= 3*n_threads) compression_level = 3; compress_work_init(&work[i], uncompressed_ptr, compressed_ptr, &sub_block_sizes[i]);
else if (n_workitems <= 4*n_threads) compression_level = 2; uncompressed_ptr += sub_block_sizes[i].uncompressed_size;
else compression_level = 1; compressed_ptr += sub_block_sizes[i].compressed_size_bound;
#else workset_put_locked(&ws, &work[i].base);
int compression_level = 5;
ignore_int(n_workitems); ignore_int(n_threads);
#endif
//printf("compress(%d) n_workitems=%d n_threads=%d\n", compression_level, n_workitems, n_threads);
int r = compress2((Bytef*)compressed_ptr, &real_compressed_len,
(Bytef*)uncompressed_ptr, uncompressed_len,
compression_level);
assert(r==Z_OK);
sub_block_sizes[i].compressed_size = real_compressed_len; // replace the compressed size estimate with the real size
uncompressed_ptr += uncompressed_len; // update the uncompressed and compressed buffer pointers
compressed_ptr += real_compressed_len;
} }
workset_unlock(&ws);
// compress the sub-blocks
if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks);
toku_pthread_t tids[T];
threadset_create(tids, &T, compress_worker, &ws);
compress_worker(&ws);
// wait for all of the work to complete
threadset_join(tids, T);
// squeeze out the holes not used by the compress bound
compressed_ptr = compressed_base_ptr + sub_block_sizes[0].compressed_size;
for (i = 1; i < n_sub_blocks; i++) {
memmove(compressed_ptr, work[i].to, sub_block_sizes[i].compressed_size);
compressed_ptr += sub_block_sizes[i].compressed_size;
} }
compressed_len = compressed_ptr - compressed_base_ptr; compressed_len = compressed_ptr - compressed_base_ptr;
if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-uncompressed_magic_len, (uint64_t) compressed_len); if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-uncompressed_magic_len, (uint64_t) compressed_len);
...@@ -539,66 +618,6 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -539,66 +618,6 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
return 0; return 0;
} }
#define DO_DECOMPRESS_WORKER 1
struct decompress_work {
toku_pthread_t id;
void *compress_ptr;
void *uncompress_ptr;
u_int32_t compress_size;
u_int32_t uncompress_size;
};
// initialize the decompression work
static void init_decompress_work(struct decompress_work *w,
void *compress_ptr, u_int32_t compress_size,
void *uncompress_ptr, u_int32_t uncompress_size) {
memset(&w->id, 0, sizeof(w->id));
w->compress_ptr = compress_ptr; w->compress_size = compress_size;
w->uncompress_ptr = uncompress_ptr; w->uncompress_size = uncompress_size;
}
// do the decompression work
static void do_decompress_work(struct decompress_work *w) {
uLongf destlen = w->uncompress_size;
int r = uncompress(w->uncompress_ptr, &destlen,
w->compress_ptr, w->compress_size);
assert(destlen==w->uncompress_size);
assert(r==Z_OK);
}
#if DO_DECOMPRESS_WORKER
static void *decompress_worker(void *);
static void start_decompress_work(struct decompress_work *w) {
int r = toku_pthread_create(&w->id, NULL, decompress_worker, w); assert(r == 0);
}
static void wait_decompress_work(struct decompress_work *w) {
void *ret;
int r = toku_pthread_join(w->id, &ret); assert(r == 0);
}
static void *decompress_worker(void *arg) {
struct decompress_work *w = (struct decompress_work *) arg;
do_decompress_work(w);
return arg;
}
#endif
#define DO_TOKU_TRACE 0
#if DO_TOKU_TRACE
const int toku_trace_fd = -1;
static inline void do_toku_trace(const char *cp, int len) {
toku_os_write(toku_trace_fd, cp, len);
}
#define toku_trace(a) do_toku_trace(a, strlen(a))
#else
#define toku_trace(a)
#endif
static void deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor *desc, BOOL temporary); static void deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor *desc, BOOL temporary);
static int static int
...@@ -859,6 +878,50 @@ verify_decompressed_brtnode_checksum (struct rbuf *rb) { ...@@ -859,6 +878,50 @@ verify_decompressed_brtnode_checksum (struct rbuf *rb) {
return r; return r;
} }
#include "workset.h"
struct decompress_work {
struct work base;
void *compress_ptr;
void *uncompress_ptr;
u_int32_t compress_size;
u_int32_t uncompress_size;
};
// initialize the decompression work
static void
decompress_work_init(struct decompress_work *dw,
void *compress_ptr, u_int32_t compress_size,
void *uncompress_ptr, u_int32_t uncompress_size) {
dw->compress_ptr = compress_ptr;
dw->compress_size = compress_size;
dw->uncompress_ptr = uncompress_ptr;
dw->uncompress_size = uncompress_size;
}
// decompress one block
static void
decompress_block(struct decompress_work *dw) {
if (0) printf("%s:%d %x %p\n", __FUNCTION__, __LINE__, (int) toku_pthread_self(), dw);
uLongf destlen = dw->uncompress_size;
int r = uncompress(dw->uncompress_ptr, &destlen, dw->compress_ptr, dw->compress_size);
assert(destlen == dw->uncompress_size);
assert(r==Z_OK);
}
// decompress blocks until there is no more work to do
static void *
decompress_worker(void *arg) {
struct workset *ws = (struct workset *) arg;
while (1) {
struct decompress_work *dw = (struct decompress_work *) workset_get(ws);
if (dw == NULL)
break;
decompress_block(dw);
}
return arg;
}
static int static int
decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) { decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) {
toku_trace("decompress"); toku_trace("decompress");
...@@ -899,27 +962,39 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb ...@@ -899,27 +962,39 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb
// construct the uncompressed block from the header and compressed sub blocks // construct the uncompressed block from the header and compressed sub blocks
memcpy(rb->buf, raw_block, uncompressed_magic_len); memcpy(rb->buf, raw_block, uncompressed_magic_len);
// decompress the sub blocks // init the decompression work set
struct workset ws;
workset_init(&ws);
// compute the number of additional threads needed for decompressing this node
int T = n_sub_blocks; // T = min(#cores, #blocks) - 1
if (T > num_cores)
T = num_cores;
if (T > 0)
T = T - 1; // threads in addition to the running thread
if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T);
toku_pthread_t tids[T];
// initialize the decompression work and add to the work set
unsigned char *uncompressed_data = rb->buf+uncompressed_magic_len; unsigned char *uncompressed_data = rb->buf+uncompressed_magic_len;
struct decompress_work decompress_work[n_sub_blocks]; struct decompress_work decompress_work[n_sub_blocks];
workset_lock(&ws);
for (i=0; i<n_sub_blocks; i++) { for (i=0; i<n_sub_blocks; i++) {
init_decompress_work(&decompress_work[i], compressed_data, sub_block_sizes[i].compressed_size, uncompressed_data, sub_block_sizes[i].uncompressed_size); decompress_work_init(&decompress_work[i], compressed_data, sub_block_sizes[i].compressed_size, uncompressed_data, sub_block_sizes[i].uncompressed_size);
if (i>0) {
#if DO_DECOMPRESS_WORKER
start_decompress_work(&decompress_work[i]);
#else
do_decompress_work(&decompress_work[i]);
#endif
}
uncompressed_data += sub_block_sizes[i].uncompressed_size; uncompressed_data += sub_block_sizes[i].uncompressed_size;
compressed_data += sub_block_sizes[i].compressed_size; compressed_data += sub_block_sizes[i].compressed_size;
workset_put_locked(&ws, &decompress_work[i].base);
} }
do_decompress_work(&decompress_work[0]); workset_unlock(&ws);
#if DO_DECOMPRESS_WORKER
for (i=1; i<n_sub_blocks; i++) // do the decompression work
wait_decompress_work(&decompress_work[i]); threadset_create(tids, &T, decompress_worker, &ws);
#endif decompress_worker(&ws);
// cleanup
threadset_join(tids, T);
workset_destroy(&ws);
toku_trace("decompress done"); toku_trace("decompress done");
if (0) printf("First 4 bytes of uncompressed data are %02x%02x%02x%02x\n", if (0) printf("First 4 bytes of uncompressed data are %02x%02x%02x%02x\n",
...@@ -990,6 +1065,8 @@ toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BR ...@@ -990,6 +1065,8 @@ toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BR
if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b); if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
if (h->panic) return h->panic; if (h->panic) return h->panic;
toku_trace("deserial start");
// get the file offset and block size for the block // get the file offset and block size for the block
DISKOFF offset, size; DISKOFF offset, size;
toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size); toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size);
......
...@@ -5175,11 +5175,15 @@ int toku_brt_init(void (*ydb_lock_callback)(void), void (*ydb_unlock_callback)(v ...@@ -5175,11 +5175,15 @@ int toku_brt_init(void (*ydb_lock_callback)(void), void (*ydb_unlock_callback)(v
r = toku_brt_lock_init(); r = toku_brt_lock_init();
if (r==0) if (r==0)
r = toku_checkpoint_init(ydb_lock_callback, ydb_unlock_callback); r = toku_checkpoint_init(ydb_lock_callback, ydb_unlock_callback);
if (r == 0)
r = toku_brt_serialize_init();
return r; return r;
} }
int toku_brt_destroy(void) { int toku_brt_destroy(void) {
int r = 0; int r = 0;
if (r == 0)
r = toku_brt_serialize_destroy();
if (r==0) if (r==0)
r = toku_brt_lock_destroy(); r = toku_brt_lock_destroy();
if (r==0) if (r==0)
......
...@@ -189,6 +189,8 @@ int toku_brt_init(void (*ydb_lock_callback)(void), void (*ydb_unlock_callback)(v ...@@ -189,6 +189,8 @@ int toku_brt_init(void (*ydb_lock_callback)(void), void (*ydb_unlock_callback)(v
int toku_brt_destroy(void); int toku_brt_destroy(void);
int toku_pwrite_lock_init(void); int toku_pwrite_lock_init(void);
int toku_pwrite_lock_destroy(void); int toku_pwrite_lock_destroy(void);
int toku_brt_serialize_init(void);
int toku_brt_serialize_destroy(void);
void toku_maybe_truncate_cachefile (CACHEFILE cf, u_int64_t size_used); void toku_maybe_truncate_cachefile (CACHEFILE cf, u_int64_t size_used);
// Effect: truncate file if overallocated by at least 32MiB // Effect: truncate file if overallocated by at least 32MiB
......
...@@ -1325,6 +1325,7 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful ...@@ -1325,6 +1325,7 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
} }
if (do_wait_time) if (do_wait_time)
t0 = get_tnow(); t0 = get_tnow();
if (p->checkpoint_pending) { if (p->checkpoint_pending) {
get_and_pin_footprint = 4; get_and_pin_footprint = 4;
write_pair_for_checkpoint(ct, p); write_pair_for_checkpoint(ct, p);
......
#ifndef _TOKU_WORKSET_H
#define _TOKU_WORKSET_H
#include <toku_list.h>
#include <toku_pthread.h>
// the work struct is the base class for work to be done by some threads
struct work {
struct toku_list next;
};
// the workset struct contains the set of work to be done by some threads
// the lock protects the work list
struct workset {
pthread_mutex_t lock;
struct toku_list worklist;
};
static inline void workset_init(struct workset *ws) {
int r = toku_pthread_mutex_init(&ws->lock, NULL); assert(r == 0);
toku_list_init(&ws->worklist);
};
static inline void workset_destroy(struct workset *ws) {
assert(toku_list_empty(&ws->worklist));
int r = toku_pthread_mutex_destroy(&ws->lock); assert(r == 0);
}
static inline void workset_lock(struct workset *ws) {
int r = toku_pthread_mutex_lock(&ws->lock); assert(r == 0);
}
static inline void workset_unlock(struct workset *ws) {
int r = toku_pthread_mutex_unlock(&ws->lock); assert(r == 0);
}
// put work in the workset
static inline void workset_put(struct workset *ws, struct work *w) {
workset_lock(ws);
toku_list_push(&ws->worklist, &w->next);
workset_unlock(ws);
}
// put work in the workset. assume already locked.
static inline void workset_put_locked(struct workset *ws, struct work *w) {
toku_list_push(&ws->worklist, &w->next);
}
// get work from the workset
static inline struct work *workset_get(struct workset *ws) {
workset_lock(ws);
struct work *w = NULL;
if (!toku_list_empty(&ws->worklist)) {
struct toku_list *l = toku_list_pop_head(&ws->worklist);
w = toku_list_struct(l, struct work, next);
}
workset_unlock(ws);
return w;
}
// create a set of threads to run a given function
// tids will contain the thread id's of the created threads
// *ntids on input contains the number of threads requested, on output contains the number of threads created
static inline void threadset_create(toku_pthread_t tids[], int *ntids, void *(*f)(void *arg), void *arg) {
int n = *ntids;
int i;
for (i = 0; i < n; i++) {
int r = toku_pthread_create(&tids[i], NULL, f, arg);
if (r != 0)
break;
}
*ntids = i;
}
// join with a set of threads
static inline void threadset_join(toku_pthread_t tids[], int ntids) {
for (int i = 0; i < ntids; i++) {
void *ret;
int r = toku_pthread_join(tids[i], &ret); assert(r == 0);
}
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment