Commit 31260a71 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3164 fix the brtloader pivot key problem refs[t:3164]

git-svn-id: file:///svn/toku/tokudb@27062 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0353b4fc
...@@ -25,6 +25,53 @@ print_item (bytevec val, ITEMLEN len) { ...@@ -25,6 +25,53 @@ print_item (bytevec val, ITEMLEN len) {
printf("\""); printf("\"");
} }
static void
simple_hex_dump(unsigned char *vp, u_int64_t size) {
for (u_int64_t i = 0; i < size; i++) {
unsigned char c = vp[i];
printf("%2.2X", c);
}
}
static void
hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) {
u_int64_t n = size / 32;
for (u_int64_t i = 0; i < n; i++) {
printf("%"PRIu64": ", offset);
for (u_int64_t j = 0; j < 32; j++) {
unsigned char c = vp[j];
printf("%2.2X", c);
if (((j+1) % 4) == 0)
printf(" ");
}
for (u_int64_t j = 0; j < 32; j++) {
unsigned char c = vp[j];
printf("%c", isprint(c) ? c : ' ');
}
printf("\n");
vp += 32;
offset += 32;
}
size = size % 32;
for (u_int64_t i=0; i<size; i++) {
if ((i % 32) == 0)
printf("%"PRIu64": ", offset+i);
printf("%2.2X", vp[i]);
if (((i+1) % 4) == 0)
printf(" ");
if (((i+1) % 32) == 0)
printf("\n");
}
printf("\n");
}
static void
dump_descriptor(DESCRIPTOR d) {
printf(" descriptor version %u size %u ", d->version, d->dbt.size);
simple_hex_dump(d->dbt.data, d->dbt.size);
printf("\n");
}
static void static void
dump_header (int f, struct brt_header **header, CACHEFILE cf) { dump_header (int f, struct brt_header **header, CACHEFILE cf) {
struct brt_header *h; struct brt_header *h;
...@@ -39,6 +86,7 @@ dump_header (int f, struct brt_header **header, CACHEFILE cf) { ...@@ -39,6 +86,7 @@ dump_header (int f, struct brt_header **header, CACHEFILE cf) {
printf(" nodesize=%u\n", h->nodesize); printf(" nodesize=%u\n", h->nodesize);
printf(" unnamed_root=%" PRId64 "\n", h->root.b); printf(" unnamed_root=%" PRId64 "\n", h->root.b);
printf(" flags=%u\n", h->flags); printf(" flags=%u\n", h->flags);
dump_descriptor(&h->descriptor);
*header = h; *header = h;
} }
...@@ -217,7 +265,7 @@ sub_block_deserialize(struct sub_block *sb, unsigned char *sub_block_header) { ...@@ -217,7 +265,7 @@ sub_block_deserialize(struct sub_block *sb, unsigned char *sub_block_header) {
} }
static void static void
verify_block(unsigned char *cp, u_int64_t size) { verify_block(unsigned char *cp, u_int64_t file_offset, u_int64_t size) {
// verify the header checksum // verify the header checksum
const size_t node_header = 8 + sizeof (u_int32_t) + sizeof (u_int32_t); const size_t node_header = 8 + sizeof (u_int32_t) + sizeof (u_int32_t);
unsigned char *sub_block_header = &cp[node_header]; unsigned char *sub_block_header = &cp[node_header];
...@@ -249,7 +297,7 @@ verify_block(unsigned char *cp, u_int64_t size) { ...@@ -249,7 +297,7 @@ verify_block(unsigned char *cp, u_int64_t size) {
u_int32_t xsum = x1764_memory(cp + offset, sub_block[i].compressed_size); u_int32_t xsum = x1764_memory(cp + offset, sub_block[i].compressed_size);
printf("%u: %u %u %u", i, sub_block[i].compressed_size, sub_block[i].uncompressed_size, sub_block[i].xsum); printf("%u: %u %u %u", i, sub_block[i].compressed_size, sub_block[i].uncompressed_size, sub_block[i].xsum);
if (xsum != sub_block[i].xsum) if (xsum != sub_block[i].xsum)
printf(" fail %u", xsum); printf(" fail %u offset %"PRIu64, xsum, file_offset + offset);
printf("\n"); printf("\n");
offset += sub_block[i].compressed_size; offset += sub_block[i].compressed_size;
} }
...@@ -267,49 +315,21 @@ dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) { ...@@ -267,49 +315,21 @@ dump_block(int f, BLOCKNUM blocknum, struct brt_header *h) {
u_int64_t r = pread(f, vp, size, offset); u_int64_t r = pread(f, vp, size, offset);
if (r == (u_int64_t)size) { if (r == (u_int64_t)size) {
printf("%.8s layout_version=%u %u\n", (char*)vp, get_unaligned_uint32(vp+8), get_unaligned_uint32(vp+12)); printf("%.8s layout_version=%u %u\n", (char*)vp, get_unaligned_uint32(vp+8), get_unaligned_uint32(vp+12));
verify_block(vp, size); verify_block(vp, offset, size);
} }
toku_free(vp); toku_free(vp);
} }
static void static void
hex_dump(unsigned char *vp, u_int64_t offset, u_int64_t size) { dump_file(int f, u_int64_t offset, u_int64_t size, FILE *outfp) {
u_int64_t n = size / 32;
for (u_int64_t i = 0; i < n; i++) {
printf("%"PRIu64": ", offset);
for (u_int64_t j = 0; j < 32; j++) {
unsigned char c = vp[j];
printf("%2.2X", c);
if (((j+1) % 4) == 0)
printf(" ");
}
for (u_int64_t j = 0; j < 32; j++) {
unsigned char c = vp[j];
printf("%c", isprint(c) ? c : ' ');
}
printf("\n");
vp += 32;
offset += 32;
}
size = size % 32;
for (u_int64_t i=0; i<size; i++) {
if ((i % 32) == 0)
printf("%"PRIu64": ", offset+i);
printf("%2.2X", vp[i]);
if (((i+1) % 4) == 0)
printf(" ");
if (((i+1) % 32) == 0)
printf("\n");
}
printf("\n");
}
static void
dump_file(int f, u_int64_t offset, u_int64_t size) {
unsigned char *vp = toku_malloc(size); unsigned char *vp = toku_malloc(size);
u_int64_t r = pread(f, vp, size, offset); u_int64_t r = pread(f, vp, size, offset);
if (r == size) if (r == size) {
hex_dump(vp, offset, size); if (outfp == stdout)
hex_dump(vp, offset, size);
else
fwrite(vp, size, 1, outfp);
}
toku_free(vp); toku_free(vp);
} }
...@@ -359,7 +379,7 @@ interactive_help(void) { ...@@ -359,7 +379,7 @@ interactive_help(void) {
fprintf(stderr, "bx OFFSET | block_translation OFFSET\n"); fprintf(stderr, "bx OFFSET | block_translation OFFSET\n");
fprintf(stderr, "dumpdata 0|1\n"); fprintf(stderr, "dumpdata 0|1\n");
fprintf(stderr, "fragmentation\n"); fprintf(stderr, "fragmentation\n");
fprintf(stderr, "file OFFSET SIZE\n"); fprintf(stderr, "file OFFSET SIZE [outfilename]\n");
fprintf(stderr, "quit\n"); fprintf(stderr, "quit\n");
} }
...@@ -435,10 +455,13 @@ main (int argc, const char *const argv[]) { ...@@ -435,10 +455,13 @@ main (int argc, const char *const argv[]) {
dump_block_translation(h, offset); dump_block_translation(h, offset);
} else if (strcmp(fields[0], "fragmentation") == 0) { } else if (strcmp(fields[0], "fragmentation") == 0) {
dump_fragmentation(f, h); dump_fragmentation(f, h);
} else if (strcmp(fields[0], "file") == 0 && nfields == 3) { } else if (strcmp(fields[0], "file") == 0 && nfields >= 3) {
u_int64_t offset = getuint64(fields[1]); u_int64_t offset = getuint64(fields[1]);
u_int64_t size = getuint64(fields[2]); u_int64_t size = getuint64(fields[2]);
dump_file(f, offset, size); FILE *outfp = stdout;
if (nfields >= 4)
outfp = fopen(fields[3], "w");
dump_file(f, offset, size, outfp);
} else if (strcmp(fields[0], "quit") == 0 || strcmp(fields[0], "q") == 0) { } else if (strcmp(fields[0], "quit") == 0 || strcmp(fields[0], "q") == 0) {
break; break;
} }
......
...@@ -2272,6 +2272,28 @@ static void drain_writer_q(QUEUE q) { ...@@ -2272,6 +2272,28 @@ static void drain_writer_q(QUEUE q) {
} }
} }
static void cleanup_maxkey(DBT *maxkey) {
if (maxkey->flags == DB_DBT_REALLOC) {
toku_free(maxkey->data);
maxkey->data = NULL;
maxkey->flags = 0;
}
}
static void update_maxkey(DBT *maxkey, DBT *key) {
cleanup_maxkey(maxkey);
*maxkey = *key;
}
static int copy_maxkey(DBT *maxkey) {
DBT newkey;
toku_init_dbt_flags(&newkey, DB_DBT_REALLOC);
int r = toku_dbt_set(maxkey->size, maxkey->data, &newkey, NULL);
if (r == 0)
update_maxkey(maxkey, &newkey);
return r;
}
CILK_BEGIN CILK_BEGIN
static int toku_loader_write_brt_from_q (BRTLOADER bl, static int toku_loader_write_brt_from_q (BRTLOADER bl,
const DESCRIPTOR descriptor, const DESCRIPTOR descriptor,
...@@ -2349,7 +2371,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2349,7 +2371,9 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
u_int64_t n_rows_remaining = bl->n_rows; u_int64_t n_rows_remaining = bl->n_rows;
u_int64_t old_n_rows_remaining = bl->n_rows; u_int64_t old_n_rows_remaining = bl->n_rows;
uint64_t used_estimate = 0; // how much diskspace have we used up? uint64_t used_estimate = 0; // how much diskspace have we used up?
DBT maxkey = make_dbt(0, 0); // keep track of the max key of the current node
while (result == 0) { while (result == 0) {
void *item; void *item;
...@@ -2394,7 +2418,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2394,7 +2418,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
n_pivots++; n_pivots++;
if ((r = bl_write_dbt(&key, pivots_stream, NULL, bl))) { invariant(maxkey.data != NULL);
if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, bl))) {
brt_loader_set_panic(bl, r, TRUE); // error after cilk sync brt_loader_set_panic(bl, r, TRUE); // error after cilk sync
if (result == 0) result = r; if (result == 0) result = r;
break; break;
...@@ -2414,8 +2439,11 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2414,8 +2439,11 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
add_pair_to_leafnode(lbuf, (unsigned char *) key.data, key.size, (unsigned char *) val.data, val.size); add_pair_to_leafnode(lbuf, (unsigned char *) key.data, key.size, (unsigned char *) val.data, val.size);
n_rows_remaining--; n_rows_remaining--;
update_maxkey(&maxkey, &key); // set the new maxkey to the current key
} }
result = copy_maxkey(&maxkey); // make a copy of maxkey before the rowset is destroyed
destroy_rowset(output_rowset); destroy_rowset(output_rowset);
toku_free(output_rowset); toku_free(output_rowset);
...@@ -2423,6 +2451,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl, ...@@ -2423,6 +2451,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
result = brt_loader_get_error(&bl->error_callback); // check if an error was posted and terminate this quickly result = brt_loader_get_error(&bl->error_callback); // check if an error was posted and terminate this quickly
} }
cleanup_maxkey(&maxkey);
if (lbuf) { if (lbuf) {
struct subtree_estimates est = make_subtree_estimates(lbuf->nkeys, lbuf->ndata, lbuf->dsize, TRUE); struct subtree_estimates est = make_subtree_estimates(lbuf->nkeys, lbuf->ndata, lbuf->dsize, TRUE);
allocate_node(&sts, lblock, est, lbuf->local_fingerprint); allocate_node(&sts, lblock, est, lbuf->local_fingerprint);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment