Commit 15b37bc4 authored by Rich Prohaska, committed by Yoni Fogel

merge -r 18992:head ptq to main refs[t:2351]

git-svn-id: file:///svn/toku/tokudb@19099 c7de825b-a66e-492c-adef-691d508d4ae1
parent ade9c07e
...@@ -502,6 +502,7 @@ static int print_usage (const char *argv0) { ...@@ -502,6 +502,7 @@ static int print_usage (const char *argv0) {
fprintf(stderr, " --numdbs N Insert same items into N dbs (1 to %d)\n", MAX_DBS); fprintf(stderr, " --numdbs N Insert same items into N dbs (1 to %d)\n", MAX_DBS);
fprintf(stderr, " --insertmultiple Use DB_ENV->put_multiple api. Requires transactions.\n"); fprintf(stderr, " --insertmultiple Use DB_ENV->put_multiple api. Requires transactions.\n");
fprintf(stderr, " --redzone N redzone in percent\n"); fprintf(stderr, " --redzone N redzone in percent\n");
fprintf(stderr, " --srandom N srandom(N)\n");
fprintf(stderr, " n_iterations how many iterations (default %lld)\n", default_n_items/DEFAULT_ITEMS_TO_INSERT_PER_ITERATION); fprintf(stderr, " n_iterations how many iterations (default %lld)\n", default_n_items/DEFAULT_ITEMS_TO_INSERT_PER_ITERATION);
return 1; return 1;
......
...@@ -44,8 +44,9 @@ static int print_usage (const char *argv0) { ...@@ -44,8 +44,9 @@ static int print_usage (const char *argv0) {
fprintf(stderr, " --mysql compare keys that are mysql big int not null types\n"); fprintf(stderr, " --mysql compare keys that are mysql big int not null types\n");
fprintf(stderr, " --env DIR put db files in DIR instead of default\n"); fprintf(stderr, " --env DIR put db files in DIR instead of default\n");
fprintf(stderr, " --log_dir LOGDIR put the logs in LOGDIR\n"); fprintf(stderr, " --log_dir LOGDIR put the logs in LOGDIR\n");
fprintf(stderr, " --range <low> <high> set the low and high key boundaries in which random range queries are made\n"); fprintf(stderr, " --range LOW HIGH set the LOW and HIGH key boundaries in which random range queries are made\n");
fprintf(stderr, " --experiments <n> run n experiments (default:%d)\n", n_experiments); fprintf(stderr, " --experiments N run N experiments (default:%d)\n", n_experiments);
fprintf(stderr, " --srandom N srandom(N)\n");
fprintf(stderr, " --recover run recovery\n"); fprintf(stderr, " --recover run recovery\n");
fprintf(stderr, " --verbose print verbose information\n"); fprintf(stderr, " --verbose print verbose information\n");
return 1; return 1;
...@@ -128,6 +129,9 @@ static void parse_args (int argc, const char *const argv[]) { ...@@ -128,6 +129,9 @@ static void parse_args (int argc, const char *const argv[]) {
} else if (strcmp(*argv, "--experiments") == 0 && argc > 1) { } else if (strcmp(*argv, "--experiments") == 0 && argc > 1) {
argc--; argv++; argc--; argv++;
n_experiments = strtol(*argv, NULL, 10); n_experiments = strtol(*argv, NULL, 10);
} else if (strcmp(*argv, "--srandom") == 0 && argc > 1) {
argc--; argv++;
srandom(atoi(*argv));
} else if (strcmp(*argv, "--recover") == 0) { } else if (strcmp(*argv, "--recover") == 0) {
env_open_flags_yesx |= DB_RECOVER; env_open_flags_yesx |= DB_RECOVER;
env_open_flags_nox |= DB_RECOVER; env_open_flags_nox |= DB_RECOVER;
...@@ -309,8 +313,7 @@ static void scanscan_lwc (void) { ...@@ -309,8 +313,7 @@ static void scanscan_lwc (void) {
static void scanscan_flatten (void) { static void scanscan_flatten (void) {
int r; int r;
int counter=0; for (int counter=0; counter<n_experiments; counter++) {
for (counter=0; counter<n_experiments; counter++) {
double prevtime = gettime(); double prevtime = gettime();
r = db->flatten(db, tid); r = db->flatten(db, tid);
assert(r==0); assert(r==0);
...@@ -325,20 +328,21 @@ static void scanscan_range (void) { ...@@ -325,20 +328,21 @@ static void scanscan_range (void) {
int r; int r;
double texperiments[n_experiments]; double texperiments[n_experiments];
u_int64_t k = 0;
char kv[8];
DBT key, val;
int counter; for (int counter = 0; counter < n_experiments; counter++) {
for (counter=0; counter<n_experiments; counter++) {
if (1) { //if ((counter&1) == 0) {
makekey: makekey:
;
// generate a random key in the key range // generate a random key in the key range
u_int64_t k = (start_range + (random() % (end_range - start_range))) * (1<<6); k = (start_range + (random() % (end_range - start_range))) * (1<<6);
char kv[8]; for (int i = 0; i < 8; i++)
int i;
for (i=0; i<8; i++)
kv[i] = k >> (56-8*i); kv[i] = k >> (56-8*i);
DBT key; memset(&key, 0, sizeof key); key.data = &kv, key.size = sizeof kv; }
DBT val; memset(&val, 0, sizeof val); memset(&key, 0, sizeof key); key.data = &kv, key.size = sizeof kv;
memset(&val, 0, sizeof val);
double tstart = gettime(); double tstart = gettime();
...@@ -349,7 +353,7 @@ static void scanscan_range (void) { ...@@ -349,7 +353,7 @@ static void scanscan_range (void) {
r = dbc->c_get(dbc, &key, &val, DB_SET_RANGE+lock_flag); r = dbc->c_get(dbc, &key, &val, DB_SET_RANGE+lock_flag);
if (r != 0) { if (r != 0) {
assert(r == DB_NOTFOUND); assert(r == DB_NOTFOUND);
printf("%s:%d\n", __FUNCTION__, __LINE__); printf("%s:%d %"PRIu64"\n", __FUNCTION__, __LINE__, k);
goto makekey; goto makekey;
} }
...@@ -369,12 +373,12 @@ static void scanscan_range (void) { ...@@ -369,12 +373,12 @@ static void scanscan_range (void) {
assert(r==0); assert(r==0);
texperiments[counter] = gettime() - tstart; texperiments[counter] = gettime() - tstart;
printf("%f\n", texperiments[counter]); fflush(stdout); printf("%"PRIu64" %f\n", k, texperiments[counter]); fflush(stdout);
} }
// print the times // print the times
double tsum = 0.0, tmin = 0.0, tmax = 0.0; double tsum = 0.0, tmin = 0.0, tmax = 0.0;
for (counter=0; counter<n_experiments; counter++) { for (int counter = 0; counter < n_experiments; counter++) {
if (counter==0 || texperiments[counter] < tmin) if (counter==0 || texperiments[counter] < tmin)
tmin = texperiments[counter]; tmin = texperiments[counter];
if (counter==0 || texperiments[counter] > tmax) if (counter==0 || texperiments[counter] > tmax)
......
...@@ -180,6 +180,9 @@ enum { ...@@ -180,6 +180,9 @@ enum {
4), // localfingerprint 4), // localfingerprint
}; };
#include "sub_block.h"
#include "sub_block_map.h"
static int static int
addupsize (OMTVALUE lev, u_int32_t UU(idx), void *vp) { addupsize (OMTVALUE lev, u_int32_t UU(idx), void *vp) {
LEAFENTRY le=lev; LEAFENTRY le=lev;
...@@ -215,6 +218,7 @@ toku_serialize_brtnode_size_slow (BRTNODE node) { ...@@ -215,6 +218,7 @@ toku_serialize_brtnode_size_slow (BRTNODE node) {
} }
assert(hsize==node->u.n.n_bytes_in_buffers); assert(hsize==node->u.n.n_bytes_in_buffers);
assert(csize==node->u.n.totalchildkeylens); assert(csize==node->u.n.totalchildkeylens);
size += node->u.n.n_children*stored_sub_block_map_size;
return size+hsize+csize; return size+hsize+csize;
} else { } else {
unsigned int hsize=0; unsigned int hsize=0;
...@@ -243,6 +247,7 @@ toku_serialize_brtnode_size (BRTNODE node) { ...@@ -243,6 +247,7 @@ toku_serialize_brtnode_size (BRTNODE node) {
result+=node->u.n.totalchildkeylens; /* the lengths of the pivot keys, without their key lengths. */ result+=node->u.n.totalchildkeylens; /* the lengths of the pivot keys, without their key lengths. */
result+=(8+4+4+1+3*8)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, the subtree fingerprint, and 3*8 for the subtree estimates and one for the exact bit. */ result+=(8+4+4+1+3*8)*(node->u.n.n_children); /* For each child, a child offset, a count for the number of hash table entries, the subtree fingerprint, and 3*8 for the subtree estimates and one for the exact bit. */
result+=node->u.n.n_bytes_in_buffers; result+=node->u.n.n_bytes_in_buffers;
result += node->u.n.n_children*stored_sub_block_map_size;
} else { } else {
result+=4; /* n_entries in buffer table. */ result+=4; /* n_entries in buffer table. */
result+=3*8; /* the three leaf stats. */ result+=3*8; /* the three leaf stats. */
...@@ -262,8 +267,6 @@ enum { ...@@ -262,8 +267,6 @@ enum {
uncompressed_version_offset = 8, uncompressed_version_offset = 8,
}; };
#include "sub_block.h"
static void static void
serialize_node_header(BRTNODE node, struct wbuf *wbuf) { serialize_node_header(BRTNODE node, struct wbuf *wbuf) {
if (node->height == 0) if (node->height == 0)
...@@ -326,14 +329,12 @@ serialize_nonleaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[], ...@@ -326,14 +329,12 @@ serialize_nonleaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[],
//printf("%s:%d w.ndone=%d\n", __FILE__, __LINE__, w.ndone); //printf("%s:%d w.ndone=%d\n", __FILE__, __LINE__, w.ndone);
} }
#if 0
// RFP
// map the child buffers // map the child buffers
// RFP maybe move sub block boundaries
struct sub_block_map child_buffer_map[node->u.n.n_children]; struct sub_block_map child_buffer_map[node->u.n.n_children];
size_t offset = wbuf_get_woffset(wbuf) + node->u.n.n_children * stored_sub_block_map_size; size_t offset = wbuf_get_woffset(wbuf) - node_header_overhead + node->u.n.n_children * stored_sub_block_map_size;
for (int i = 0; i < node->u.n.n_children; i++) { for (int i = 0; i < node->u.n.n_children; i++) {
int idx = get_sub_block_index(n_sub_blocks, sub_block, offset); int idx = get_sub_block_index(n_sub_blocks, sub_block, offset);
assert(idx >= 0);
size_t size = sizeof (u_int32_t) + BNC_NBYTESINBUF(node, i); // # elements + size of the elements size_t size = sizeof (u_int32_t) + BNC_NBYTESINBUF(node, i); // # elements + size of the elements
sub_block_map_init(&child_buffer_map[i], idx, offset, size); sub_block_map_init(&child_buffer_map[i], idx, offset, size);
offset += size; offset += size;
...@@ -342,10 +343,6 @@ serialize_nonleaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[], ...@@ -342,10 +343,6 @@ serialize_nonleaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[],
// serialize the child buffer map // serialize the child buffer map
for (int i = 0; i < node->u.n.n_children ; i++) for (int i = 0; i < node->u.n.n_children ; i++)
sub_block_map_serialize(&child_buffer_map[i], wbuf); sub_block_map_serialize(&child_buffer_map[i], wbuf);
#else
n_sub_blocks = n_sub_blocks;
sub_block = sub_block;
#endif
// serialize the child buffers // serialize the child buffers
{ {
...@@ -404,7 +401,7 @@ serialize_leaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[], str ...@@ -404,7 +401,7 @@ serialize_leaf(BRTNODE node, int n_sub_blocks, struct sub_block sub_block[], str
assert(0); assert(0);
} }
// serialize the partition maps // RFP serialize the partition maps
for (int i = 0; i < npartitions; i++) for (int i = 0; i < npartitions; i++)
sub_block_map_serialize(&part_map[i], wbuf); sub_block_map_serialize(&part_map[i], wbuf);
#else #else
...@@ -440,7 +437,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_th ...@@ -440,7 +437,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, int UU(n_workitems), int UU(n_th
unsigned int calculated_size = toku_serialize_brtnode_size(node); unsigned int calculated_size = toku_serialize_brtnode_size(node);
// choose sub block parameters // choose sub block parameters
int n_sub_blocks, sub_block_size; int n_sub_blocks = 0, sub_block_size = 0;
size_t data_size = calculated_size - node_header_overhead; size_t data_size = calculated_size - node_header_overhead;
choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &n_sub_blocks); choose_sub_block_size(data_size, max_sub_blocks, &sub_block_size, &n_sub_blocks);
assert(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks); assert(0 < n_sub_blocks && n_sub_blocks <= max_sub_blocks);
...@@ -541,6 +538,110 @@ toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_h ...@@ -541,6 +538,110 @@ toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_h
static void deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor *desc, BOOL temporary); static void deserialize_descriptor_from_rbuf(struct rbuf *rb, struct descriptor *desc, BOOL temporary);
#include "workset.h"
// One unit of work for the child buffer deserialization workset: deserialize the
// buffer of child `cnum` of `node` from `rb` and record its fingerprint contribution.
struct deserialize_child_buffer_work {
struct work base; // workset linkage; &base is what is queued with workset_put_locked
BRTNODE node; // in: node that owns the child buffer
int cnum; // in: index of the child whose buffer is deserialized
struct rbuf rb; // in: read buffer set up over this child's serialized buffer
uint32_t local_fingerprint; // out: fingerprint contribution computed by deserialize_child_buffer
};
// Prepare a work item that deserializes the buffer of child `cnum` of `node`.
//   dw         work item to fill in
//   node/cnum  node and child number whose buffer will be deserialized
//   buf/size   location and length of the serialized child buffer; wrapped in dw->rb
// The out field local_fingerprint is zeroed so that it holds a defined value
// even if the item is never processed by a worker.
static void
deserialize_child_buffer_init(struct deserialize_child_buffer_work *dw, BRTNODE node, int cnum, unsigned char *buf, size_t size) {
    dw->node = node;
    dw->cnum = cnum;
    dw->local_fingerprint = 0;
    rbuf_init(&dw->rb, buf, size);
}
// Read one child's message buffer from rbuf and enqueue every message into the
// fifo of child cnum of node.
//   node, cnum             node and child whose fifo is filled
//   rbuf                   read buffer covering this child's serialized buffer; it must be
//                          fully consumed when the function is done (asserted)
//   local_fingerprint_ret  receives this child's contribution to the local fingerprint
// On return BNC_NBYTESINBUF(node, cnum) holds the byte count of the enqueued messages.
static void
deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf, u_int32_t *local_fingerprint_ret) {
    uint32_t fingerprint = 0;
    int nbytes = 0;
    int n_messages = rbuf_int(rbuf);    // the buffer starts with its element count

    for (int left = n_messages; left > 0; left--) {
        // each message is stored as: type, xids, key, value
        int type = rbuf_char(rbuf);
        XIDS xids;
        xids_create_from_buffer(rbuf, &xids);
        bytevec key; ITEMLEN keylen;
        rbuf_bytes(rbuf, &key, &keylen);    // key is a pointer into the rbuf
        bytevec val; ITEMLEN vallen;
        rbuf_bytes(rbuf, &val, &vallen);    // val is a pointer into the rbuf

        fingerprint += node->rand4fingerprint * toku_calc_fingerprint_cmd(type, xids, key, keylen, val, vallen);

        // the fifo copies the data, so the pointers into the rbuf need not outlive this call
        int r = toku_fifo_enq(BNC_BUFFER(node, cnum), key, keylen, val, vallen, type, xids);
        assert(r == 0);

        nbytes += keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
        xids_destroy(&xids);
    }

    assert(rbuf->ndone == rbuf->size);
    BNC_NBYTESINBUF(node, cnum) = nbytes;
    *local_fingerprint_ret = fingerprint;
}
// Thread body: take work items from the workset passed in arg and deserialize one
// child buffer per item until workset_get returns NULL.  Returns arg.
static void *
deserialize_child_buffer_worker(void *arg) {
    struct workset *ws = (struct workset *) arg;
    struct deserialize_child_buffer_work *dw;
    while ((dw = (struct deserialize_child_buffer_work *) workset_get(ws)) != NULL)
        deserialize_child_buffer(dw->node, dw->cnum, &dw->rb, &dw->local_fingerprint);
    return arg;
}
// Deserialize the buffers of all children of `result`, using the calling thread plus
// up to min(my_num_cores, #nonempty fifos) - 1 additional threads.
//   result                       node whose child fifos are filled; its n_bytes_in_buffers is
//                                incremented by every child's byte count
//   rbuf                         node image; child i's data starts at
//                                rbuf->buf + node_header_overhead + child_buffer_map[i].offset
//   child_buffer_map             per child offset and size of the serialized buffer
//   my_num_cores                 number of cores that may be used
//   check_local_fingerprint_ret  receives the sum of the per child fingerprints
static void
deserialize_all_child_buffers(BRTNODE result, struct rbuf *rbuf, struct sub_block_map child_buffer_map[], int my_num_cores, uint32_t *check_local_fingerprint_ret) {
    int n_nonempty_fifos = 0; // how many fifos are nonempty?
    for (int i = 0; i < result->u.n.n_children; i++) {
        if (child_buffer_map[i].size > 4)
            n_nonempty_fifos++;
    }

    // T = max(0, min(num_cores, n_nonempty_fifos) - 1): threads in addition to the running thread
    int T = my_num_cores;
    if (T > n_nonempty_fifos)
        T = n_nonempty_fifos;
    T = (T > 0) ? T - 1 : 0; // never negative, even for a nonsensical core count

    struct workset ws;
    workset_init(&ws);

    // one work item per child, all queued while holding the workset lock
    struct deserialize_child_buffer_work work[result->u.n.n_children];
    workset_lock(&ws);
    for (int i = 0; i < result->u.n.n_children; i++) {
        deserialize_child_buffer_init(&work[i], result, i, rbuf->buf + node_header_overhead + child_buffer_map[i].offset, child_buffer_map[i].size);
        workset_put_locked(&ws, &work[i].base);
    }
    workset_unlock(&ws);

    // deserialize the fifos
    if (0) printf("%s:%d T=%d N=%d %d\n", __FUNCTION__, __LINE__, T, result->u.n.n_children, n_nonempty_fifos);
    // A variable length array must have a size greater than zero, so reserve at
    // least one slot even when no additional threads are used.
    toku_pthread_t tids[T > 0 ? T : 1];
    threadset_create(tids, &T, deserialize_child_buffer_worker, &ws);
    deserialize_child_buffer_worker(&ws); // the calling thread works too
    threadset_join(tids, T);

    // combine the fingerprints and update the buffer counts
    uint32_t check_local_fingerprint = 0;
    for (int i = 0; i < result->u.n.n_children; i++) {
        check_local_fingerprint += work[i].local_fingerprint;
        result->u.n.n_bytes_in_buffers += BNC_NBYTESINBUF(result, i);
    }

    // cleanup
    workset_destroy(&ws);

    *check_local_fingerprint_ret = check_local_fingerprint;
}
static int static int
deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *rb) { deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbuf *rb) {
int r; int r;
...@@ -590,45 +691,26 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu ...@@ -590,45 +691,26 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu
BNC_NBYTESINBUF(result,i) = 0; BNC_NBYTESINBUF(result,i) = 0;
//printf("Child %d at %lld\n", i, result->children[i]); //printf("Child %d at %lld\n", i, result->children[i]);
} }
// deserialize the child buffer map
struct sub_block_map child_buffer_map[result->u.n.n_children];
for (int i = 0; i < result->u.n.n_children; i++)
sub_block_map_deserialize(&child_buffer_map[i], rb);
// init the child buffers
result->u.n.n_bytes_in_buffers = 0; result->u.n.n_bytes_in_buffers = 0;
for (int i=0; i<result->u.n.n_children; i++) { for (int i=0; i<result->u.n.n_children; i++) {
r=toku_fifo_create(&BNC_BUFFER(result,i)); r=toku_fifo_create(&BNC_BUFFER(result,i));
if (r!=0) { if (r!=0) {
int j; for (int j=0; j<i; j++) toku_fifo_free(&BNC_BUFFER(result,j));
if (0) { died_1: j=result->u.n.n_bytes_in_buffers; }
for (j=0; j<i; j++) toku_fifo_free(&BNC_BUFFER(result,j));
return toku_db_badformat(); return toku_db_badformat();
} }
} }
{
int cnum; // deserialize all child buffers, like the function says
u_int32_t check_local_fingerprint = 0; uint32_t check_local_fingerprint;
for (cnum=0; cnum<result->u.n.n_children; cnum++) { deserialize_all_child_buffers(result, rb, child_buffer_map, num_cores, &check_local_fingerprint);
int n_in_this_hash = rbuf_int(rb);
//printf("%d in hash\n", n_in_hash);
for (int i=0; i<n_in_this_hash; i++) {
int diff;
bytevec key; ITEMLEN keylen;
bytevec val; ITEMLEN vallen;
//toku_verify_counts(result);
int type = rbuf_char(rb);
XIDS xids;
xids_create_from_buffer(rb, &xids);
rbuf_bytes(rb, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(rb, &val, &vallen);
check_local_fingerprint += result->rand4fingerprint * toku_calc_fingerprint_cmd(type, xids, key, keylen, val, vallen);
//printf("Found %s,%s\n", (char*)key, (char*)val);
{
r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type, xids); /* Copies the data into the hash table. */
if (r!=0) { goto died_1; }
}
diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
result->u.n.n_bytes_in_buffers += diff;
BNC_NBYTESINBUF(result,cnum) += diff;
//printf("Inserted\n");
xids_destroy(&xids);
}
}
if (check_local_fingerprint != result->local_fingerprint) { if (check_local_fingerprint != result->local_fingerprint) {
fprintf(stderr, "%s:%d local fingerprint is wrong (found %8x calcualted %8x\n", __FILE__, __LINE__, result->local_fingerprint, check_local_fingerprint); fprintf(stderr, "%s:%d local fingerprint is wrong (found %8x calcualted %8x\n", __FILE__, __LINE__, result->local_fingerprint, check_local_fingerprint);
return toku_db_badformat(); return toku_db_badformat();
...@@ -637,11 +719,7 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu ...@@ -637,11 +719,7 @@ deserialize_brtnode_nonleaf_from_rbuf (BRTNODE result, bytevec magic, struct rbu
fprintf(stderr, "%s:%d subtree fingerprint is wrong\n", __FILE__, __LINE__); fprintf(stderr, "%s:%d subtree fingerprint is wrong\n", __FILE__, __LINE__);
return toku_db_badformat(); return toku_db_badformat();
} }
}
// RFP REMOVE (void)rbuf_int(rb); //Ignore the crc (already verified).
if (rb->ndone != rb->size) { //Verify we read exactly the entire block.
r = toku_db_badformat(); goto died_1;
}
return 0; return 0;
} }
...@@ -825,7 +903,8 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb ...@@ -825,7 +903,8 @@ decompress_brtnode_from_raw_block_into_rbuf(u_int8_t *raw_block, struct rbuf *rb
unsigned char *uncompressed_data = rb->buf + node_header_overhead; unsigned char *uncompressed_data = rb->buf + node_header_overhead;
// decompress all the compressed sub blocks into the uncompressed buffer // decompress all the compressed sub blocks into the uncompressed buffer
decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores); r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, num_cores);
assert(r == 0);
toku_trace("decompress done"); toku_trace("decompress done");
......
...@@ -104,7 +104,10 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) { ...@@ -104,7 +104,10 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
printf(" children:\n"); printf(" children:\n");
for (i=0; i<n->u.n.n_children; i++) { for (i=0; i<n->u.n.n_children; i++) {
printf(" child %d: %" PRId64 "\n", i, BNC_BLOCKNUM(n, i).b); printf(" child %d: %" PRId64 "\n", i, BNC_BLOCKNUM(n, i).b);
printf(" buffer contains %u bytes (%d items)\n", BNC_NBYTESINBUF(n, i), toku_fifo_n_entries(BNC_BUFFER(n,i))); unsigned int n_bytes = BNC_NBYTESINBUF(n, i);
int n_entries = toku_fifo_n_entries(BNC_BUFFER(n, i));
if (n_bytes > 0 || n_entries > 0)
printf(" buffer contains %u bytes (%d items)\n", n_bytes, n_entries);
if (dump_data) { if (dump_data) {
FIFO_ITERATE(BNC_BUFFER(n,i), key, keylen, data, datalen, typ, xids, FIFO_ITERATE(BNC_BUFFER(n,i), key, keylen, data, datalen, typ, xids,
{ {
......
...@@ -24,6 +24,10 @@ static inline void rbuf_init(struct rbuf *r, unsigned char *buf, unsigned int si ...@@ -24,6 +24,10 @@ static inline void rbuf_init(struct rbuf *r, unsigned char *buf, unsigned int si
r->ndone = 0; r->ndone = 0;
} }
// Return the current read offset of r, i.e. its ndone counter (the number of bytes consumed so far).
static inline unsigned int rbuf_get_roffset(struct rbuf *r) {
return r->ndone;
}
static inline unsigned int rbuf_char (struct rbuf *r) { static inline unsigned int rbuf_char (struct rbuf *r) {
assert(r->ndone<r->size); assert(r->ndone<r->size);
return r->buf[r->ndone++]; return r->buf[r->ndone++];
......
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <errno.h>
#include <zlib.h> #include <zlib.h>
#include "toku_portability.h" #include "toku_portability.h"
...@@ -53,13 +54,18 @@ alignup32(int a, int b) { ...@@ -53,13 +54,18 @@ alignup32(int a, int b) {
// Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at // Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at
// least >= the target_sub_block_size. // least >= the target_sub_block_size.
void int
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret) { choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret) {
if (total_size < 0 || n_sub_blocks_limit < 1)
return EINVAL;
const int alignment = 32; const int alignment = 32;
int n_sub_blocks, sub_block_size; int n_sub_blocks, sub_block_size;
n_sub_blocks = total_size / target_sub_block_size; n_sub_blocks = total_size / target_sub_block_size;
if (n_sub_blocks <= 1) { if (n_sub_blocks <= 1) {
n_sub_blocks = n_sub_blocks;
if (total_size > 0 && n_sub_blocks_limit > 0)
n_sub_blocks = 1; n_sub_blocks = 1;
sub_block_size = total_size; sub_block_size = total_size;
} else { } else {
...@@ -72,6 +78,8 @@ choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_siz ...@@ -72,6 +78,8 @@ choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_siz
*sub_block_size_ret = sub_block_size; *sub_block_size_ret = sub_block_size;
*n_sub_blocks_ret = n_sub_blocks; *n_sub_blocks_ret = n_sub_blocks;
return 0;
} }
void void
...@@ -86,6 +94,20 @@ set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, st ...@@ -86,6 +94,20 @@ set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, st
sub_block[i].uncompressed_size = size_left; sub_block[i].uncompressed_size = size_left;
} }
// Locate the sub block that holds a given byte offset.  The sub blocks are taken
// to lie back to back, sub_block[i] covering uncompressed_size bytes.
// Returns the index of the first sub block that contains offset, or -1 when offset
// lies at or beyond the end of the last sub block.
int
get_sub_block_index(int n_sub_blocks, struct sub_block sub_block[], size_t offset) {
    size_t end_offset = 0;  // one past the last byte covered by sub blocks 0..i
    int i = 0;
    while (i < n_sub_blocks) {
        end_offset += sub_block[i].uncompressed_size;
        if (offset < end_offset)
            return i;
        i++;
    }
    return -1;
}
#include "workset.h" #include "workset.h"
void void
...@@ -198,13 +220,14 @@ int ...@@ -198,13 +220,14 @@ int
decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum) { decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompress_ptr, u_int32_t uncompress_size, u_int32_t expected_xsum) {
// verify checksum // verify checksum
u_int32_t xsum = x1764_memory(compress_ptr, compress_size); u_int32_t xsum = x1764_memory(compress_ptr, compress_size);
assert(xsum == expected_xsum); if (xsum != expected_xsum)
return EINVAL;
// decompress // decompress
uLongf destlen = uncompress_size; uLongf destlen = uncompress_size;
int r = uncompress(uncompress_ptr, &destlen, compress_ptr, compress_size); int r = uncompress(uncompress_ptr, &destlen, compress_ptr, compress_size);
assert(destlen == uncompress_size); if (r != Z_OK || destlen != uncompress_size)
assert(r==Z_OK); return EINVAL;
return 0; return 0;
} }
...@@ -222,10 +245,12 @@ decompress_worker(void *arg) { ...@@ -222,10 +245,12 @@ decompress_worker(void *arg) {
return arg; return arg;
} }
void int
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores) { decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores) {
int r;
if (n_sub_blocks == 1) { if (n_sub_blocks == 1) {
decompress_sub_block(compressed_data, sub_block[0].compressed_size, uncompressed_data, sub_block[0].uncompressed_size, sub_block[0].xsum); r = decompress_sub_block(compressed_data, sub_block[0].compressed_size, uncompressed_data, sub_block[0].uncompressed_size, sub_block[0].xsum);
} else { } else {
// compute the number of additional threads needed for decompressing this node // compute the number of additional threads needed for decompressing this node
int T = num_cores; // T = min(#cores, #blocks) - 1 int T = num_cores; // T = min(#cores, #blocks) - 1
...@@ -259,5 +284,14 @@ decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsign ...@@ -259,5 +284,14 @@ decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsign
// cleanup // cleanup
threadset_join(tids, T); threadset_join(tids, T);
workset_destroy(&ws); workset_destroy(&ws);
r = 0;
for (int i = 0; i < n_sub_blocks; i++) {
r = decompress_work[i].error;
if (r != 0)
break;
} }
}
return r;
} }
...@@ -38,12 +38,17 @@ get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]); ...@@ -38,12 +38,17 @@ get_sum_uncompressed_size(int n_sub_blocks, struct sub_block sub_block[]);
// Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at // Choose n_sub_blocks and sub_block_size such that the product is >= total_size and the sub_block_size is at
// least >= the target_sub_block_size. // least >= the target_sub_block_size.
void int
choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret); choose_sub_block_size(int total_size, int n_sub_blocks_limit, int *sub_block_size_ret, int *n_sub_blocks_ret);
void void
set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]); set_all_sub_block_sizes(int total_size, int sub_block_size, int n_sub_blocks, struct sub_block sub_block[]);
// find the index of the first sub block that contains the offset
// Returns the index if found, else returns -1
int
get_sub_block_index(int n_sub_blocks, struct sub_block sub_block[], size_t offset);
#include "workset.h" #include "workset.h"
struct compress_work { struct compress_work {
...@@ -88,7 +93,9 @@ decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompre ...@@ -88,7 +93,9 @@ decompress_sub_block(void *compress_ptr, u_int32_t compress_size, void *uncompre
void * void *
decompress_worker(void *arg); decompress_worker(void *arg);
void // decompress all sub blocks from the compressed_data buffer to the uncompressed_data buffer
// Returns 0 if success, otherwise an error
int
decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores); decompress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], unsigned char *compressed_data, unsigned char *uncompressed_data, int num_cores);
......
// Include guard (closed by the #endif at the end of this header).  The macro that
// is tested must be the one that is defined, or a second inclusion is not skipped.
#ifndef TOKU_SUB_BLOCK_MAP_H
#define TOKU_SUB_BLOCK_MAP_H
// Maps an object to a sequence of sub blocks: records which sub block the object
// starts in, where its bytes begin and how many bytes it occupies.
struct sub_block_map {
u_int32_t idx; // index of the sub block associated with the object
u_int32_t offset; // byte offset of the object (NOTE(review): callers appear to measure it from the start of the data that follows the node header; confirm)
u_int32_t size; // size of the object in bytes
};
enum {
// Number of bytes reserved for one serialized map.  sub_block_map_serialize writes the
// three fields as three ints.  NOTE(review): using sizeof of the in-memory struct assumes
// it has no padding and that each serialized int takes sizeof (u_int32_t) bytes; confirm.
stored_sub_block_map_size = sizeof (struct sub_block_map), // size of a sub-block map on disk
};
static inline void
sub_block_map_init(struct sub_block_map *sbmap, u_int32_t idx, u_int32_t offset, u_int32_t size) {
sbmap->idx = idx;
sbmap->offset = offset;
sbmap->size = size;
}
// Append *sbmap to wbuf as three ints, in the order idx, offset, size.
static inline void
sub_block_map_serialize(struct sub_block_map *sbmap, struct wbuf *wbuf) {
    const u_int32_t fields[] = { sbmap->idx, sbmap->offset, sbmap->size };
    for (unsigned int i = 0; i < sizeof fields / sizeof fields[0]; i++)
        wbuf_nocrc_int(wbuf, fields[i]);
}
static inline void
sub_block_map_deserialize(struct sub_block_map *sbmap, struct rbuf *rbuf) {
sbmap->idx = rbuf_int(rbuf);
sbmap->offset = rbuf_int(rbuf);
sbmap->size = rbuf_int(rbuf);
}
#endif
// test that corrupt checksums are detected
#include <toku_portability.h>
#include "test.h"
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "sub_block.h"
int verbose;
// Return the byte located offset bytes past vp.
static uint8_t
get_uint8_at_offset(void *vp, size_t offset) {
    return ((uint8_t *) vp)[offset];
}
// Store newv into the byte located offset bytes past vp.
static void
set_uint8_at_offset(void *vp, size_t offset, uint8_t newv) {
    *((uint8_t *) vp + offset) = newv;
}
// Compress buf (total_size bytes) into sub blocks and verify that
// decompress_all_sub_blocks reports an error when a checksum or the compressed
// data is corrupted, and succeeds again once the corruption is undone.
//   buf                data to compress
//   total_size         number of bytes in buf
//   my_max_sub_blocks  limit on the number of sub blocks passed to choose_sub_block_size
//   n_cores            core count passed to the compress and decompress routines
static void
test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_cores) {
if (verbose)
printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, total_size, my_max_sub_blocks);
int r;
// pick the sub block layout for this size and limit
int sub_block_size, n_sub_blocks;
r = choose_sub_block_size(total_size, my_max_sub_blocks, &sub_block_size, &n_sub_blocks);
assert(r == 0);
if (verbose)
printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, sub_block_size, n_sub_blocks);
struct sub_block sub_blocks[n_sub_blocks];
set_all_sub_block_sizes(total_size, sub_block_size, n_sub_blocks, sub_blocks);
// compress into a buffer sized by the compressed size bound
size_t cbuf_size_bound = get_sum_compressed_size_bound(n_sub_blocks, sub_blocks);
void *cbuf = toku_malloc(cbuf_size_bound);
assert(cbuf);
size_t cbuf_size = compress_all_sub_blocks(n_sub_blocks, sub_blocks, buf, cbuf, n_cores);
assert(cbuf_size <= cbuf_size_bound);
// destination for the decompressed data
void *ubuf = toku_malloc(total_size);
assert(ubuf);
// one round per sub block: each round corrupts that sub block's checksum, and then
// one randomly chosen byte of the compressed data
for (int xidx = 0; xidx < n_sub_blocks; xidx++) {
// corrupt a checksum
sub_blocks[xidx].xsum += 1;
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores);
assert(r != 0);
// reset the checksums
sub_blocks[xidx].xsum -= 1;
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores);
assert(r == 0);
assert(memcmp(buf, ubuf, total_size) == 0);
// corrupt the data
size_t offset = random() % cbuf_size;
unsigned char c = get_uint8_at_offset(cbuf, offset);
set_uint8_at_offset(cbuf, offset, c+1);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores);
assert(r != 0);
// reset the data
set_uint8_at_offset(cbuf, offset, c);
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores);
assert(r == 0);
assert(memcmp(buf, ubuf, total_size) == 0);
}
toku_free(ubuf);
toku_free(cbuf);
}
static void
set_random(void *buf, int total_size) {
char *bp = (char *) buf;
for (int i = 0; i < total_size; i++)
bp[i] = random();
}
static void
run_test(int total_size, int n_cores) {
void *buf = toku_malloc(total_size);
assert(buf);
for (int my_max_sub_blocks = 1; my_max_sub_blocks <= max_sub_blocks; my_max_sub_blocks++) {
memset(buf, 0, total_size);
test_sub_block_checksum(buf, total_size, my_max_sub_blocks, n_cores);
set_random(buf, total_size);
test_sub_block_checksum(buf, total_size, my_max_sub_blocks, n_cores);
}
toku_free(buf);
}
// Test entry point.
// Options:
//   -v, --verbose   increase verbosity
//   -n N            core count passed to the tests (default 1)
//   -e E            test every size within E bytes of each power of two (default 1)
// An option that lacks its value, and any unrecognized argument, is ignored.
// Sizes are tested around every power of two from 256KB to 4MB.  Returns 0.
int
test_main (int argc, const char *argv[]) {
    int n_cores = 1;
    int e = 1;

    for (int i = 1; i < argc; i++) {
        const char *arg = argv[i];
        int has_value = (i + 1 < argc);
        if (!strcmp(arg, "-v") || !strcmp(arg, "--verbose"))
            verbose++;
        else if (!strcmp(arg, "-n") && has_value)
            n_cores = atoi(argv[++i]);
        else if (!strcmp(arg, "-e") && has_value)
            e = atoi(argv[++i]);
    }

    for (int base = 256*1024; base <= 4*1024*1024; base *= 2) {
        int size = base - e;
        while (size <= base + e) {
            run_test(size, n_cores);
            size++;
        }
    }

    return 0;
}
// test sub block compression and decompression
#include <toku_portability.h>
#include "test.h"
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "sub_block.h"
int verbose;
// Round trip test: choose a sub block layout for total_size bytes (passing
// my_max_sub_blocks to choose_sub_block_size), compress buf with
// compress_all_sub_blocks, decompress with decompress_all_sub_blocks, and
// assert that the result equals buf byte for byte.  n_cores is forwarded
// unchanged to the compress and decompress calls.
static void
test_sub_block_compression(void *buf, int total_size, int my_max_sub_blocks, int n_cores) {
    if (verbose) {
        printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, total_size, my_max_sub_blocks);
    }

    // pick the sub block size and count for this buffer
    int block_size, block_count;
    int rc = choose_sub_block_size(total_size, my_max_sub_blocks, &block_size, &block_count);
    assert(rc == 0);
    if (verbose) {
        printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, block_size, block_count);
    }

    struct sub_block blocks[block_count];
    set_all_sub_block_sizes(total_size, block_size, block_count, blocks);

    // compress into a buffer sized by the reported bound
    size_t compressed_bound = get_sum_compressed_size_bound(block_count, blocks);
    void *compressed = toku_malloc(compressed_bound);
    assert(compressed);
    size_t compressed_size = compress_all_sub_blocks(block_count, blocks, buf, compressed, n_cores);
    assert(compressed_size <= compressed_bound);

    // decompress and compare against the original input
    void *restored = toku_malloc(total_size);
    assert(restored);
    rc = decompress_all_sub_blocks(block_count, blocks, compressed, restored, n_cores);
    assert(rc == 0);
    assert(memcmp(buf, restored, total_size) == 0);

    toku_free(restored);
    toku_free(compressed);
}
// Fill the first total_size bytes of buf with values taken from random(),
// one call per byte, each result narrowed to a char on assignment.
// Nothing is written when total_size is zero or negative.
static void
set_random(void *buf, int total_size) {
    char *cursor = (char *) buf;
    int remaining = total_size;
    while (remaining > 0) {
        *cursor++ = random();
        remaining--;
    }
}
static void
run_test(int total_size, int n_cores) {
void *buf = toku_malloc(total_size);
assert(buf);
for (int my_max_sub_blocks = 1; my_max_sub_blocks <= max_sub_blocks; my_max_sub_blocks++) {
memset(buf, 0, total_size);
test_sub_block_compression(buf, total_size, my_max_sub_blocks, n_cores);
set_random(buf, total_size);
test_sub_block_compression(buf, total_size, my_max_sub_blocks, n_cores);
}
toku_free(buf);
}
// Test entry point.
//
// Recognized arguments:
//   -v, --verbose   increment the global verbose level
//   -n N            value handed to run_test as n_cores (default 1)
//   -e E            half width of the window of sizes tried around each
//                   power of two (default 1)
// Anything else, including a trailing -n or -e with no value, is ignored.
//
// For each power of two from 256KB through 4MB, run_test is called for
// every size from (power - E) through (power + E).  Always returns 0.
int
test_main (int argc, const char *argv[]) {
    int n_cores = 1;
    int slack = 1;

    int argi = 1;
    while (argi < argc) {
        const char *opt = argv[argi];
        if (!strcmp(opt, "-v") || !strcmp(opt, "--verbose")) {
            verbose++;
        } else if (!strcmp(opt, "-n")) {
            if (argi + 1 < argc)
                n_cores = atoi(argv[++argi]);   // consume the value too
        } else if (!strcmp(opt, "-e")) {
            if (argi + 1 < argc)
                slack = atoi(argv[++argi]);     // consume the value too
        }
        argi++;
    }

    // 1 << 18 is 256KB and 1 << 22 is 4MB
    for (int shift = 18; shift <= 22; shift++) {
        const int total_size = 1 << shift;
        int size = total_size - slack;
        while (size <= total_size + slack) {
            run_test(size, n_cores);
            size++;
        }
    }
    return 0;
}
// test the sub block index function
#include <toku_portability.h>
#include "test.h"
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "sub_block.h"
int verbose;
static void
test_sub_block_index(void) {
if (verbose)
printf("%s:%d\n", __FUNCTION__, __LINE__);
const int n_sub_blocks = max_sub_blocks;
struct sub_block sub_block[n_sub_blocks];
size_t max_offset = 0;
for (int i = 0 ; i < n_sub_blocks; i++) {
size_t size = i+1;
sub_block_init(&sub_block[i]);
sub_block[i].uncompressed_size = size;
max_offset += size;
}
int offset_to_sub_block[max_offset];
for (int i = 0; i < (int) max_offset; i++)
offset_to_sub_block[i] = -1;
size_t start_offset = 0;
for (int i = 0; i < n_sub_blocks; i++) {
size_t size = sub_block[i].uncompressed_size;
for (int j = 0; j < (int) (start_offset + size); j++) {
if (offset_to_sub_block[j] == -1)
offset_to_sub_block[j] = i;
}
start_offset += size;
}
int r;
for (size_t offset = 0; offset < max_offset; offset++) {
r = get_sub_block_index(n_sub_blocks, sub_block, offset);
if (verbose)
printf("%s:%d %u %d\n", __FUNCTION__, __LINE__, (unsigned int) offset, r);
assert(0 <= r && r < n_sub_blocks);
assert(r == offset_to_sub_block[offset]);
}
r = get_sub_block_index(n_sub_blocks, sub_block, max_offset);
assert(r == -1);
}
// Test entry point: every "-v" argument increments the global verbose
// level, all other arguments are ignored.  Runs test_sub_block_index once
// and returns 0.
int
test_main (int argc, const char *argv[]) {
    int argi = 1;
    while (argi < argc) {
        if (!strcmp(argv[argi], "-v")) {
            verbose++;
        }
        argi++;
    }
    test_sub_block_index();
    return 0;
}
// test the choose sub block size function
#include <toku_portability.h>
#include "test.h"
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "sub_block.h"
int verbose;
// Check choose_sub_block_size for one total_size:
//  - a limit of 0 sub blocks must be rejected with EINVAL
//  - every limit from 1 through max_sub_blocks must succeed, must use no
//    more sub blocks than the limit, and the chosen layout must have
//    enough combined capacity (n_sub_blocks * sub_block_size) to hold
//    total_size bytes
static void
test_sub_block_size(int total_size) {
    if (verbose)
        printf("%s:%d %d\n", __FUNCTION__, __LINE__, total_size);
    int r;
    int sub_block_size, n_sub_blocks;
    r = choose_sub_block_size(total_size, 0, &sub_block_size, &n_sub_blocks);
    assert(r == EINVAL);
    // The upper bound is inclusive: max_sub_blocks itself is a limit that
    // callers pass, so it has to be covered as well.
    for (int i = 1; i <= max_sub_blocks; i++) {
        r = choose_sub_block_size(total_size, i, &sub_block_size, &n_sub_blocks);
        assert(r == 0);
        assert(0 <= n_sub_blocks && n_sub_blocks <= i);
        assert(total_size <= n_sub_blocks * sub_block_size);
    }
}
// Test entry point: every "-v" argument increments the global verbose
// level, all other arguments are ignored.  Runs test_sub_block_size for a
// total size of 0 and then for every power of two from 1 through 4MB.
// Always returns 0.
int
test_main (int argc, const char *argv[]) {
    int argi = 1;
    while (argi < argc) {
        if (!strcmp(argv[argi], "-v")) {
            verbose++;
        }
        argi++;
    }

    test_sub_block_size(0);
    // 1 << 0 is 1 and 1 << 22 is 4MB
    for (int shift = 0; shift <= 22; shift++) {
        test_sub_block_size(1 << shift);
    }
    return 0;
}
...@@ -35,6 +35,10 @@ static inline void wbuf_init (struct wbuf *w, void *buf, DISKOFF size) { ...@@ -35,6 +35,10 @@ static inline void wbuf_init (struct wbuf *w, void *buf, DISKOFF size) {
x1764_init(&w->checksum); x1764_init(&w->checksum);
} }
/* Return the ndone field of w as a size_t (presumably the number of bytes
   written into the buffer so far; confirm against struct wbuf). */
static inline size_t wbuf_get_woffset(struct wbuf *w) {
    const size_t woffset = w->ndone;
    return woffset;
}
/* Write a character. */ /* Write a character. */
static inline void wbuf_nocrc_char (struct wbuf *w, unsigned char ch) { static inline void wbuf_nocrc_char (struct wbuf *w, unsigned char ch) {
assert(w->ndone<w->size); assert(w->ndone<w->size);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment