Commit d3dbd816 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

create multiple sub blocks per leaf in the loader. merge -r 18961:head ptq...

create multiple sub blocks per leaf in the loader.  merge -r 18961:head ptq branch to main refs[t:2351]

git-svn-id: file:///svn/toku/tokudb@18992 c7de825b-a66e-492c-adef-691d508d4ae1
parent 429ef45c
...@@ -67,6 +67,7 @@ BRT_SOURCES = \ ...@@ -67,6 +67,7 @@ BRT_SOURCES = \
recover \ recover \
roll \ roll \
rollback \ rollback \
sub_block \
ule \ ule \
threadpool \ threadpool \
toku_worker \ toku_worker \
......
...@@ -262,203 +262,7 @@ enum { ...@@ -262,203 +262,7 @@ enum {
uncompressed_version_offset = 8, uncompressed_version_offset = 8,
}; };
struct sub_block { #include "sub_block.h"
void *uncompressed_ptr;
u_int32_t uncompressed_size;
void *compressed_ptr;
u_int32_t compressed_size; // real compressed size
u_int32_t compressed_size_bound; // estimated compressed size
u_int32_t xsum; // sub block checksum
};
struct stored_sub_block {
u_int32_t uncompressed_size;
u_int32_t compressed_size;
u_int32_t xsum;
};
static void
sub_block_init(struct sub_block *sub_block) {
sub_block->uncompressed_ptr = 0;
sub_block->uncompressed_size = 0;
sub_block->compressed_ptr = 0;
sub_block->compressed_size_bound = 0;
sub_block->compressed_size = 0;
sub_block->xsum = 0;
}
// get the size of the compression header
static size_t
sub_block_header_size(int n_sub_blocks) {
return sizeof (u_int32_t) + n_sub_blocks * sizeof (struct stored_sub_block);
}
// get the sum of the sub block compressed sizes
static size_t
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]) {
size_t compressed_size_bound = 0;
for (int i = 0; i < n_sub_blocks; i++) {
sub_block[i].compressed_size_bound = compressBound(sub_block[i].uncompressed_size);
compressed_size_bound += sub_block[i].compressed_size_bound;
}
return compressed_size_bound;
}
// get the sum of the sub block uncompressed sizes
static size_t
get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]) {
size_t uncompressed_size = 0;
for (int i = 0; i < n_sub_blocks; i++)
uncompressed_size += sub_block[i].uncompressed_size;
return uncompressed_size;
}
// round up n
static inline int
alignup32(int a, int b) {
return ((a+b-1) / b) * b;
}
static const int max_sub_blocks = 8;
// Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at
// least >= the target_sub_block_size.
static void
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret) {
const int target_sub_block_size = 512*1024;
const int alignment = 32;
int n_sub_blocks, sub_block_size;
n_sub_blocks = total_size / target_sub_block_size;
if (n_sub_blocks <= 1) {
n_sub_blocks = 1;
sub_block_size = total_size;
} else {
if (n_sub_blocks > n_sub_blocks_limit) // limit the number of sub-blocks
n_sub_blocks = n_sub_blocks_limit;
sub_block_size = alignup32(total_size / n_sub_blocks, alignment);
while (sub_block_size * n_sub_blocks < total_size) // round up the sub-block size until big enough
sub_block_size += alignment;
}
*sub_block_size_ret = sub_block_size;
*n_sub_blocks_ret = n_sub_blocks;
}
static void
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]) {
int size_left = total_size;
int i;
for (i = 0; i < n_sub_blocks-1; i++) {
sub_block[i].uncompressed_size = sub_block_size;
size_left -= sub_block_size;
}
if (i == 0 || size_left > 0)
sub_block[i].uncompressed_size = size_left;
}
#include "workset.h"
struct compress_work {
struct work base;
struct sub_block *sub_block;
};
static void
compress_work_init(struct compress_work *w, struct sub_block *sub_block) {
w->sub_block = sub_block;
}
static void
compress_sub_block(struct sub_block *sub_block) {
// compress it
Bytef *uncompressed_ptr = (Bytef *) sub_block->uncompressed_ptr;
Bytef *compressed_ptr = (Bytef *) sub_block->compressed_ptr;
uLongf uncompressed_len = sub_block->uncompressed_size;
uLongf real_compressed_len = sub_block->compressed_size_bound;
int compression_level = 5;
int r = compress2((Bytef*)compressed_ptr, &real_compressed_len,
(Bytef*)uncompressed_ptr, uncompressed_len,
compression_level);
assert(r == Z_OK);
sub_block->compressed_size = real_compressed_len; // replace the compressed size estimate with the real size
// checksum it
sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size);
}
static void *
compress_worker(void *arg) {
struct workset *ws = (struct workset *) arg;
while (1) {
struct compress_work *w = (struct compress_work *) workset_get(ws);
if (w == NULL)
break;
compress_sub_block(w->sub_block);
}
return arg;
}
static size_t
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr) {
char *compressed_base_ptr = compressed_ptr;
size_t compressed_len;
if (n_sub_blocks == 1) {
// single sub-block
sub_block[0].uncompressed_ptr = uncompressed_ptr;
sub_block[0].compressed_ptr = compressed_ptr;
compress_sub_block(&sub_block[0]);
compressed_len = sub_block[0].compressed_size;
} else {
// multiple sub-blocks
int T = num_cores; // T = min(num_cores, n_sub_blocks) - 1
if (T > n_sub_blocks)
T = n_sub_blocks;
if (T > 0)
T = T - 1; // threads in addition to the running thread
struct workset ws;
workset_init(&ws);
struct compress_work work[n_sub_blocks];
workset_lock(&ws);
for (int i = 0; i < n_sub_blocks; i++) {
sub_block[i].uncompressed_ptr = uncompressed_ptr;
sub_block[i].compressed_ptr = compressed_ptr;
compress_work_init(&work[i], &sub_block[i]);
workset_put_locked(&ws, &work[i].base);
uncompressed_ptr += sub_block[i].uncompressed_size;
compressed_ptr += sub_block[i].compressed_size_bound;
}
workset_unlock(&ws);
// compress the sub-blocks
if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks);
toku_pthread_t tids[T];
threadset_create(tids, &T, compress_worker, &ws);
compress_worker(&ws);
// wait for all of the work to complete
threadset_join(tids, T);
// squeeze out the holes not used by the compress bound
compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size;
for (int i = 1; i < n_sub_blocks; i++) {
memmove(compressed_ptr, sub_block[i].compressed_ptr, sub_block[i].compressed_size);
compressed_ptr += sub_block[i].compressed_size;
}
compressed_len = compressed_ptr - compressed_base_ptr;
}
return compressed_len;
}
static void static void
serialize_node_header(BRTNODE node, struct wbuf *wbuf) { serialize_node_header(BRTNODE node, struct wbuf *wbuf) {
...@@ -672,7 +476,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_th ...@@ -672,7 +476,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_th
// compress all of the sub blocks // compress all of the sub blocks
char *uncompressed_ptr = buf + node_header_overhead; char *uncompressed_ptr = buf + node_header_overhead;
char *compressed_ptr = compressed_buf + header_len; char *compressed_ptr = compressed_buf + header_len;
compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr); compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr, num_cores);
//if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len); //if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-node_header_overhead, (uint64_t) compressed_len);
...@@ -687,7 +491,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_th ...@@ -687,7 +491,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_th
} }
// compute the header checksum and serialize it // compute the header checksum and serialize it
uint32_t header_length = (void *)ptr - (void *) compressed_buf; uint32_t header_length = (char *)ptr - (char *)compressed_buf;
uint32_t xsum = x1764_memory(compressed_buf, header_length); uint32_t xsum = x1764_memory(compressed_buf, header_length);
*ptr = toku_htod32(xsum); *ptr = toku_htod32(xsum);
...@@ -969,101 +773,6 @@ deserialize_brtnode_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *b ...@@ -969,101 +773,6 @@ deserialize_brtnode_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *b
return 0; return 0;
} }
#include "workset.h"
struct decompress_work {
struct work base;
void *compress_ptr;
void *uncompress_ptr;
u_int32_t compress_size;
u_int32_t uncompress_size;
u_int32_t xsum;
int error;
};
// initialize the decompression work
static void
decompress_work_init(struct decompress_work *dw,
void *compress_ptr, u_int32_t compress_size,
void *uncompress_ptr, u_int32_t uncompress_size,
u_int32_t xsum) {
dw->compress_ptr = compress_ptr;
dw->compress_size = compress_size;
dw->uncompress_ptr = uncompress_ptr;
dw->uncompress_size = uncompress_size;
dw->xsum = xsum;
dw->error = 0;
}
// decompress one block
static int
decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum) {
// verify checksum
u_int32_t xsum = x1764_memory(compress_ptr, compress_size);
assert(xsum == expected_xsum);
// decompress
uLongf destlen = uncompress_size;
int r = uncompress(uncompress_ptr, &destlen, compress_ptr, compress_size);
assert(destlen == uncompress_size);
assert(r==Z_OK);
return 0;
}
// decompress blocks until there is no more work to do
static void *
decompress_worker(void *arg) {
struct workset *ws = (struct workset *) arg;
while (1) {
struct decompress_work *dw = (struct decompress_work *) workset_get(ws);
if (dw == NULL)
break;
dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum);
}
return arg;
}
static void
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data) {
if (n_sub_blocks == 1) {
decompress_sub_block(compressed_data, sub_block[0].compressed_size, uncompressed_data, sub_block[0].uncompressed_size, sub_block[0].xsum);
} else {
// compute the number of additional threads needed for decompressing this node
int T = num_cores; // T = min(#cores, #blocks) - 1
if (T > n_sub_blocks)
T = n_sub_blocks;
if (T > 0)
T = T - 1; // threads in addition to the running thread
// init the decompression work set
struct workset ws;
workset_init(&ws);
// initialize the decompression work and add to the work set
struct decompress_work decompress_work[n_sub_blocks];
workset_lock(&ws);
for (int i = 0; i < n_sub_blocks; i++) {
decompress_work_init(&decompress_work[i], compressed_data, sub_block[i].compressed_size, uncompressed_data, sub_block[i].uncompressed_size, sub_block[i].xsum);
workset_put_locked(&ws, &decompress_work[i].base);
uncompressed_data += sub_block[i].uncompressed_size;
compressed_data += sub_block[i].compressed_size;
}
workset_unlock(&ws);
// decompress the sub-blocks
if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T);
toku_pthread_t tids[T];
threadset_create(tids, &T, decompress_worker, &ws);
decompress_worker(&ws);
// cleanup
threadset_join(tids, T);
workset_destroy(&ws);
}
}
static int static int
decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) { decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) {
toku_trace("decompress"); toku_trace("decompress");
...@@ -1116,7 +825,7 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb ...@@ -1116,7 +825,7 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb
unsigned char *uncompressed_data = rb->buf + node_header_overhead; unsigned char *uncompressed_data = rb->buf + node_header_overhead;
// decompress all the compressed sub blocks into the uncompressed buffer // decompress all the compressed sub blocks into the uncompressed buffer
decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data); decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores);
toku_trace("decompress done"); toku_trace("decompress done");
......
...@@ -284,6 +284,7 @@ dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) { ...@@ -284,6 +284,7 @@ dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) {
toku_free(vp); toku_free(vp);
} }
#if 0
static void static void
hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) { hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
u_int64_t i; u_int64_t i;
...@@ -298,6 +299,39 @@ hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) { ...@@ -298,6 +299,39 @@ hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
} }
printf("\n"); printf("\n");
} }
#endif
static void
hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
u_int64_t n = size / 32;
for (u_int64_t i = 0; i < n; i++) {
printf("%"PRIu64": ", offset);
for (u_int64_t j = 0; j < 32; j++) {
unsigned char c = vp[j];
printf("%2.2X", c);
if (((j+1) % 4) == 0)
printf(" ");
}
for (u_int64_t j = 0; j < 32; j++) {
unsigned char c = vp[j];
printf("%c", isprint(c) ? c : ' ');
}
printf("\n");
vp += 32;
offset += 32;
}
size = size % 32;
for (u_int64_t i=0; i<size; i++) {
if ((i % 32) == 0)
printf("%"PRIu64": ", offset+i);
printf("%2.2X", vp[i]);
if (((i+1) % 4) == 0)
printf(" ");
if (((i+1) % 32) == 0)
printf("\n");
}
printf("\n");
}
static void static void
dump_file(int f, u_int64_t offset, u_int64_t size) { dump_file(int f, u_int64_t offset, u_int64_t size) {
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "brtloader-internal.h" #include "brtloader-internal.h"
#include "brt-internal.h" #include "brt-internal.h"
#include "sub_block.h"
static size_t (*os_fwrite_fun)(const void *,size_t,size_t,FILE*)=NULL; static size_t (*os_fwrite_fun)(const void *,size_t,size_t,FILE*)=NULL;
void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) { void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) {
...@@ -1178,48 +1179,52 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr ...@@ -1178,48 +1179,52 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
+4 // layout version +4 // layout version
+4 // layout version original +4 // layout version original
); );
int n_extra_bytes_for_compression = (+4 // n_sub blocks
+4 // compressed size
+4 // compressed size
+4 // sub block checksum
+4 // header checksum
);
int header_len = n_uncompressed_bytes_at_beginning + n_extra_bytes_for_compression;
int compression_level = 5;
int uncompressed_len = lbuf->dbuf.off - n_uncompressed_bytes_at_beginning; int uncompressed_len = lbuf->dbuf.off - n_uncompressed_bytes_at_beginning;
int bound = compressBound(uncompressed_len);
// choose sub block size and number
int sub_block_size, n_sub_blocks;
choose_sub_block_size(uncompressed_len, max_sub_blocks, &sub_block_size, &n_sub_blocks);
int header_len = n_uncompressed_bytes_at_beginning + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t);
// initialize the sub blocks
struct sub_block sub_block[n_sub_blocks];
for (int i = 0; i < n_sub_blocks; i++)
sub_block_init(&sub_block[i]);
set_all_sub_block_sizes(uncompressed_len, sub_block_size, n_sub_blocks, sub_block);
// allocate space for the compressed bufer
int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
unsigned char *MALLOC_N(header_len + bound, compressed_buf); unsigned char *MALLOC_N(header_len + bound, compressed_buf);
uLongf real_compressed_len = bound;
{
int r = compress2((Bytef*)(compressed_buf + header_len), &real_compressed_len,
(Bytef*)(lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning), uncompressed_len,
compression_level);
assert(r==Z_OK);
}
// checksum the sub block
u_int32_t xsum0 = x1764_memory(compressed_buf + header_len, real_compressed_len);
// compress and checksum the sub blocks
int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
(char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning),
(char *) (compressed_buf + header_len), 2);
// cppy the uncompressed header to the compressed buffer
memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning); memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
int compressed_len = real_compressed_len;
int n_compressed_blocks = 1; // serialize the sub block header
memcpy(compressed_buf+16, &n_compressed_blocks, 4); memcpy(compressed_buf+16, &n_sub_blocks, 4);
memcpy(compressed_buf+20, &compressed_len, 4); for (int i = 0; i < n_sub_blocks; i++) {
memcpy(compressed_buf+24, &uncompressed_len, 4); memcpy(compressed_buf+20+12*i+0, &sub_block[i].compressed_size, 4);
memcpy(compressed_buf+28, &xsum0, 4); memcpy(compressed_buf+20+12*i+4, &sub_block[i].uncompressed_size, 4);
memcpy(compressed_buf+20+12*i+8, &sub_block[i].xsum, 4);
}
// compute the header checksum and serialize it // compute the header checksum and serialize it
u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t)); u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t));
memcpy(compressed_buf+32, &header_xsum, 4); memcpy(compressed_buf + header_len - sizeof (u_int32_t), &header_xsum, 4);
//#ifndef CILK_STUB //#ifndef CILK_STUB
// ttable_and_write_lock->lock(); // ttable_and_write_lock->lock();
//#endif //#endif
long long off_of_leaf = out->current_off; long long off_of_leaf = out->current_off;
int size = real_compressed_len + header_len; int size = header_len + compressed_len;
if (0) { if (0) {
fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len); fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len);
fprintf(stderr, "compressed buf size=%lu, off=%lld\n", real_compressed_len, off_of_leaf); fprintf(stderr, "compressed buf size=%u, off=%lld\n", compressed_len, off_of_leaf);
fprintf(stderr, "compressed bytes are:"); fprintf(stderr, "compressed bytes are:");
//for (int i=0; i<compressed_len; i++) { //for (int i=0; i<compressed_len; i++) {
// unsigned char c = compressed_buf[28+i]; // unsigned char c = compressed_buf[28+i];
......
#include <stdio.h>
#include <string.h>
#include <zlib.h>
#include "toku_portability.h"
#include "toku_assert.h"
#include "x1764.h"
#include "sub_block.h"
void
sub_block_init(struct sub_block *sub_block) {
sub_block->uncompressed_ptr = 0;
sub_block->uncompressed_size = 0;
sub_block->compressed_ptr = 0;
sub_block->compressed_size_bound = 0;
sub_block->compressed_size = 0;
sub_block->xsum = 0;
}
// get the size of the compression header
size_t
sub_block_header_size(int n_sub_blocks) {
return sizeof (u_int32_t) + n_sub_blocks * sizeof (struct stored_sub_block);
}
// get the sum of the sub block compressed sizes
size_t
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]) {
size_t compressed_size_bound = 0;
for (int i = 0; i < n_sub_blocks; i++) {
sub_block[i].compressed_size_bound = compressBound(sub_block[i].uncompressed_size);
compressed_size_bound += sub_block[i].compressed_size_bound;
}
return compressed_size_bound;
}
// get the sum of the sub block uncompressed sizes
size_t
get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]) {
size_t uncompressed_size = 0;
for (int i = 0; i < n_sub_blocks; i++)
uncompressed_size += sub_block[i].uncompressed_size;
return uncompressed_size;
}
// round up n
static inline int
alignup32(int a, int b) {
return ((a+b-1) / b) * b;
}
// Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at
// least >= the target_sub_block_size.
void
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret) {
const int alignment = 32;
int n_sub_blocks, sub_block_size;
n_sub_blocks = total_size / target_sub_block_size;
if (n_sub_blocks <= 1) {
n_sub_blocks = 1;
sub_block_size = total_size;
} else {
if (n_sub_blocks > n_sub_blocks_limit) // limit the number of sub-blocks
n_sub_blocks = n_sub_blocks_limit;
sub_block_size = alignup32(total_size / n_sub_blocks, alignment);
while (sub_block_size * n_sub_blocks < total_size) // round up the sub-block size until big enough
sub_block_size += alignment;
}
*sub_block_size_ret = sub_block_size;
*n_sub_blocks_ret = n_sub_blocks;
}
void
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]) {
int size_left = total_size;
int i;
for (i = 0; i < n_sub_blocks-1; i++) {
sub_block[i].uncompressed_size = sub_block_size;
size_left -= sub_block_size;
}
if (i == 0 || size_left > 0)
sub_block[i].uncompressed_size = size_left;
}
#include "workset.h"
void
compress_work_init(struct compress_work *w, struct sub_block *sub_block) {
w->sub_block = sub_block;
}
void
compress_sub_block(struct sub_block *sub_block) {
// compress it
Bytef *uncompressed_ptr = (Bytef *) sub_block->uncompressed_ptr;
Bytef *compressed_ptr = (Bytef *) sub_block->compressed_ptr;
uLongf uncompressed_len = sub_block->uncompressed_size;
uLongf real_compressed_len = sub_block->compressed_size_bound;
int compression_level = 5;
int r = compress2((Bytef*)compressed_ptr, &real_compressed_len,
(Bytef*)uncompressed_ptr, uncompressed_len,
compression_level);
assert(r == Z_OK);
sub_block->compressed_size = real_compressed_len; // replace the compressed size estimate with the real size
// checksum it
sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size);
}
void *
compress_worker(void *arg) {
struct workset *ws = (struct workset *) arg;
while (1) {
struct compress_work *w = (struct compress_work *) workset_get(ws);
if (w == NULL)
break;
compress_sub_block(w->sub_block);
}
return arg;
}
size_t
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores) {
char *compressed_base_ptr = compressed_ptr;
size_t compressed_len;
if (n_sub_blocks == 1) {
// single sub-block
sub_block[0].uncompressed_ptr = uncompressed_ptr;
sub_block[0].compressed_ptr = compressed_ptr;
compress_sub_block(&sub_block[0]);
compressed_len = sub_block[0].compressed_size;
} else {
// multiple sub-blocks
int T = num_cores; // T = min(num_cores, n_sub_blocks) - 1
if (T > n_sub_blocks)
T = n_sub_blocks;
if (T > 0)
T = T - 1; // threads in addition to the running thread
struct workset ws;
workset_init(&ws);
struct compress_work work[n_sub_blocks];
workset_lock(&ws);
for (int i = 0; i < n_sub_blocks; i++) {
sub_block[i].uncompressed_ptr = uncompressed_ptr;
sub_block[i].compressed_ptr = compressed_ptr;
compress_work_init(&work[i], &sub_block[i]);
workset_put_locked(&ws, &work[i].base);
uncompressed_ptr += sub_block[i].uncompressed_size;
compressed_ptr += sub_block[i].compressed_size_bound;
}
workset_unlock(&ws);
// compress the sub-blocks
if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks);
toku_pthread_t tids[T];
threadset_create(tids, &T, compress_worker, &ws);
compress_worker(&ws);
// wait for all of the work to complete
threadset_join(tids, T);
// squeeze out the holes not used by the compress bound
compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size;
for (int i = 1; i < n_sub_blocks; i++) {
memmove(compressed_ptr, sub_block[i].compressed_ptr, sub_block[i].compressed_size);
compressed_ptr += sub_block[i].compressed_size;
}
compressed_len = compressed_ptr - compressed_base_ptr;
}
return compressed_len;
}
// initialize the decompression work
void
decompress_work_init(struct decompress_work *dw,
void *compress_ptr, u_int32_t compress_size,
void *uncompress_ptr, u_int32_t uncompress_size,
u_int32_t xsum) {
dw->compress_ptr = compress_ptr;
dw->compress_size = compress_size;
dw->uncompress_ptr = uncompress_ptr;
dw->uncompress_size = uncompress_size;
dw->xsum = xsum;
dw->error = 0;
}
// decompress one block
int
decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum) {
// verify checksum
u_int32_t xsum = x1764_memory(compress_ptr, compress_size);
assert(xsum == expected_xsum);
// decompress
uLongf destlen = uncompress_size;
int r = uncompress(uncompress_ptr, &destlen, compress_ptr, compress_size);
assert(destlen == uncompress_size);
assert(r==Z_OK);
return 0;
}
// decompress blocks until there is no more work to do
void *
decompress_worker(void *arg) {
struct workset *ws = (struct workset *) arg;
while (1) {
struct decompress_work *dw = (struct decompress_work *) workset_get(ws);
if (dw == NULL)
break;
dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum);
}
return arg;
}
void
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores) {
if (n_sub_blocks == 1) {
decompress_sub_block(compressed_data, sub_block[0].compressed_size, uncompressed_data, sub_block[0].uncompressed_size, sub_block[0].xsum);
} else {
// compute the number of additional threads needed for decompressing this node
int T = num_cores; // T = min(#cores, #blocks) - 1
if (T > n_sub_blocks)
T = n_sub_blocks;
if (T > 0)
T = T - 1; // threads in addition to the running thread
// init the decompression work set
struct workset ws;
workset_init(&ws);
// initialize the decompression work and add to the work set
struct decompress_work decompress_work[n_sub_blocks];
workset_lock(&ws);
for (int i = 0; i < n_sub_blocks; i++) {
decompress_work_init(&decompress_work[i], compressed_data, sub_block[i].compressed_size, uncompressed_data, sub_block[i].uncompressed_size, sub_block[i].xsum);
workset_put_locked(&ws, &decompress_work[i].base);
uncompressed_data += sub_block[i].uncompressed_size;
compressed_data += sub_block[i].compressed_size;
}
workset_unlock(&ws);
// decompress the sub-blocks
if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T);
toku_pthread_t tids[T];
threadset_create(tids, &T, decompress_worker, &ws);
decompress_worker(&ws);
// cleanup
threadset_join(tids, T);
workset_destroy(&ws);
}
}
#ifndef TOKU_SUB_BLOCK_H
#define TOKU_SUB_BLOCK_H
static const int max_sub_blocks = 8;
static const int target_sub_block_size = 512*1024;
struct sub_block {
void *uncompressed_ptr;
u_int32_t uncompressed_size;
void *compressed_ptr;
u_int32_t compressed_size; // real compressed size
u_int32_t compressed_size_bound; // estimated compressed size
u_int32_t xsum; // sub block checksum
};
struct stored_sub_block {
u_int32_t uncompressed_size;
u_int32_t compressed_size;
u_int32_t xsum;
};
void
sub_block_init(struct sub_block *sub_block);
// get the size of the compression header
size_t
sub_block_header_size(int n_sub_blocks);
// get the sum of the sub block compressed sizes
size_t
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]);
// get the sum of the sub block uncompressed sizes
size_t
get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]);
// Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at
// least >= the target_sub_block_size.
void
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret);
void
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]);
#include "workset.h"
struct compress_work {
struct work base;
struct sub_block *sub_block;
};
void
compress_work_init(struct compress_work *w, struct sub_block *sub_block);
void
compress_sub_block(struct sub_block *sub_block);
void *
compress_worker(void *arg);
size_t
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr, int num_cores);
struct decompress_work {
struct work base;
void *compress_ptr;
void *uncompress_ptr;
u_int32_t compress_size;
u_int32_t uncompress_size;
u_int32_t xsum;
int error;
};
// initialize the decompression work
void
decompress_work_init(struct decompress_work *dw,
void *compress_ptr, u_int32_t compress_size,
void *uncompress_ptr, u_int32_t uncompress_size,
u_int32_t xsum);
// decompress one block
int
decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum);
// decompress blocks until there is no more work to do
void *
decompress_worker(void *arg);
void
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores);
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment