Commit bdebc7d2 authored by Rich Prohaska, committed by Yoni Fogel

some reorganization of brt serialize.  merge -r 18898:head https://svn.tokutek.com/tokudb/toku/tokudb.2351.ptq main refs[t:2351] #2351

git-svn-id: file:///svn/toku/tokudb@18919 c7de825b-a66e-492c-adef-691d508d4ae1
parent 50b75b68
@@ -558,7 +558,7 @@ decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf
// verify the sizes of the compressed sub blocks
if (0 && n_sub_blocks != 1) printf("%s:%d %d\n", __FUNCTION__, __LINE__, n_sub_blocks);
struct sub_block_sizes sub_block_sizes[n_sub_blocks];
struct sub_block sub_block[n_sub_blocks];
for (i=0; i<n_sub_blocks; i++) {
u_int32_t compressed_size = toku_dtoh32(*(u_int32_t*)(&raw_block[compression_header_offset+8*i]));
if (compressed_size<=0 || compressed_size>(1<<30)) { r = toku_db_badformat(); return r; }
@@ -566,13 +566,13 @@ decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf
if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = toku_db_badformat(); return r; }
sub_block_sizes[i].compressed_size = compressed_size;
sub_block_sizes[i].uncompressed_size = uncompressed_size;
sub_block[i].compressed_size = compressed_size;
sub_block[i].uncompressed_size = uncompressed_size;
}
unsigned char *compressed_data = raw_block + uncompressed_magic_len_10 + get_compression_header_size(BRT_LAYOUT_VERSION, n_sub_blocks);
unsigned char *compressed_data = raw_block + uncompressed_magic_len_10 + sub_block_header_size(n_sub_blocks);
size_t uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block_sizes);
size_t uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);
rb->size= uncompressed_magic_len_10 + uncompressed_size;
assert(rb->size>0);
@@ -586,7 +586,7 @@ decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf
struct decompress_work_10 decompress_work[n_sub_blocks];
for (i=0; i<n_sub_blocks; i++) {
init_decompress_work_10(&decompress_work[i], compressed_data, sub_block_sizes[i].compressed_size, uncompressed_data, sub_block_sizes[i].uncompressed_size);
init_decompress_work_10(&decompress_work[i], compressed_data, sub_block[i].compressed_size, uncompressed_data, sub_block[i].uncompressed_size);
if (i>0) {
#if DO_DECOMPRESS_WORKER
start_decompress_work_10(&decompress_work[i]);
@@ -594,8 +594,8 @@ decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf
do_decompress_work_10(&decompress_work[i]);
#endif
}
uncompressed_data += sub_block_sizes[i].uncompressed_size;
compressed_data += sub_block_sizes[i].compressed_size;
uncompressed_data += sub_block[i].uncompressed_size;
compressed_data += sub_block[i].compressed_size;
}
do_decompress_work_10(&decompress_work[0]);
#if DO_DECOMPRESS_WORKER
......
@@ -4,26 +4,21 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
#include "backwards_10.h"
static int num_cores = 0;
int
toku_brt_serialize_init(void) {
num_cores = toku_os_get_number_processors();
return 0;
}
int
toku_brt_serialize_destroy(void) {
return 0;
}
#include "backwards_10.h"
// NOTE: The backwards compatibility functions are in a file that is included at the END of this file.
static int deserialize_brtheader_10 (int fd, struct rbuf *rb, struct brt_header **brth);
static int upgrade_brtheader_10_11 (struct brt_header **brth_10, struct brt_header **brth_11);
static int decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum);
static int deserialize_brtnode_from_rbuf_10 (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h, struct rbuf *rb);
static int upgrade_brtnode_10_11 (BRTNODE *brtnode_10, BRTNODE *brtnode_11);
// performance tracing
#define DO_TOKU_TRACE 0
#if DO_TOKU_TRACE
static int toku_trace_fd = -1;
static inline void do_toku_trace(const char *cp, int len) {
const int toku_trace_fd = -1;
write(toku_trace_fd, cp, len);
}
#define toku_trace(a) do_toku_trace(a, strlen(a))
@@ -31,44 +26,31 @@ static inline void do_toku_trace(const char *cp, int len) {
#define toku_trace(a)
#endif
// NOTE: The backwards compatibility functions are in a file that is included at the END of this file.
static int deserialize_brtheader_10 (int fd, struct rbuf *rb, struct brt_header **brth);
static int upgrade_brtheader_10_11 (struct brt_header **brth_10, struct brt_header **brth_11);
static int decompress_brtnode_from_raw_block_into_rbuf_10(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum);
static int deserialize_brtnode_from_rbuf_10 (BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h, struct rbuf *rb);
static int upgrade_brtnode_10_11 (BRTNODE *brtnode_10, BRTNODE *brtnode_11);
static int num_cores = 0; // cache the number of cores for parallelization
#if 0
static u_int64_t ntohll(u_int64_t v) {
union u {
u_int32_t l[2];
u_int64_t ll;
} uv;
uv.ll = v;
return (((u_int64_t)uv.l[0])<<32) + uv.l[1];
}
#endif
static u_int64_t umin64(u_int64_t a, u_int64_t b) {
if (a<b) return a;
return b;
int
toku_brt_serialize_init(void) {
num_cores = toku_os_get_number_processors();
return 0;
}
static inline u_int64_t alignup (u_int64_t a, u_int64_t b) {
return ((a+b-1)/b)*b;
int
toku_brt_serialize_destroy(void) {
return 0;
}
// This mutex protects pwrite from running in parallel, and also protects modifications to the block allocator.
static toku_pthread_mutex_t pwrite_mutex = TOKU_PTHREAD_MUTEX_INITIALIZER;
static int pwrite_is_locked=0;
int toku_pwrite_lock_init(void) {
int
toku_pwrite_lock_init(void) {
int r = toku_pthread_mutex_init(&pwrite_mutex, NULL); assert(r == 0);
return r;
}
int toku_pwrite_lock_destroy(void) {
int
toku_pwrite_lock_destroy(void) {
int r = toku_pthread_mutex_destroy(&pwrite_mutex); assert(r == 0);
return r;
}
@@ -91,6 +73,11 @@ unlock_for_pwrite (void) {
enum {FILE_CHANGE_INCREMENT = (16<<20)};
static inline u_int64_t
alignup64(u_int64_t a, u_int64_t b) {
return ((a+b-1)/b)*b;
}
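// Worked example of the rounding (illustrative): alignup64(5, 4096) == 4096,
// alignup64(4096, 4096) == 4096, and alignup64(4097, 4096) == 8192; the
// truncate/preallocate code below relies on the result being an exact
// multiple of b.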
//Race condition if ydb lock is split.
//Ydb lock is held when this function is called.
//Not going to truncate and delete (redirect to devnull) at same time.
@@ -120,7 +107,7 @@ toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used)
assert(file_size >= 0);
}
if ((u_int64_t)file_size >= size_used + (2*FILE_CHANGE_INCREMENT)) {
toku_off_t new_size = alignup(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used.
toku_off_t new_size = alignup64(size_used, (2*FILE_CHANGE_INCREMENT)); //Truncate to new size_used.
assert(new_size < file_size);
int r = toku_cachefile_truncate(cf, new_size);
assert(r==0);
@@ -131,6 +118,12 @@ toku_maybe_truncate_cachefile (CACHEFILE cf, int fd, u_int64_t size_used)
return;
}
static u_int64_t
umin64(u_int64_t a, u_int64_t b) {
if (a<b) return a;
return b;
}
int
maybe_preallocate_in_file (int fd, u_int64_t size)
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MiB whichever is less.
@@ -150,7 +143,7 @@ maybe_preallocate_in_file (int fd, u_int64_t size)
const int N = umin64(size, FILE_CHANGE_INCREMENT); // Double the size of the file, or add 16MiB, whichever is less.
char *MALLOC_N(N, wbuf);
memset(wbuf, 0, N);
toku_off_t start_write = alignup(file_size, 4096);
toku_off_t start_write = alignup64(file_size, 4096);
assert(start_write >= file_size);
toku_os_full_pwrite(fd, wbuf, N, start_write);
toku_free(wbuf);
@@ -192,24 +185,24 @@ addupsize (OMTVALUE lev, u_int32_t UU(idx), void *vp) {
return 0;
}
static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
static unsigned int
toku_serialize_brtnode_size_slow (BRTNODE node) {
unsigned int size=brtnode_header_overhead;
size += toku_serialize_descriptor_size(node->desc);
if (node->height>0) {
unsigned int hsize=0;
unsigned int csize=0;
int i;
size+=4; /* n_children */
size+=4; /* subtree fingerprint. */
size+=4*(node->u.n.n_children-1); /* key lengths*/
if (node->flags & TOKU_DB_DUPSORT) size += 4*(node->u.n.n_children-1);
for (i=0; i<node->u.n.n_children-1; i++) {
for (int i=0; i<node->u.n.n_children-1; i++) {
csize+=toku_brtnode_pivot_key_len(node, node->u.n.childkeys[i]);
}
size+=(8+4+4+1+3*8)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, the subtree fingerprint, and 3*8 for the subtree estimates and 1 for the exact bit for the estimates. */
int n_buffers = node->u.n.n_children;
assert(0 <= n_buffers && n_buffers < TREE_FANOUT+1);
for (i=0; i< n_buffers; i++) {
for (int i=0; i< n_buffers; i++) {
FIFO_ITERATE(BNC_BUFFER(node,i),
key, keylen,
data __attribute__((__unused__)), datalen,
@@ -233,7 +226,8 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
}
// This is the size of the uncompressed data, not including the compression headers
unsigned int toku_serialize_brtnode_size (BRTNODE node) {
unsigned int
toku_serialize_brtnode_size (BRTNODE node) {
unsigned int result =brtnode_header_overhead;
assert(sizeof(toku_off_t)==8);
result += toku_serialize_descriptor_size(node->desc);
@@ -259,13 +253,6 @@ unsigned int toku_serialize_brtnode_size (BRTNODE node) {
return result;
}
static int
wbufwriteleafentry (OMTVALUE lev, u_int32_t UU(idx), void *v) {
LEAFENTRY le=lev;
struct wbuf *thisw=v;
wbuf_LEAFENTRY(thisw, le);
return 0;
}
enum { uncompressed_magic_len = (8 // tokuleaf or tokunode
+4 // layout version
@@ -279,114 +266,136 @@ enum {
uncompressed_version_offset = 8,
};
// compression header sub block sizes
struct sub_block_sizes {
struct sub_block {
void *uncompressed_ptr;
u_int32_t uncompressed_size;
void *compressed_ptr;
u_int32_t compressed_size; // real compressed size
u_int32_t compressed_size_bound; // estimated compressed size
};
// round up n to a multiple of alignment (alignment must be a power of two)
static inline int roundup2(int n, int alignment) {
return (n+alignment-1)&~(alignment-1);
}
u_int32_t xsum; // sub block checksum
};
static const int target_sub_block_size = 512*1024;
static const int max_sub_blocks = 8;
struct stored_sub_block {
u_int32_t uncompressed_size;
u_int32_t compressed_size;
// u_int32_t xsum;
};
// choose the number of sub blocks such that the sub block size
// is around 1 meg. put an upper bound on the number of sub blocks.
static int choose_sub_block_sizes(int total_size, int maxn, struct sub_block_sizes sizes[]) {
const int alignment = 256;
static void
sub_block_init(struct sub_block *sub_block) {
sub_block->uncompressed_ptr = 0;
sub_block->uncompressed_size = 0;
int n, subsize;
n = total_size/target_sub_block_size;
if (n == 0) {
n = 1;
subsize = total_size;
} else {
if (n > maxn)
n = maxn;
subsize = roundup2(total_size/n, alignment);
while (n < maxn && subsize >= target_sub_block_size + target_sub_block_size/8) {
n++;
subsize = roundup2(total_size/n, alignment);
}
}
sub_block->compressed_ptr = 0;
sub_block->compressed_size_bound = 0;
sub_block->compressed_size = 0;
// generate the sub block sizes
int i;
for (i=0; i<n-1; i++) {
sizes[i].uncompressed_size = subsize;
sizes[i].compressed_size_bound = compressBound(subsize);
sizes[i].compressed_size = 0;
total_size -= subsize;
}
if (i == 0 || total_size > 0) {
sizes[i].uncompressed_size = total_size;
sizes[i].compressed_size_bound = compressBound(total_size);
sizes[i].compressed_size = 0;
i++;
}
return i;
sub_block->xsum = 0;
}
// get the size of the compression header
static size_t get_compression_header_size(int UU(layout_version), int n) {
return sizeof (u_int32_t) + (n * 2 * sizeof (u_int32_t));
static size_t
sub_block_header_size(int n_sub_blocks) {
return sizeof (u_int32_t) + n_sub_blocks * sizeof (struct stored_sub_block);
}
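// Illustrative arithmetic: with xsum still commented out, struct
// stored_sub_block is two u_int32_t's (8 bytes), so a block split into the
// maximum 8 sub blocks carries a 4 + 8*8 = 68 byte sub block header.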
// get the sum of the sub block compressed sizes
static size_t get_sum_compressed_size_bound(int n, struct sub_block_sizes sizes[]) {
int i;
size_t compressed_size = 0;
for (i=0; i<n; i++)
compressed_size += sizes[i].compressed_size_bound;
return compressed_size;
static size_t
get_sum_compressed_size_bound(int n_sub_blocks, struct sub_block sub_block[]) {
size_t compressed_size_bound = 0;
for (int i = 0; i < n_sub_blocks; i++) {
sub_block[i].compressed_size_bound = compressBound(sub_block[i].uncompressed_size);
compressed_size_bound += sub_block[i].compressed_size_bound;
}
return compressed_size_bound;
}
// get the sum of the sub block uncompressed sizes
static size_t get_sum_uncompressed_size(int n, struct sub_block_sizes sizes[]) {
int i;
static size_t
get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]) {
size_t uncompressed_size = 0;
for (i=0; i<n; i++)
uncompressed_size += sizes[i].uncompressed_size;
for (int i = 0; i < n_sub_blocks; i++)
uncompressed_size += sub_block[i].uncompressed_size;
return uncompressed_size;
}
// round up a to a multiple of b
static inline int
alignup32(int a, int b) {
return ((a+b-1) / b) * b;
}
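// e.g. alignup32(1000, 32) == 1024 and alignup32(1024, 32) == 1024.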
static const int max_sub_blocks = 8;
// Choose n_sub_blocks and sub_block_size such that their product is >= total_size
// and sub_block_size is at least the target_sub_block_size (when total_size is
// smaller than a single target-sized sub block, one sub block holds it all).
static void
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret) {
const int target_sub_block_size = 512*1024;
const int alignment = 32;
int n_sub_blocks, sub_block_size;
n_sub_blocks = total_size / target_sub_block_size;
if (n_sub_blocks <= 1) {
n_sub_blocks = 1;
sub_block_size = total_size;
} else {
if (n_sub_blocks > n_sub_blocks_limit) // limit the number of sub-blocks
n_sub_blocks = n_sub_blocks_limit;
sub_block_size = alignup32(total_size / n_sub_blocks, alignment);
while (sub_block_size * n_sub_blocks < total_size) // round up the sub-block size until big enough
sub_block_size += alignment;
}
*sub_block_size_ret = sub_block_size;
*n_sub_blocks_ret = n_sub_blocks;
}
static void
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]) {
int size_left = total_size;
int i;
for (i = 0; i < n_sub_blocks-1; i++) {
sub_block[i].uncompressed_size = sub_block_size;
size_left -= sub_block_size;
}
if (i == 0 || size_left > 0)
sub_block[i].uncompressed_size = size_left;
}
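// Sketch of how the two helpers above combine (hypothetical sizes, using the
// 512KiB target and max_sub_blocks defined here): the remainder always lands
// in the last sub block.
static void __attribute__((__unused__))
sub_block_sizing_example(void) {
    int total_size = (1<<20) + 100;     // 1MiB of node data plus a tail
    int n_sub_blocks, sub_block_size;
    choose_sub_block_size(total_size, max_sub_blocks, &sub_block_size, &n_sub_blocks);
    // 1048676/524288 = 2 sub blocks; 1048676/2 = 524338 rounds up to 524352,
    // and 2*524352 >= total_size, so no further rounding is needed.
    assert(n_sub_blocks == 2 && sub_block_size == 524352);
    struct sub_block sub_block[n_sub_blocks];
    for (int i = 0; i < n_sub_blocks; i++)
        sub_block_init(&sub_block[i]);
    set_all_sub_block_sizes(total_size, sub_block_size, n_sub_blocks, sub_block);
    // the last sub block absorbs the remainder: 1048676 - 524352 = 524324
    assert(sub_block[0].uncompressed_size == 524352);
    assert(sub_block[1].uncompressed_size == 524324);
}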
#include "workset.h"
struct compress_work {
struct work base;
struct sub_block_sizes *sub_block_sizes;
void *from;
void *to;
struct sub_block *sub_block;
};
static void
compress_work_init(struct compress_work *w, void *from, void *to, struct sub_block_sizes *sub_block_sizes) {
assert(from); assert(to);
w->from = from;
w->to = to;
w->sub_block_sizes = sub_block_sizes;
compress_work_init(struct compress_work *w, struct sub_block *sub_block) {
w->sub_block = sub_block;
}
static void
compress_sub_block(struct compress_work *w) {
struct sub_block_sizes *sub_block_sizes = w->sub_block_sizes;
Bytef *uncompressed_ptr = (Bytef *) w->from;
Bytef *compressed_ptr = (Bytef *) w->to;
uLongf uncompressed_len = sub_block_sizes->uncompressed_size;
uLongf real_compressed_len = sub_block_sizes->compressed_size_bound;
compress_sub_block(struct sub_block *sub_block) {
// compress it
Bytef *uncompressed_ptr = (Bytef *) sub_block->uncompressed_ptr;
Bytef *compressed_ptr = (Bytef *) sub_block->compressed_ptr;
uLongf uncompressed_len = sub_block->uncompressed_size;
uLongf real_compressed_len = sub_block->compressed_size_bound;
int compression_level = 5;
int r = compress2((Bytef*)compressed_ptr, &real_compressed_len,
(Bytef*)uncompressed_ptr, uncompressed_len,
compression_level);
assert(r == Z_OK);
sub_block_sizes->compressed_size = real_compressed_len; // replace the compressed size estimate with the real size
sub_block->compressed_size = real_compressed_len; // replace the compressed size estimate with the real size
// checksum it
// sub_block->xsum = x1764_memory(sub_block->compressed_ptr, sub_block->compressed_size);
}
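// The zlib contract relied on above (standard zlib.h API): compressBound()
// returns a worst-case output size, so compressing into a buffer of
// compressed_size_bound bytes cannot overflow. A minimal round trip looks
// like this (illustrative only):
static void __attribute__((__unused__))
compress_round_trip_example(void) {
    Bytef src[1024];
    memset(src, 'a', sizeof src);
    uLongf bound = compressBound(sizeof src);
    Bytef dst[bound];                   // worst-case sized output buffer
    uLongf dst_len = bound;
    int r = compress2(dst, &dst_len, src, sizeof src, 5);
    assert(r == Z_OK && dst_len <= bound);
    Bytef back[1024];
    uLongf back_len = sizeof back;
    r = uncompress(back, &back_len, dst, dst_len);
    assert(r == Z_OK && back_len == sizeof src && memcmp(src, back, back_len) == 0);
}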
static void *
@@ -396,222 +405,322 @@ compress_worker(void *arg) {
struct compress_work *w = (struct compress_work *) workset_get(ws);
if (w == NULL)
break;
compress_sub_block(w);
compress_sub_block(w->sub_block);
}
return arg;
}
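// How the workset is driven (as used below in compress_all_sub_blocks): the
// producer locks the set, queues one compress_work per sub block, unlocks,
// spawns up to T helper threads with threadset_create, and runs
// compress_worker itself; every worker loops until workset_get returns NULL,
// and threadset_join then waits for the helpers.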
int toku_serialize_brtnode_to_memory (BRTNODE node, int n_workitems __attribute__((__unused__)), int n_threads __attribute__((__unused__)),
/*out*/ size_t *n_bytes_to_write,
/*out*/ char **bytes_to_write) {
struct wbuf w;
int i;
static size_t
compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *uncompressed_ptr, char *compressed_ptr) {
char *compressed_base_ptr = compressed_ptr;
size_t compressed_len;
if (n_sub_blocks == 1) {
// single sub-block
sub_block[0].uncompressed_ptr = uncompressed_ptr;
sub_block[0].compressed_ptr = compressed_ptr;
compress_sub_block(&sub_block[0]);
compressed_len = sub_block[0].compressed_size;
} else {
// multiple sub-blocks
int T = num_cores; // T = min(num_cores, n_sub_blocks) - 1
if (T > n_sub_blocks)
T = n_sub_blocks;
if (T > 0)
T = T - 1; // threads in addition to the running thread
struct workset ws;
workset_init(&ws);
struct compress_work work[n_sub_blocks];
workset_lock(&ws);
for (int i = 0; i < n_sub_blocks; i++) {
sub_block[i].uncompressed_ptr = uncompressed_ptr;
sub_block[i].compressed_ptr = compressed_ptr;
compress_work_init(&work[i], &sub_block[i]);
workset_put_locked(&ws, &work[i].base);
uncompressed_ptr += sub_block[i].uncompressed_size;
compressed_ptr += sub_block[i].compressed_size_bound;
}
workset_unlock(&ws);
// compress the sub-blocks
if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks);
toku_pthread_t tids[T];
threadset_create(tids, &T, compress_worker, &ws);
compress_worker(&ws);
// wait for all of the work to complete
threadset_join(tids, T);
// squeeze out the holes not used by the compress bound
compressed_ptr = compressed_base_ptr + sub_block[0].compressed_size;
for (int i = 1; i < n_sub_blocks; i++) {
memmove(compressed_ptr, sub_block[i].compressed_ptr, sub_block[i].compressed_size);
compressed_ptr += sub_block[i].compressed_size;
}
// serialize the node into buf
unsigned int calculated_size = toku_serialize_brtnode_size(node);
//printf("%s:%d serializing %" PRIu64 " size=%d\n", __FILE__, __LINE__, blocknum.b, calculated_size);
//assert(calculated_size<=size);
//char buf[size];
char *MALLOC_N(calculated_size, buf);
//toku_verify_counts(node);
//assert(size>0);
//printf("%s:%d serializing %lld w height=%d p0=%p\n", __FILE__, __LINE__, off, node->height, node->mdicts[0]);
wbuf_init(&w, buf, calculated_size);
wbuf_literal_bytes(&w, "toku", 4);
if (node->height==0) wbuf_literal_bytes(&w, "leaf", 4);
else wbuf_literal_bytes(&w, "node", 4);
compressed_len = compressed_ptr - compressed_base_ptr;
}
return compressed_len;
}
static void
serialize_node_header(BRTNODE node, struct wbuf *wbuf) {
wbuf_literal_bytes(wbuf, "toku", 4);
if (node->height==0)
wbuf_literal_bytes(wbuf, "leaf", 4);
else
wbuf_literal_bytes(wbuf, "node", 4);
assert(node->layout_version == BRT_LAYOUT_VERSION);
wbuf_int(&w, node->layout_version);
wbuf_int(&w, node->layout_version_original);
toku_serialize_descriptor_contents_to_wbuf(&w, node->desc);
wbuf_int(wbuf, node->layout_version);
wbuf_int(wbuf, node->layout_version_original);
// serialize the descriptor
toku_serialize_descriptor_contents_to_wbuf(wbuf, node->desc);
//printf("%s:%d %lld.calculated_size=%d\n", __FILE__, __LINE__, off, calculated_size);
wbuf_uint(&w, node->nodesize);
wbuf_uint(&w, node->flags);
wbuf_int(&w, node->height);
wbuf_uint(wbuf, node->nodesize);
wbuf_uint(wbuf, node->flags);
wbuf_int(wbuf, node->height);
//printf("%s:%d %lld rand=%08x sum=%08x height=%d\n", __FILE__, __LINE__, node->thisnodename, node->rand4fingerprint, node->subtree_fingerprint, node->height);
wbuf_uint(&w, node->rand4fingerprint);
wbuf_uint(&w, node->local_fingerprint);
// printf("%s:%d wrote %08x for node %lld\n", __FILE__, __LINE__, node->local_fingerprint, (long long)node->thisnodename);
wbuf_uint(wbuf, node->rand4fingerprint);
wbuf_uint(wbuf, node->local_fingerprint);
//printf("%s:%d wrote %08x for node %lld\n", __FILE__, __LINE__, node->local_fingerprint, (long long)node->thisnodename);
//printf("%s:%d local_fingerprint=%8x\n", __FILE__, __LINE__, node->local_fingerprint);
//printf("%s:%d w.ndone=%d n_children=%d\n", __FILE__, __LINE__, w.ndone, node->n_children);
if (node->height>0) {
assert(node->u.n.n_children>0);
// Local fingerprint is not actually stored while in main memory. Must calculate it.
// Subtract the child fingerprints from the subtree fingerprint to get the local fingerprint.
{
u_int32_t subtree_fingerprint = node->local_fingerprint;
for (i=0; i<node->u.n.n_children; i++) {
subtree_fingerprint += BNC_SUBTREE_FINGERPRINT(node, i);
}
wbuf_uint(&w, subtree_fingerprint);
}
wbuf_int(&w, node->u.n.n_children);
for (i=0; i<node->u.n.n_children; i++) {
wbuf_uint(&w, BNC_SUBTREE_FINGERPRINT(node, i));
struct subtree_estimates *se = &(BNC_SUBTREE_ESTIMATES(node, i));
wbuf_ulonglong(&w, se->nkeys);
wbuf_ulonglong(&w, se->ndata);
wbuf_ulonglong(&w, se->dsize);
wbuf_char (&w, (char)se->exact);
}
//printf("%s:%d w.ndone=%d\n", __FILE__, __LINE__, w.ndone);
for (i=0; i<node->u.n.n_children-1; i++) {
if (node->flags & TOKU_DB_DUPSORT) {
wbuf_bytes(&w, kv_pair_key(node->u.n.childkeys[i]), kv_pair_keylen(node->u.n.childkeys[i]));
wbuf_bytes(&w, kv_pair_val(node->u.n.childkeys[i]), kv_pair_vallen(node->u.n.childkeys[i]));
} else {
wbuf_bytes(&w, kv_pair_key(node->u.n.childkeys[i]), toku_brtnode_pivot_key_len(node, node->u.n.childkeys[i]));
}
//printf("%s:%d w.ndone=%d (childkeylen[%d]=%d\n", __FILE__, __LINE__, w.ndone, i, node->childkeylens[i]);
}
for (i=0; i<node->u.n.n_children; i++) {
wbuf_BLOCKNUM(&w, BNC_BLOCKNUM(node,i));
//printf("%s:%d w.ndone=%d\n", __FILE__, __LINE__, w.ndone);
}
}
{
int n_buffers = node->u.n.n_children;
u_int32_t check_local_fingerprint = 0;
for (i=0; i< n_buffers; i++) {
//printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i]));
wbuf_int(&w, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xids,
{
assert(type>=0 && type<256);
wbuf_char(&w, (unsigned char)type);
wbuf_xids(&w, xids);
wbuf_bytes(&w, key, keylen);
wbuf_bytes(&w, data, datalen);
check_local_fingerprint+=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xids, key, keylen, data, datalen);
});
}
//printf("%s:%d check_local_fingerprint=%8x\n", __FILE__, __LINE__, check_local_fingerprint);
if (check_local_fingerprint!=node->local_fingerprint) printf("%s:%d node=%" PRId64 " fingerprint expected=%08x actual=%08x\n", __FILE__, __LINE__, node->thisnodename.b, check_local_fingerprint, node->local_fingerprint);
assert(check_local_fingerprint==node->local_fingerprint);
}
} else {
//printf("%s:%d writing node %lld n_entries=%d\n", __FILE__, __LINE__, node->thisnodename, toku_gpma_n_entries(node->u.l.buffer));
wbuf_ulonglong(&w, node->u.l.leaf_stats.nkeys);
wbuf_ulonglong(&w, node->u.l.leaf_stats.ndata);
wbuf_ulonglong(&w, node->u.l.leaf_stats.dsize);
wbuf_uint(&w, toku_omt_size(node->u.l.buffer));
toku_omt_iterate(node->u.l.buffer, wbufwriteleafentry, &w);
}
assert(w.ndone<=w.size);
static void
serialize_nonleaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[], struct wbuf *wbuf) {
// serialize the nonleaf header
assert(node->u.n.n_children>0);
// Local fingerprint is not actually stored while in main memory. Must calculate it.
// Subtract the child fingerprints from the subtree fingerprint to get the local fingerprint.
{
u_int32_t subtree_fingerprint = node->local_fingerprint;
for (int i = 0; i < node->u.n.n_children; i++) {
subtree_fingerprint += BNC_SUBTREE_FINGERPRINT(node, i);
}
wbuf_uint(wbuf, subtree_fingerprint);
}
wbuf_int(wbuf, node->u.n.n_children);
for (int i = 0; i < node->u.n.n_children; i++) {
wbuf_uint(wbuf, BNC_SUBTREE_FINGERPRINT(node, i));
struct subtree_estimates *se = &(BNC_SUBTREE_ESTIMATES(node, i));
wbuf_ulonglong(wbuf, se->nkeys);
wbuf_ulonglong(wbuf, se->ndata);
wbuf_ulonglong(wbuf, se->dsize);
wbuf_char (wbuf, (char)se->exact);
}
//printf("%s:%d w.ndone=%d\n", __FILE__, __LINE__, w.ndone);
for (int i = 0; i < node->u.n.n_children-1; i++) {
if (node->flags & TOKU_DB_DUPSORT) {
wbuf_bytes(wbuf, kv_pair_key(node->u.n.childkeys[i]), kv_pair_keylen(node->u.n.childkeys[i]));
wbuf_bytes(wbuf, kv_pair_val(node->u.n.childkeys[i]), kv_pair_vallen(node->u.n.childkeys[i]));
} else {
wbuf_bytes(wbuf, kv_pair_key(node->u.n.childkeys[i]), toku_brtnode_pivot_key_len(node, node->u.n.childkeys[i]));
}
//printf("%s:%d w.ndone=%d (childkeylen[%d]=%d\n", __FILE__, __LINE__, w.ndone, i, node->childkeylens[i]);
}
for (int i = 0; i < node->u.n.n_children; i++) {
wbuf_BLOCKNUM(wbuf, BNC_BLOCKNUM(node,i));
//printf("%s:%d w.ndone=%d\n", __FILE__, __LINE__, w.ndone);
}
#if 0
// RFP
// map the child buffers
// RFP maybe move sub block boundaries
struct sub_block_map child_buffer_map[node->u.n.n_children];
size_t offset = wbuf_get_woffset(wbuf) + node->u.n.n_children * stored_sub_block_map_size;
for (int i = 0; i < node->u.n.n_children; i++) {
int idx = get_sub_block_index(n_sub_blocks, sub_block, offset);
size_t size = sizeof (u_int32_t) + BNC_NBYTESINBUF(node, i); // # elements + size of the elements
sub_block_map_init(&child_buffer_map[i], idx, offset, size);
offset += size;
}
// serialize the child buffer map
for (int i = 0; i < node->u.n.n_children ; i++)
sub_block_map_serialize(&child_buffer_map[i], wbuf);
#else
n_sub_blocks = n_sub_blocks;
sub_block = sub_block;
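    // the self-assignments above only quiet unused-parameter warnings while
    // the RFP sub block map code is disabled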
#endif
// serialize the child buffers
{
int n_buffers = node->u.n.n_children;
u_int32_t check_local_fingerprint = 0;
for (int i = 0; i < n_buffers; i++) {
//printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i]));
// assert(child_buffer_map[i].offset == wbuf_get_woffset(wbuf));
wbuf_int(wbuf, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xids,
{
assert(type>=0 && type<256);
wbuf_char(wbuf, (unsigned char)type);
wbuf_xids(wbuf, xids);
wbuf_bytes(wbuf, key, keylen);
wbuf_bytes(wbuf, data, datalen);
check_local_fingerprint+=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xids, key, keylen, data, datalen);
});
}
//printf("%s:%d check_local_fingerprint=%8x\n", __FILE__, __LINE__, check_local_fingerprint);
if (check_local_fingerprint!=node->local_fingerprint) printf("%s:%d node=%" PRId64 " fingerprint expected=%08x actual=%08x\n", __FILE__, __LINE__, node->thisnodename.b, check_local_fingerprint, node->local_fingerprint);
assert(check_local_fingerprint==node->local_fingerprint);
}
}
static int
wbufwriteleafentry (OMTVALUE lev, u_int32_t UU(idx), void *v) {
LEAFENTRY le=lev;
struct wbuf *thisw=v;
wbuf_LEAFENTRY(thisw, le);
return 0;
}
static void
serialize_leaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[], struct wbuf *wbuf) {
// serialize the leaf stats
wbuf_ulonglong(wbuf, node->u.l.leaf_stats.nkeys);
wbuf_ulonglong(wbuf, node->u.l.leaf_stats.ndata);
wbuf_ulonglong(wbuf, node->u.l.leaf_stats.dsize);
#if 0
// RFP partition the leaf elements. for now, 1 partition
const int npartitions = 1;
wbuf_nocrc_int(wbuf, npartitions);
struct sub_block_map part_map[npartitions];
for (int i = 0; i < npartitions; i++) {
size_t offset = wbuf_get_woffset(wbuf);
size_t size = sizeof (u_int32_t) + node->u.l.n_bytes_in_buffer; // # in partition + size of partition
int idx = get_sub_block_index(n_sub_blocks, sub_block, offset);
sub_block_map_init(&part_map[i], idx, offset, size);
}
// RFP serialize the partition pivots
for (int i = 0; i < npartitions-1; i++) {
assert(0);
}
// serialize the partition maps
for (int i = 0; i < npartitions; i++)
sub_block_map_serialize(&part_map[i], wbuf);
#else
n_sub_blocks = n_sub_blocks;
sub_block = sub_block;
#endif
// serialize the leaf entries
wbuf_uint(wbuf, toku_omt_size(node->u.l.buffer));
toku_omt_iterate(node->u.l.buffer, wbufwriteleafentry, wbuf);
}
static void
serialize_node(BRTNODE node, char *buf, size_t calculated_size, int n_sub_blocks, struct sub_block sub_block[]) {
struct wbuf wb;
wbuf_init(&wb, buf, calculated_size);
serialize_node_header(node, &wb);
if (node->height > 0)
serialize_nonleaf(node, n_sub_blocks, sub_block, &wb);
else
serialize_leaf(node, n_sub_blocks, sub_block, &wb);
assert(wb.ndone <= wb.size);
#ifdef CRC_ATEND
wbuf_int(&w, crc32(toku_null_crc, w.buf, w.ndone));
wbuf_int(&w, crc32(toku_null_crc, wb.buf, wb.ndone));
#endif
#ifdef CRC_INCR
{
u_int32_t checksum = x1764_finish(&w.checksum);
wbuf_uint(&w, checksum);
u_int32_t checksum = x1764_finish(&wb.checksum);
wbuf_uint(&wb, checksum);
}
#endif
if (calculated_size!=w.ndone)
printf("%s:%d w.done=%u calculated_size=%u\n", __FILE__, __LINE__, w.ndone, calculated_size);
assert(calculated_size==w.ndone);
// The uncompressed part of the block header is
// tokuleaf(8),
// version(4),
// lsn(8),
// n_sub_blocks(4), followed by n length pairs
// compressed_len(4)
// uncompressed_len(4)
// select the number of sub blocks and their sizes.
// impose an upper bound on the number of sub blocks.
struct sub_block_sizes sub_block_sizes[max_sub_blocks];
int n_sub_blocks = choose_sub_block_sizes(calculated_size-uncompressed_magic_len, max_sub_blocks, sub_block_sizes);
assert(calculated_size==wb.ndone);
}
int
toku_serialize_brtnode_to_memory (BRTNODE node, int n_workitems __attribute__((__unused__)), int n_threads __attribute__((__unused__)),
/*out*/ size_t *n_bytes_to_write,
/*out*/ char **bytes_to_write) {
// get the size of the serialized node
unsigned int calculated_size = toku_serialize_brtnode_size(node);
// choose sub block parameters
int n_sub_blocks, sub_block_size;
size_t data_size = calculated_size - uncompressed_magic_len;
choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &n_sub_blocks);
assert(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks);
if (0 && n_sub_blocks != 1) {
printf("%s:%d %d:", __FUNCTION__, __LINE__, n_sub_blocks);
for (i=0; i<n_sub_blocks; i++)
printf("%u ", sub_block_sizes[i].uncompressed_size);
printf("\n");
}
size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block_sizes);
size_t compression_header_len = get_compression_header_size(node->layout_version, n_sub_blocks);
assert(sub_block_size > 0);
// set the initial sub block size for all of the sub blocks
struct sub_block sub_block[n_sub_blocks];
for (int i = 0; i < n_sub_blocks; i++)
sub_block_init(&sub_block[i]);
set_all_sub_block_sizes(data_size, sub_block_size, n_sub_blocks, sub_block);
// allocate space for the serialized node
char *MALLOC_N(calculated_size, buf);
//toku_verify_counts(node);
//assert(size>0);
//printf("%s:%d serializing %lld w height=%d p0=%p\n", __FILE__, __LINE__, off, node->height, node->mdicts[0]);
// serialize the node into buf
serialize_node(node, buf, calculated_size, n_sub_blocks, sub_block);
// allocate space for the compressed buf
size_t compressed_len = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
size_t compression_header_len = sub_block_header_size(n_sub_blocks);
char *MALLOC_N(compressed_len+uncompressed_magic_len+compression_header_len, compressed_buf);
// copy the header
memcpy(compressed_buf, buf, uncompressed_magic_len);
if (0) printf("First 4 bytes before compressing data are %02x%02x%02x%02x\n",
buf[uncompressed_magic_len], buf[uncompressed_magic_len+1],
buf[uncompressed_magic_len+2], buf[uncompressed_magic_len+3]);
// TBD compress all of the sub blocks
// compress all of the sub blocks
char *uncompressed_ptr = buf + uncompressed_magic_len;
char *compressed_base_ptr = compressed_buf + uncompressed_magic_len + compression_header_len;
char *compressed_ptr = compressed_base_ptr;
int T = num_cores; // T = min(num_cores, n_sub_blocks) - 1
if (T > n_sub_blocks)
T = n_sub_blocks;
if (T > 0)
T = T - 1; // threads in addition to the running thread
struct workset ws;
workset_init(&ws);
struct compress_work work[n_sub_blocks];
workset_lock(&ws);
for (i = 0; i < n_sub_blocks; i++) {
compress_work_init(&work[i], uncompressed_ptr, compressed_ptr, &sub_block_sizes[i]);
uncompressed_ptr += sub_block_sizes[i].uncompressed_size;
compressed_ptr += sub_block_sizes[i].compressed_size_bound;
workset_put_locked(&ws, &work[i].base);
}
workset_unlock(&ws);
// compress the sub-blocks
if (0) printf("%s:%d T=%d N=%d\n", __FUNCTION__, __LINE__, T, n_sub_blocks);
toku_pthread_t tids[T];
threadset_create(tids, &T, compress_worker, &ws);
compress_worker(&ws);
// wait for all of the work to complete
threadset_join(tids, T);
// squeeze out the holes not used by the compress bound
compressed_ptr = compressed_base_ptr + sub_block_sizes[0].compressed_size;
for (i = 1; i < n_sub_blocks; i++) {
memmove(compressed_ptr, work[i].to, sub_block_sizes[i].compressed_size);
compressed_ptr += sub_block_sizes[i].compressed_size;
}
compressed_len = compressed_ptr - compressed_base_ptr;
compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, uncompressed_ptr, compressed_ptr);
//if (0) printf("Block %" PRId64 " Size before compressing %u, after compression %"PRIu64"\n", blocknum.b, calculated_size-uncompressed_magic_len, (uint64_t) compressed_len);
// write out the compression header
// serialize the sub block header
uint32_t *compressed_header_ptr = (uint32_t *)(compressed_buf + uncompressed_magic_len);
*compressed_header_ptr++ = toku_htod32(n_sub_blocks);
for (i=0; i<n_sub_blocks; i++) {
compressed_header_ptr[0] = toku_htod32(sub_block_sizes[i].compressed_size);
compressed_header_ptr[1] = toku_htod32(sub_block_sizes[i].uncompressed_size);
for (int i=0; i<n_sub_blocks; i++) {
compressed_header_ptr[0] = toku_htod32(sub_block[i].compressed_size);
compressed_header_ptr[1] = toku_htod32(sub_block[i].uncompressed_size);
// RFP xsum
compressed_header_ptr += 2;
}
// RFP compute the header checksum and serialize it
*n_bytes_to_write = uncompressed_magic_len + compression_header_len + compressed_len;
*bytes_to_write = compressed_buf;
assert(w.ndone==calculated_size);
toku_free(buf);
return 0;
}
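// Resulting on-disk image, as assembled above: the uncompressed_magic_len
// header bytes copied verbatim, then n_sub_blocks as a u_int32_t, then per
// sub block the compressed and uncompressed sizes (toku_htod32 each), then
// the compressed payloads packed back to back with the compressBound slack
// squeezed out.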
int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h, int n_workitems, int n_threads, BOOL for_checkpoint) {
int
toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h, int n_workitems, int n_threads, BOOL for_checkpoint) {
assert(node->desc == &h->descriptor);
size_t n_to_write;
char *compressed_buf;
{
int r = toku_serialize_brtnode_to_memory (node, n_workitems, n_threads,
&n_to_write, &compressed_buf);
int r = toku_serialize_brtnode_to_memory (node, n_workitems, n_threads, &n_to_write, &compressed_buf);
if (r!=0) return r;
}
@@ -644,7 +753,6 @@ static void deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor
static int
deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *rb) {
int r;
int i;
if (memcmp(magic, "tokunode", 8)!=0) {
r = toku_db_badformat();
@@ -659,7 +767,7 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu
MALLOC_N(result->u.n.n_children, result->u.n.childkeys);
//printf("n_children=%d\n", result->n_children);
assert(result->u.n.n_children>=0);
for (i=0; i<result->u.n.n_children; i++) {
for (int i=0; i<result->u.n.n_children; i++) {
u_int32_t childfp = rbuf_int(rb);
BNC_SUBTREE_FINGERPRINT(result, i)= childfp;
check_subtree_fingerprint += childfp;
@@ -669,7 +777,7 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu
se->dsize = rbuf_ulonglong(rb);
se->exact = (BOOL) (rbuf_char(rb) != 0);
}
for (i=0; i<result->u.n.n_children-1; i++) {
for (int i=0; i<result->u.n.n_children-1; i++) {
if (result->flags & TOKU_DB_DUPSORT) {
bytevec keyptr, dataptr;
unsigned int keylen, datalen;
@@ -685,14 +793,14 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu
//printf(" key %d length=%d data=%s\n", i, result->childkeylens[i], result->childkeys[i]);
result->u.n.totalchildkeylens+=toku_brtnode_pivot_key_len(result, result->u.n.childkeys[i]);
}
for (i=0; i<result->u.n.n_children; i++) {
for (int i=0; i<result->u.n.n_children; i++) {
BNC_BLOCKNUM(result,i) = rbuf_blocknum(rb);
BNC_HAVE_FULLHASH(result, i) = FALSE;
BNC_NBYTESINBUF(result,i) = 0;
//printf("Child %d at %lld\n", i, result->children[i]);
}
result->u.n.n_bytes_in_buffers = 0;
for (i=0; i<result->u.n.n_children; i++) {
for (int i=0; i<result->u.n.n_children; i++) {
r=toku_fifo_create(&BNC_BUFFER(result,i));
if (r!=0) {
int j;
@@ -707,7 +815,7 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu
for (cnum=0; cnum<result->u.n.n_children; cnum++) {
int n_in_this_hash = rbuf_int(rb);
//printf("%d in hash\n", n_in_hash);
for (i=0; i<n_in_this_hash; i++) {
for (int i=0; i<n_in_this_hash; i++) {
int diff;
bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen;
@@ -749,7 +857,6 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu
static int
deserialize_brtnode_leaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *rb) {
int r;
int i;
if (memcmp(magic, "tokuleaf", 8)!=0) {
r = toku_db_badformat();
@@ -770,7 +877,7 @@ deserialize_brtnode_leaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *
u_int32_t actual_sum = 0;
u_int32_t start_of_data = rb->ndone;
OMTVALUE *MALLOC_N(n_in_buf, array);
for (i=0; i<n_in_buf; i++) {
for (int i=0; i<n_in_buf; i++) {
LEAFENTRY le = (LEAFENTRY)(&rb->buf[rb->ndone]);
u_int32_t disksize = leafentry_disksize(le);
rb->ndone += disksize;
@@ -907,34 +1014,39 @@ struct decompress_work {
void *uncompress_ptr;
u_int32_t compress_size;
u_int32_t uncompress_size;
u_int32_t xsum;
int error;
};
// initialize the decompression work
static void
decompress_work_init(struct decompress_work *dw,
void *compress_ptr, u_int32_t compress_size,
void *uncompress_ptr, u_int32_t uncompress_size) {
void *uncompress_ptr, u_int32_t uncompress_size,
u_int32_t xsum) {
dw->compress_ptr = compress_ptr;
dw->compress_size = compress_size;
dw->uncompress_ptr = uncompress_ptr;
dw->uncompress_size = uncompress_size;
dw->xsum = xsum;
dw->error = 0;
}
// decompress one block
static void
decompress_block(struct decompress_work *dw) {
if (0) {
union {
toku_pthread_t p;
int i;
} u;
u.p = toku_pthread_self();
printf("%s:%d %x %p\n", __FUNCTION__, __LINE__, u.i, dw);
}
uLongf destlen = dw->uncompress_size;
int r = uncompress(dw->uncompress_ptr, &destlen, dw->compress_ptr, dw->compress_size);
assert(destlen == dw->uncompress_size);
static int
decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum) {
// verify checksum
// u_int32_t xsum = x1764_memory(compress_ptr, compress_size);
// assert(xsum == expected_xsum);
expected_xsum = expected_xsum;
// decompress
uLongf destlen = uncompress_size;
int r = uncompress(uncompress_ptr, &destlen, compress_ptr, compress_size);
assert(destlen == uncompress_size);
assert(r==Z_OK);
return 0;
}
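// Sketch of the checksum step once xsum is actually stored (x1764_memory is
// the checksum routine used elsewhere in this codebase; wiring it in here is
// left for a later commit, hence the self-assignment above):
static int __attribute__((__unused__))
verify_sub_block_xsum(void *compress_ptr, u_int32_t compress_size, u_int32_t expected_xsum) {
    u_int32_t xsum = x1764_memory(compress_ptr, compress_size);
    return xsum == expected_xsum ? 0 : toku_db_badformat();
}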
// decompress blocks until there is no more work to do
@@ -945,16 +1057,56 @@ decompress_worker(void *arg) {
struct decompress_work *dw = (struct decompress_work *) workset_get(ws);
if (dw == NULL)
break;
decompress_block(dw);
dw->error = decompress_sub_block(dw->compress_ptr, dw->compress_size, dw->uncompress_ptr, dw->uncompress_size, dw->xsum);
}
return arg;
}
static void
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data) {
if (n_sub_blocks == 1) {
decompress_sub_block(compressed_data, sub_block[0].compressed_size, uncompressed_data, sub_block[0].uncompressed_size, sub_block[0].xsum);
} else {
// compute the number of additional threads needed for decompressing this node
int T = num_cores; // T = min(#cores, #blocks) - 1
if (T > n_sub_blocks)
T = n_sub_blocks;
if (T > 0)
T = T - 1; // threads in addition to the running thread
// init the decompression work set
struct workset ws;
workset_init(&ws);
// initialize the decompression work and add to the work set
struct decompress_work decompress_work[n_sub_blocks];
workset_lock(&ws);
for (int i = 0; i < n_sub_blocks; i++) {
decompress_work_init(&decompress_work[i], compressed_data, sub_block[i].compressed_size, uncompressed_data, sub_block[i].uncompressed_size, sub_block[i].xsum);
workset_put_locked(&ws, &decompress_work[i].base);
uncompressed_data += sub_block[i].uncompressed_size;
compressed_data += sub_block[i].compressed_size;
}
workset_unlock(&ws);
// decompress the sub-blocks
if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T);
toku_pthread_t tids[T];
threadset_create(tids, &T, decompress_worker, &ws);
decompress_worker(&ws);
// cleanup
threadset_join(tids, T);
workset_destroy(&ws);
}
}
static int
decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb, BLOCKNUM blocknum) {
toku_trace("decompress");
int r;
int i;
// get the number of compressed sub blocks
int n_sub_blocks;
int compression_header_offset;
@@ -967,21 +1119,21 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb
// verify the sizes of the compressed sub blocks
if (0 && n_sub_blocks != 1) printf("%s:%d %d\n", __FUNCTION__, __LINE__, n_sub_blocks);
struct sub_block_sizes sub_block_sizes[n_sub_blocks];
for (i=0; i<n_sub_blocks; i++) {
struct sub_block sub_block[n_sub_blocks];
for (int i=0; i<n_sub_blocks; i++) {
u_int32_t compressed_size = toku_dtoh32(*(u_int32_t*)(&raw_block[compression_header_offset+8*i]));
if (compressed_size<=0 || compressed_size>(1<<30)) { r = toku_db_badformat(); return r; }
u_int32_t uncompressed_size = toku_dtoh32(*(u_int32_t*)(&raw_block[compression_header_offset+8*i+4]));
if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = toku_db_badformat(); return r; }
sub_block_sizes[i].compressed_size = compressed_size;
sub_block_sizes[i].uncompressed_size = uncompressed_size;
sub_block[i].compressed_size = compressed_size;
sub_block[i].uncompressed_size = uncompressed_size;
}
unsigned char *compressed_data = raw_block + uncompressed_magic_len + get_compression_header_size(BRT_LAYOUT_VERSION, n_sub_blocks);
unsigned char *compressed_data = raw_block + uncompressed_magic_len + sub_block_header_size(n_sub_blocks);
size_t uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block_sizes);
size_t uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block);
rb->size= uncompressed_magic_len + uncompressed_size;
assert(rb->size>0);
@@ -990,38 +1142,8 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb
// construct the uncompressed block from the header and compressed sub blocks
memcpy(rb->buf, raw_block, uncompressed_magic_len);
// init the decompression work set
struct workset ws;
workset_init(&ws);
// compute the number of additional threads needed for decompressing this node
int T = n_sub_blocks; // T = min(#cores, #blocks) - 1
if (T > num_cores)
T = num_cores;
if (T > 0)
T = T - 1; // threads in addition to the running thread
if (0) printf("%s:%d Cores=%d Blocks=%d T=%d\n", __FUNCTION__, __LINE__, num_cores, n_sub_blocks, T);
toku_pthread_t tids[T];
// initialize the decompression work and add to the work set
unsigned char *uncompressed_data = rb->buf+uncompressed_magic_len;
struct decompress_work decompress_work[n_sub_blocks];
workset_lock(&ws);
for (i=0; i<n_sub_blocks; i++) {
decompress_work_init(&decompress_work[i], compressed_data, sub_block_sizes[i].compressed_size, uncompressed_data, sub_block_sizes[i].uncompressed_size);
uncompressed_data += sub_block_sizes[i].uncompressed_size;
compressed_data += sub_block_sizes[i].compressed_size;
workset_put_locked(&ws, &decompress_work[i].base);
}
workset_unlock(&ws);
// do the decompression work
threadset_create(tids, &T, decompress_worker, &ws);
decompress_worker(&ws);
// cleanup
threadset_join(tids, T);
workset_destroy(&ws);
decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data);
toku_trace("decompress done");
@@ -1204,8 +1326,7 @@ void toku_verify_counts (BRTNODE node) {
assert(fps==node->local_fingerprint);
} else {
unsigned int sum = 0;
int i;
for (i=0; i<node->u.n.n_children; i++)
for (int i=0; i<node->u.n.n_children; i++)
sum += BNC_NBYTESINBUF(node,i);
// We don't really care if the later buffers have garbage in them. Valgrind would do a better job noticing if we leave it uninitialized.
// But for now the code always initializes the later tables so they are 0.
@@ -1697,7 +1818,8 @@ toku_deserialize_brtheader_from (int fd, struct brt_header **brth) {
return r;
}
unsigned int toku_brt_pivot_key_len (BRT brt, struct kv_pair *pk) {
unsigned int
toku_brt_pivot_key_len (BRT brt, struct kv_pair *pk) {
if (brt->flags & TOKU_DB_DUPSORT) {
return kv_pair_keylen(pk) + kv_pair_vallen(pk);
} else {
@@ -1705,7 +1827,8 @@ unsigned int toku_brt_pivot_key_len (BRT brt, struct kv_pair *pk) {
}
}
unsigned int toku_brtnode_pivot_key_len (BRTNODE node, struct kv_pair *pk) {
unsigned int
toku_brtnode_pivot_key_len (BRTNODE node, struct kv_pair *pk) {
if (node->flags & TOKU_DB_DUPSORT) {
return kv_pair_keylen(pk) + kv_pair_vallen(pk);
} else {
@@ -1713,7 +1836,8 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE node, struct kv_pair *pk) {
}
}
int toku_db_badformat(void) {
int
toku_db_badformat(void) {
return DB_BADFORMAT;
}
......
@@ -196,6 +196,92 @@ dump_fragmentation(int f, struct brt_header *h) {
printf("fragmentation: %.1f%%\n", 100. * ((double)fragsizes / (double)(total_space)));
}
static u_int32_t
get_unaligned_uint32(unsigned char *p) {
return *(u_int32_t *)p;
}
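// note: the cast above relies on the platform tolerating unaligned u_int32_t
// loads (true on x86); a memcpy into a local would be the portable way to
// express the same read.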
#define SUB_BLOCK_XSUM 0
struct sub_block {
u_int32_t compressed_size;
u_int32_t uncompressed_size;
#if SUB_BLOCK_XSUM
u_int32_t xsum;
#endif
};
static void
sub_block_deserialize(struct sub_block *sb, unsigned char *sub_block_header) {
sb->compressed_size = toku_dtoh32(get_unaligned_uint32(sub_block_header+0));
sb->uncompressed_size = toku_dtoh32(get_unaligned_uint32(sub_block_header+4));
#if SUB_BLOCK_XSUM
sb->xsum = toku_dtoh32(get_unaligned_uint32(sub_block_header+8));
#endif
}
static void
verify_block(unsigned char *cp, u_int64_t size) {
// verify the header checksum
const size_t node_header = 8 + sizeof (u_int32_t) + sizeof (u_int32_t);
unsigned char *sub_block_header = &cp[node_header];
u_int32_t n_sub_blocks = toku_dtoh32(get_unaligned_uint32(&sub_block_header[0]));
u_int32_t header_length = node_header + n_sub_blocks * sizeof (struct sub_block);
#if SUB_BLOCK_XSUM
header_length += sizeof (u_int32_t); // CRC
#endif
if (header_length > size) {
printf("header length too big: %u\n", header_length);
return;
}
#if SUB_BLOCK_XSUM
u_int32_t header_xsum = x1764_memory(cp, header_length);
u_int32_t expected_xsum = toku_dtoh32(get_unaligned_uint32(&cp[header_length]));
if (header_xsum != expected_xsum) {
printf("header checksum failed: %u %u\n", header_xsum, expected_xsum);
return;
}
#endif
// deserialize the sub block header
struct sub_block sub_block[n_sub_blocks];
sub_block_header += sizeof (u_int32_t);
for (u_int32_t i = 0 ; i < n_sub_blocks; i++) {
sub_block_deserialize(&sub_block[i], sub_block_header);
sub_block_header += sizeof (struct sub_block);
}
// verify the sub block header
u_int32_t offset = header_length + 4;
for (u_int32_t i = 0 ; i < n_sub_blocks; i++) {
#if SUB_BLOCK_XSUM
u_int32_t xsum = x1764_memory(cp + offset, sub_block[i].compressed_size);
printf("%u: %u %u %u", i, sub_block[i].compressed_size, sub_block[i].uncompressed_size, sub_block[i].xsum);
if (xsum != sub_block[i].xsum)
printf(" fail %u", xsum);
#else
printf("%u: %u %u", i, sub_block[i].compressed_size, sub_block[i].uncompressed_size);
#endif
printf("\n");
offset += sub_block[i].compressed_size;
}
if (offset != size)
printf("offset %u expected %"PRIu64"\n", offset, size);
}
static void
dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) {
DISKOFF offset, size;
toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size);
printf("%"PRIu64" at %"PRIu64" size %"PRIu64"\n", blocknum.b, offset, size);
unsigned char *vp = toku_malloc(size);
u_int64_t r = pread(f, vp, size, offset);
if (r == (u_int64_t)size)
verify_block(vp, size);
toku_free(vp);
}
static void
hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
u_int64_t i;
@@ -327,6 +413,9 @@ main (int argc, const char *const argv[]) {
} else if (strcmp(fields[0], "header") == 0) {
toku_brtheader_free(h);
dump_header(f, &h, cf);
} else if (strcmp(fields[0], "block") == 0 && nfields == 2) {
BLOCKNUM blocknum = make_blocknum(getuint64(fields[1]));
dump_block(f, blocknum, h);
} else if (strcmp(fields[0], "node") == 0 && nfields == 2) {
BLOCKNUM off = make_blocknum(getuint64(fields[1]));
dump_node(f, off, h);
......