Commit c19ff63d authored by Christian Rober's avatar Christian Rober Committed by Yoni Fogel

[t:4567] Merging new node error reporting to main.

git-svn-id: file:///svn/toku/tokudb@41836 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8e517062
...@@ -465,9 +465,9 @@ int toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE ...@@ -465,9 +465,9 @@ int toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE
struct brt_header *h, int n_workitems, int n_threads, struct brt_header *h, int n_workitems, int n_threads,
BOOL for_checkpoint); BOOL for_checkpoint);
int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, ROLLBACK_LOG_NODE *logp, struct brt_header *h); int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, ROLLBACK_LOG_NODE *logp, struct brt_header *h);
void toku_deserialize_bp_from_disk(BRTNODE node, BRTNODE_DISK_DATA ndd, int childnum, int fd, struct brtnode_fetch_extra* bfe); enum deserialize_error_code toku_deserialize_bp_from_disk(BRTNODE node, BRTNODE_DISK_DATA ndd, int childnum, int fd, struct brtnode_fetch_extra* bfe);
void toku_deserialize_bp_from_compressed(BRTNODE node, int childnum, DESCRIPTOR desc, brt_compare_func cmp); enum deserialize_error_code toku_deserialize_bp_from_compressed(BRTNODE node, int childnum, DESCRIPTOR desc, brt_compare_func cmp);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, BRTNODE_DISK_DATA* ndd, struct brtnode_fetch_extra* bfe); enum deserialize_error_code toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, BRTNODE_DISK_DATA* ndd, struct brtnode_fetch_extra* bfe);
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */ unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len); int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......
...@@ -1199,9 +1199,10 @@ static void read_brtnode_header_from_fd_into_rbuf_if_small_enough (int fd, BLOCK ...@@ -1199,9 +1199,10 @@ static void read_brtnode_header_from_fd_into_rbuf_if_small_enough (int fd, BLOCK
// read the compressed partition into the sub_block, // read the compressed partition into the sub_block,
// validate the checksum of the compressed data // validate the checksum of the compressed data
// //
static void static enum deserialize_error_code
read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb) read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb)
{ {
enum deserialize_error_code e = DS_OK;
sb->compressed_size = rbuf_int(rb); sb->compressed_size = rbuf_int(rb);
sb->uncompressed_size = rbuf_int(rb); sb->uncompressed_size = rbuf_int(rb);
bytevec* cp = (bytevec*)&sb->compressed_ptr; bytevec* cp = (bytevec*)&sb->compressed_ptr;
...@@ -1209,14 +1210,22 @@ read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb) ...@@ -1209,14 +1210,22 @@ read_compressed_sub_block(struct rbuf *rb, struct sub_block *sb)
sb->xsum = rbuf_int(rb); sb->xsum = rbuf_int(rb);
// let's check the checksum // let's check the checksum
u_int32_t actual_xsum = x1764_memory((char *)sb->compressed_ptr-8, 8+sb->compressed_size); u_int32_t actual_xsum = x1764_memory((char *)sb->compressed_ptr-8, 8+sb->compressed_size);
invariant(sb->xsum == actual_xsum); if (sb->xsum != actual_xsum) {
e = DS_XSUM_FAIL;
}
return e;
} }
static void static enum deserialize_error_code
read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb) read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb)
{ {
read_compressed_sub_block(rb, sb); enum deserialize_error_code e = DS_OK;
e = read_compressed_sub_block(rb, sb);
if (e != DS_OK) {
goto exit;
}
sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size); sb->uncompressed_ptr = toku_xmalloc(sb->uncompressed_size);
assert(sb->uncompressed_ptr); assert(sb->uncompressed_ptr);
...@@ -1226,20 +1235,24 @@ read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb) ...@@ -1226,20 +1235,24 @@ read_and_decompress_sub_block(struct rbuf *rb, struct sub_block *sb)
sb->compressed_ptr, sb->compressed_ptr,
sb->compressed_size sb->compressed_size
); );
exit:
return e;
} }
// verify the checksum // verify the checksum
static void static enum deserialize_error_code
verify_brtnode_sub_block (struct sub_block *sb) verify_brtnode_sub_block (struct sub_block *sb)
{ {
enum deserialize_error_code e = DS_OK;
// first verify the checksum // first verify the checksum
u_int32_t data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end u_int32_t data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
u_int32_t stored_xsum = toku_dtoh32(*((u_int32_t *)((char *)sb->uncompressed_ptr + data_size))); u_int32_t stored_xsum = toku_dtoh32(*((u_int32_t *)((char *)sb->uncompressed_ptr + data_size)));
u_int32_t actual_xsum = x1764_memory(sb->uncompressed_ptr, data_size); u_int32_t actual_xsum = x1764_memory(sb->uncompressed_ptr, data_size);
if (stored_xsum != actual_xsum) { if (stored_xsum != actual_xsum) {
dump_bad_block(sb->uncompressed_ptr, sb->uncompressed_size); dump_bad_block(sb->uncompressed_ptr, sb->uncompressed_size);
assert(FALSE); e = DS_XSUM_FAIL;
} }
return e;
} }
// This function deserializes the data stored by serialize_brtnode_info // This function deserializes the data stored by serialize_brtnode_info
...@@ -1412,7 +1425,7 @@ static void setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* b ...@@ -1412,7 +1425,7 @@ static void setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* b
/* deserialize the partition from the sub-block's uncompressed buffer /* deserialize the partition from the sub-block's uncompressed buffer
* and destroy the uncompressed buffer * and destroy the uncompressed buffer
*/ */
static void static enum deserialize_error_code
deserialize_brtnode_partition( deserialize_brtnode_partition(
struct sub_block *sb, struct sub_block *sb,
BRTNODE node, BRTNODE node,
...@@ -1421,7 +1434,11 @@ deserialize_brtnode_partition( ...@@ -1421,7 +1434,11 @@ deserialize_brtnode_partition(
brt_compare_func cmp brt_compare_func cmp
) )
{ {
verify_brtnode_sub_block(sb); enum deserialize_error_code e = DS_OK;
e = verify_brtnode_sub_block(sb);
if (e != DS_OK) {
goto exit;
}
u_int32_t data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end u_int32_t data_size = sb->uncompressed_size - 4; // checksum is 4 bytes at end
// now with the data verified, we can read the information into the node // now with the data verified, we can read the information into the node
...@@ -1467,39 +1484,55 @@ deserialize_brtnode_partition( ...@@ -1467,39 +1484,55 @@ deserialize_brtnode_partition(
} }
assert(rb.ndone == rb.size); assert(rb.ndone == rb.size);
toku_free(sb->uncompressed_ptr); toku_free(sb->uncompressed_ptr);
exit:
return e;
} }
static void static enum deserialize_error_code
decompress_and_deserialize_worker(struct rbuf curr_rbuf, struct sub_block curr_sb, BRTNODE node, int child, DESCRIPTOR desc, brt_compare_func cmp) decompress_and_deserialize_worker(struct rbuf curr_rbuf, struct sub_block curr_sb, BRTNODE node, int child, DESCRIPTOR desc, brt_compare_func cmp)
{ {
read_and_decompress_sub_block(&curr_rbuf, &curr_sb); enum deserialize_error_code e = DS_OK;
e = read_and_decompress_sub_block(&curr_rbuf, &curr_sb);
if (e != DS_OK) {
goto exit;
}
// at this point, sb->uncompressed_ptr stores the serialized node partition // at this point, sb->uncompressed_ptr stores the serialized node partition
deserialize_brtnode_partition(&curr_sb, node, child, desc, cmp); e = deserialize_brtnode_partition(&curr_sb, node, child, desc, cmp);
exit:
return e;
} }
static void static enum deserialize_error_code
check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf, struct sub_block curr_sb, BRTNODE node, int child) check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf, struct sub_block curr_sb, BRTNODE node, int child)
{ {
read_compressed_sub_block(&curr_rbuf, &curr_sb); enum deserialize_error_code e = DS_OK;
e = read_compressed_sub_block(&curr_rbuf, &curr_sb);
if (e != DS_OK) {
goto exit;
}
SUB_BLOCK bp_sb = BSB(node, child); SUB_BLOCK bp_sb = BSB(node, child);
bp_sb->compressed_size = curr_sb.compressed_size; bp_sb->compressed_size = curr_sb.compressed_size;
bp_sb->uncompressed_size = curr_sb.uncompressed_size; bp_sb->uncompressed_size = curr_sb.uncompressed_size;
bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size); bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size);
memcpy(bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size); memcpy(bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size);
exit:
return e;
} }
static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnode, static enum deserialize_error_code
BRTNODE_DISK_DATA* ndd, deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnode,
BLOCKNUM blocknum, BRTNODE_DISK_DATA* ndd,
u_int32_t fullhash, BLOCKNUM blocknum,
struct brtnode_fetch_extra *bfe, u_int32_t fullhash,
struct rbuf *rb, struct brtnode_fetch_extra *bfe,
int fd) struct rbuf *rb,
int fd)
// If we have enough information in the rbuf to construct a header, then do so. // If we have enough information in the rbuf to construct a header, then do so.
// Also fetch in the basement node if needed. // Also fetch in the basement node if needed.
// Return 0 if it worked. If something goes wrong (including that we are looking at some old data format that doesn't have partitions) then return nonzero. // Return 0 if it worked. If something goes wrong (including that we are looking at some old data format that doesn't have partitions) then return nonzero.
{ {
int r; enum deserialize_error_code e = DS_OK;
BRTNODE node = toku_xmalloc(sizeof(*node)); BRTNODE node = toku_xmalloc(sizeof(*node));
// fill in values that are known and not stored in rb // fill in values that are known and not stored in rb
...@@ -1509,7 +1542,7 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod ...@@ -1509,7 +1542,7 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
node->bp = NULL; // fill this in so we can free without a leak. node->bp = NULL; // fill this in so we can free without a leak.
if (rb->size < 24) { if (rb->size < 24) {
r = EINVAL; e = DS_ERRNO;
goto cleanup; goto cleanup;
} }
...@@ -1517,14 +1550,16 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod ...@@ -1517,14 +1550,16 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
rbuf_literal_bytes(rb, &magic, 8); rbuf_literal_bytes(rb, &magic, 8);
if (memcmp(magic, "tokuleaf", 8)!=0 && if (memcmp(magic, "tokuleaf", 8)!=0 &&
memcmp(magic, "tokunode", 8)!=0) { memcmp(magic, "tokunode", 8)!=0) {
r = toku_db_badformat(); // int r = toku_db_badformat();
// #define DB_BADFORMAT -30500
e = DS_ERRNO;
goto cleanup; goto cleanup;
} }
node->layout_version_read_from_disk = rbuf_int(rb); node->layout_version_read_from_disk = rbuf_int(rb);
if (node->layout_version_read_from_disk < BRT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) { if (node->layout_version_read_from_disk < BRT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
// This code path doesn't have to worry about upgrade. // This code path doesn't have to worry about upgrade.
r = EINVAL; e = DS_ERRNO;
goto cleanup; goto cleanup;
} }
...@@ -1547,7 +1582,7 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod ...@@ -1547,7 +1582,7 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
unsigned int nhsize = serialize_node_header_size(node); // we can do this because n_children is filled in. unsigned int nhsize = serialize_node_header_size(node); // we can do this because n_children is filled in.
unsigned int needed_size = nhsize + 12; // we need 12 more so that we can read the compressed block size information that follows for the nodeinfo. unsigned int needed_size = nhsize + 12; // we need 12 more so that we can read the compressed block size information that follows for the nodeinfo.
if (needed_size > rb->size) { if (needed_size > rb->size) {
r = EINVAL; e = DS_ERRNO;
goto cleanup; goto cleanup;
} }
...@@ -1563,7 +1598,10 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod ...@@ -1563,7 +1598,10 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
u_int32_t stored_checksum = rbuf_int(rb); u_int32_t stored_checksum = rbuf_int(rb);
if (stored_checksum != checksum) { if (stored_checksum != checksum) {
dump_bad_block(rb->buf, rb->size); dump_bad_block(rb->buf, rb->size);
invariant(stored_checksum == checksum); if (stored_checksum != checksum) {
e = DS_XSUM_FAIL;
goto cleanup;
}
} }
// Now we want to read the pivot information. // Now we want to read the pivot information.
...@@ -1572,7 +1610,7 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod ...@@ -1572,7 +1610,7 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
sb_node_info.compressed_size = rbuf_int(rb); // we'll be able to read these because we checked the size earlier. sb_node_info.compressed_size = rbuf_int(rb); // we'll be able to read these because we checked the size earlier.
sb_node_info.uncompressed_size = rbuf_int(rb); sb_node_info.uncompressed_size = rbuf_int(rb);
if (rb->size-rb->ndone < sb_node_info.compressed_size + 8) { if (rb->size-rb->ndone < sb_node_info.compressed_size + 8) {
r = EINVAL; // we won't e = DS_ERRNO; // we won't
goto cleanup; goto cleanup;
} }
// We got the entire header and node info! // We got the entire header and node info!
...@@ -1584,7 +1622,10 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod ...@@ -1584,7 +1622,10 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
sb_node_info.xsum = rbuf_int(rb); sb_node_info.xsum = rbuf_int(rb);
// let's check the checksum // let's check the checksum
u_int32_t actual_xsum = x1764_memory((char *)sb_node_info.compressed_ptr-8, 8+sb_node_info.compressed_size); u_int32_t actual_xsum = x1764_memory((char *)sb_node_info.compressed_ptr-8, 8+sb_node_info.compressed_size);
invariant(sb_node_info.xsum == actual_xsum); if (sb_node_info.xsum != actual_xsum) {
e = DS_XSUM_FAIL;
goto cleanup;
}
// Now decompress the subblock // Now decompress the subblock
sb_node_info.uncompressed_ptr = toku_xmalloc(sb_node_info.uncompressed_size); sb_node_info.uncompressed_ptr = toku_xmalloc(sb_node_info.uncompressed_size);
...@@ -1615,7 +1656,11 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod ...@@ -1615,7 +1656,11 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
if (bfe->type != brtnode_fetch_none) { if (bfe->type != brtnode_fetch_none) {
PAIR_ATTR attr; PAIR_ATTR attr;
toku_brtnode_pf_callback(node, *ndd, bfe, fd, &attr); int r = toku_brtnode_pf_callback(node, *ndd, bfe, fd, &attr);
if (r != 0) {
e = DS_ERRNO;
goto cleanup;
}
} }
// handle clock // handle clock
for (int i = 0; i < node->n_children; i++) { for (int i = 0; i < node->n_children; i++) {
...@@ -1625,17 +1670,17 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod ...@@ -1625,17 +1670,17 @@ static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnod
} }
} }
*brtnode = node; *brtnode = node;
r = 0; e = DS_OK;
cleanup: cleanup:
if (r!=0) { if (e != DS_OK) {
if (node) { if (node) {
toku_free(*ndd); toku_free(*ndd);
toku_free(node->bp); toku_free(node->bp);
toku_free(node); toku_free(node);
} }
} }
return r; return e;
} }
// This function takes a deserialized version 13 or 14 buffer and // This function takes a deserialized version 13 or 14 buffer and
...@@ -1848,6 +1893,9 @@ deserialize_and_upgrade_internal_node(BRTNODE node, ...@@ -1848,6 +1893,9 @@ deserialize_and_upgrade_internal_node(BRTNODE node,
__LINE__, __LINE__,
expected_xsum, expected_xsum,
actual_xsum); actual_xsum);
fprintf(stderr,
"Checksum failure while reading node in file %s.\n",
toku_cachefile_fname_in_env(bfe->h->cf));
fflush(stderr); fflush(stderr);
return toku_db_badformat(); return toku_db_badformat();
} }
...@@ -2005,8 +2053,16 @@ deserialize_and_upgrade_leaf_node(BRTNODE node, ...@@ -2005,8 +2053,16 @@ deserialize_and_upgrade_leaf_node(BRTNODE node,
u_int32_t expected_xsum = rbuf_int(rb); u_int32_t expected_xsum = rbuf_int(rb);
u_int32_t actual_xsum = x1764_memory(rb->buf, rb->size - 4); u_int32_t actual_xsum = x1764_memory(rb->buf, rb->size - 4);
if (expected_xsum != actual_xsum) { if (expected_xsum != actual_xsum) {
// TODO: Error handling. fprintf(stderr, "%s:%d: Bad checksum: expected = %"PRIx32", actual= %"PRIx32"\n",
return 1; __FUNCTION__,
__LINE__,
expected_xsum,
actual_xsum);
fprintf(stderr,
"Checksum failure while reading node in file %s.\n",
toku_cachefile_fname_in_env(bfe->h->cf));
fflush(stderr);
return toku_db_badformat();
} }
} }
...@@ -2042,11 +2098,14 @@ deserialize_and_upgrade_brtnode(BRTNODE node, ...@@ -2042,11 +2098,14 @@ deserialize_and_upgrade_brtnode(BRTNODE node,
// I. First we need to de-compress the entire node, only then can // I. First we need to de-compress the entire node, only then can
// we read the different sub-sections. // we read the different sub-sections.
struct rbuf rb; struct rbuf rb;
read_and_decompress_block_from_fd_into_rbuf(fd, r = read_and_decompress_block_from_fd_into_rbuf(fd,
blocknum, blocknum,
bfe->h, bfe->h,
&rb, &rb,
&version); &version);
if (r != 0) {
goto exit;
}
// Re-read the magic field from the previous call, since we are // Re-read the magic field from the previous call, since we are
// restarting with a fresh rbuf. // restarting with a fresh rbuf.
...@@ -2102,10 +2161,11 @@ deserialize_and_upgrade_brtnode(BRTNODE node, ...@@ -2102,10 +2161,11 @@ deserialize_and_upgrade_brtnode(BRTNODE node,
} }
toku_free(rb.buf); toku_free(rb.buf);
exit:
return r; return r;
} }
static int static enum deserialize_error_code
deserialize_brtnode_from_rbuf( deserialize_brtnode_from_rbuf(
BRTNODE *brtnode, BRTNODE *brtnode,
BRTNODE_DISK_DATA* ndd, BRTNODE_DISK_DATA* ndd,
...@@ -2118,6 +2178,7 @@ deserialize_brtnode_from_rbuf( ...@@ -2118,6 +2178,7 @@ deserialize_brtnode_from_rbuf(
// Effect: deserializes a brtnode that is in rb (with pointer of rb just past the magic) into a BRTNODE. // Effect: deserializes a brtnode that is in rb (with pointer of rb just past the magic) into a BRTNODE.
{ {
int r = 0; int r = 0;
enum deserialize_error_code e = DS_OK;
BRTNODE node = toku_xmalloc(sizeof(*node)); BRTNODE node = toku_xmalloc(sizeof(*node));
struct sub_block sb_node_info; struct sub_block sb_node_info;
// fill in values that are known and not stored in rb // fill in values that are known and not stored in rb
...@@ -2188,7 +2249,11 @@ deserialize_brtnode_from_rbuf( ...@@ -2188,7 +2249,11 @@ deserialize_brtnode_from_rbuf(
//now we read and decompress the pivot and child information //now we read and decompress the pivot and child information
sub_block_init(&sb_node_info); sub_block_init(&sb_node_info);
read_and_decompress_sub_block(rb, &sb_node_info); e = read_and_decompress_sub_block(rb, &sb_node_info);
if (e != DS_OK) {
goto cleanup;
}
// at this point, sb->uncompressed_ptr stores the serialized node info // at this point, sb->uncompressed_ptr stores the serialized node info
deserialize_brtnode_info(&sb_node_info, node); deserialize_brtnode_info(&sb_node_info, node);
toku_free(sb_node_info.uncompressed_ptr); toku_free(sb_node_info.uncompressed_ptr);
...@@ -2238,11 +2303,17 @@ deserialize_brtnode_from_rbuf( ...@@ -2238,11 +2303,17 @@ deserialize_brtnode_from_rbuf(
switch (BP_STATE(node,i)) { switch (BP_STATE(node,i)) {
case PT_AVAIL: case PT_AVAIL:
// case where we read and decompress the partition // case where we read and decompress the partition
decompress_and_deserialize_worker(curr_rbuf, curr_sb, node, i, &bfe->h->cmp_descriptor, bfe->h->compare_fun); e = decompress_and_deserialize_worker(curr_rbuf, curr_sb, node, i, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
if (e != DS_OK) {
goto cleanup;
}
continue; continue;
case PT_COMPRESSED: case PT_COMPRESSED:
// case where we leave the partition in the compressed state // case where we leave the partition in the compressed state
check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i); e = check_and_copy_compressed_sub_block_worker(curr_rbuf, curr_sb, node, i);
if (e != DS_OK) {
goto cleanup;
}
continue; continue;
case PT_INVALID: // this is really bad case PT_INVALID: // this is really bad
case PT_ON_DISK: // it's supposed to be in memory. case PT_ON_DISK: // it's supposed to be in memory.
...@@ -2255,13 +2326,16 @@ deserialize_brtnode_from_rbuf( ...@@ -2255,13 +2326,16 @@ deserialize_brtnode_from_rbuf(
r = 0; r = 0;
cleanup: cleanup:
if (r != 0) { if (r != 0) {
e = DS_ERRNO;
if (node) toku_free(node); if (node) toku_free(node);
} }
return r;
return e;
} }
void enum deserialize_error_code
toku_deserialize_bp_from_disk(BRTNODE node, BRTNODE_DISK_DATA ndd, int childnum, int fd, struct brtnode_fetch_extra* bfe) { toku_deserialize_bp_from_disk(BRTNODE node, BRTNODE_DISK_DATA ndd, int childnum, int fd, struct brtnode_fetch_extra* bfe) {
enum deserialize_error_code e = DS_OK;
assert(BP_STATE(node,childnum) == PT_ON_DISK); assert(BP_STATE(node,childnum) == PT_ON_DISK);
assert(node->bp[childnum].ptr.tag == BCT_NULL); assert(node->bp[childnum].ptr.tag == BCT_NULL);
...@@ -2292,22 +2366,31 @@ toku_deserialize_bp_from_disk(BRTNODE node, BRTNODE_DISK_DATA ndd, int childnum, ...@@ -2292,22 +2366,31 @@ toku_deserialize_bp_from_disk(BRTNODE node, BRTNODE_DISK_DATA ndd, int childnum,
{ {
// read the block // read the block
ssize_t rlen = toku_os_pread(fd, raw_block, curr_size, node_offset+curr_offset); ssize_t rlen = toku_os_pread(fd, raw_block, curr_size, node_offset+curr_offset);
// <CER> TODO: Should we return an error for this mismatched size?
lazy_assert((DISKOFF)rlen == curr_size); lazy_assert((DISKOFF)rlen == curr_size);
} }
struct sub_block curr_sb; struct sub_block curr_sb;
sub_block_init(&curr_sb); sub_block_init(&curr_sb);
read_and_decompress_sub_block(&rb, &curr_sb); e = read_and_decompress_sub_block(&rb, &curr_sb);
if (e != DS_OK) {
goto exit;
}
// at this point, sb->uncompressed_ptr stores the serialized node partition // at this point, sb->uncompressed_ptr stores the serialized node partition
deserialize_brtnode_partition(&curr_sb, node, childnum, &bfe->h->cmp_descriptor, bfe->h->compare_fun); e = deserialize_brtnode_partition(&curr_sb, node, childnum, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
exit:
toku_free(raw_block); toku_free(raw_block);
return e;
} }
// Take a brtnode partition that is in the compressed state, and make it avail // Take a brtnode partition that is in the compressed state, and make it avail
void enum deserialize_error_code
toku_deserialize_bp_from_compressed(BRTNODE node, int childnum, toku_deserialize_bp_from_compressed(BRTNODE node, int childnum,
DESCRIPTOR desc, brt_compare_func cmp) { DESCRIPTOR desc, brt_compare_func cmp) {
enum deserialize_error_code e = DS_OK;
assert(BP_STATE(node, childnum) == PT_COMPRESSED); assert(BP_STATE(node, childnum) == PT_COMPRESSED);
SUB_BLOCK curr_sb = BSB(node, childnum); SUB_BLOCK curr_sb = BSB(node, childnum);
...@@ -2323,48 +2406,52 @@ toku_deserialize_bp_from_compressed(BRTNODE node, int childnum, ...@@ -2323,48 +2406,52 @@ toku_deserialize_bp_from_compressed(BRTNODE node, int childnum,
curr_sb->compressed_ptr, curr_sb->compressed_ptr,
curr_sb->compressed_size curr_sb->compressed_size
); );
deserialize_brtnode_partition(curr_sb, node, childnum, desc, cmp); e = deserialize_brtnode_partition(curr_sb, node, childnum, desc, cmp);
toku_free(curr_sb->compressed_ptr); toku_free(curr_sb->compressed_ptr);
toku_free(curr_sb); toku_free(curr_sb);
return e;
} }
// Read brt node from file into struct. Perform version upgrade if necessary. // Read brt node from file into struct. Perform version upgrade if necessary.
int toku_deserialize_brtnode_from (int fd, enum deserialize_error_code
BLOCKNUM blocknum, toku_deserialize_brtnode_from (int fd,
u_int32_t fullhash, BLOCKNUM blocknum,
BRTNODE *brtnode, u_int32_t fullhash,
BRTNODE_DISK_DATA* ndd, BRTNODE *brtnode,
struct brtnode_fetch_extra* bfe) BRTNODE_DISK_DATA* ndd,
struct brtnode_fetch_extra* bfe)
// Effect: Read a node in. If possible, read just the header. // Effect: Read a node in. If possible, read just the header.
{ {
toku_trace("deserial start"); toku_trace("deserial start");
enum deserialize_error_code e = DS_OK;
struct rbuf rb = RBUF_INITIALIZER; struct rbuf rb = RBUF_INITIALIZER;
read_brtnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->h, &rb); read_brtnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->h, &rb);
int r = deserialize_brtnode_header_from_rbuf_if_small_enough(brtnode, ndd, blocknum, fullhash, bfe, &rb, fd); e = deserialize_brtnode_header_from_rbuf_if_small_enough(brtnode, ndd, blocknum, fullhash, bfe, &rb, fd);
if (r != 0) { if (e == DS_ERRNO) {
e = DS_OK;
toku_free(rb.buf); toku_free(rb.buf);
rb = RBUF_INITIALIZER; rb = RBUF_INITIALIZER;
// Something went wrong, go back to doing it the old way. // Something went wrong, go back to doing it the old way.
int r = 0;
r = read_block_from_fd_into_rbuf(fd, blocknum, bfe->h, &rb); r = read_block_from_fd_into_rbuf(fd, blocknum, bfe->h, &rb);
if (r != 0) { goto cleanup; } // if we were successful, then we are done. if (r != 0) {
e = DS_ERRNO;
goto cleanup;
} // if we were successful, then we are done.
r = deserialize_brtnode_from_rbuf(brtnode, ndd, blocknum, fullhash, bfe, &rb, fd); e = deserialize_brtnode_from_rbuf(brtnode, ndd, blocknum, fullhash, bfe, &rb, fd);
if (r!=0) { if (e != DS_OK) {
dump_bad_block(rb.buf,rb.size); dump_bad_block(rb.buf,rb.size);
} }
lazy_assert_zero(r);
} }
toku_trace("deserial done"); toku_trace("deserial done");
cleanup: cleanup:
toku_free(rb.buf); toku_free(rb.buf);
return r; return e;
} }
void void
...@@ -2661,10 +2748,11 @@ deserialize_descriptor_from(int fd, BLOCK_TABLE bt, DESCRIPTOR desc, int layout_ ...@@ -2661,10 +2748,11 @@ deserialize_descriptor_from(int fd, BLOCK_TABLE bt, DESCRIPTOR desc, int layout_
return e; return e;
} }
static void static enum deserialize_error_code
upgrade_subtree_estimates_to_stat64info(int UU(fd), struct brt_header *h) upgrade_subtree_estimates_to_stat64info(int UU(fd), struct brt_header *h)
{ {
int r; int r;
enum deserialize_error_code e = DS_OK;
// 15 was the last version with subtree estimates // 15 was the last version with subtree estimates
invariant(h->layout_version_read_from_disk <= BRT_LAYOUT_VERSION_15); invariant(h->layout_version_read_from_disk <= BRT_LAYOUT_VERSION_15);
BLOCKNUM b = h->root_blocknum; BLOCKNUM b = h->root_blocknum;
...@@ -2697,7 +2785,10 @@ upgrade_subtree_estimates_to_stat64info(int UU(fd), struct brt_header *h) ...@@ -2697,7 +2785,10 @@ upgrade_subtree_estimates_to_stat64info(int UU(fd), struct brt_header *h)
invariant(header_length <= size); invariant(header_length <= size);
u_int32_t xsum = x1764_memory(raw_block, header_length); u_int32_t xsum = x1764_memory(raw_block, header_length);
u_int32_t stored_xsum = toku_dtoh32(*(u_int32_t *)(raw_block + header_length)); u_int32_t stored_xsum = toku_dtoh32(*(u_int32_t *)(raw_block + header_length));
invariant(xsum == stored_xsum); if (xsum != stored_xsum) {
e = DS_XSUM_FAIL;
goto exit;
}
} }
struct sub_block sub_block[n_sub_blocks]; struct sub_block sub_block[n_sub_blocks];
u_int32_t *sub_block_header = (u_int32_t *) &raw_block[node_header_overhead + 4]; u_int32_t *sub_block_header = (u_int32_t *) &raw_block[node_header_overhead + 4];
...@@ -2724,6 +2815,7 @@ upgrade_subtree_estimates_to_stat64info(int UU(fd), struct brt_header *h) ...@@ -2724,6 +2815,7 @@ upgrade_subtree_estimates_to_stat64info(int UU(fd), struct brt_header *h)
if (r != 0) { if (r != 0) {
fprintf(stderr, "%s:%d block %"PRId64" failed %d at %p size %zu\n", __FUNCTION__, __LINE__, b.b, r, raw_block, size); fprintf(stderr, "%s:%d block %"PRId64" failed %d at %p size %zu\n", __FUNCTION__, __LINE__, b.b, r, raw_block, size);
dump_bad_block(raw_block, size); dump_bad_block(raw_block, size);
fprintf(stderr, "Unknown failure while reading node in file %s.\n", toku_cachefile_fname_in_env(h->cf));
} }
lazy_assert_zero(r); lazy_assert_zero(r);
rb->ndone = 0; rb->ndone = 0;
...@@ -2771,6 +2863,8 @@ upgrade_subtree_estimates_to_stat64info(int UU(fd), struct brt_header *h) ...@@ -2771,6 +2863,8 @@ upgrade_subtree_estimates_to_stat64info(int UU(fd), struct brt_header *h)
// done, discard the rest // done, discard the rest
toku_free(rb->buf); toku_free(rb->buf);
exit:
return e;
} }
// We only deserialize brt header once and then share everything with all the brts. // We only deserialize brt header once and then share everything with all the brts.
...@@ -2904,7 +2998,10 @@ deserialize_brtheader_versioned(int fd, struct rbuf *rb, struct brt_header **brt ...@@ -2904,7 +2998,10 @@ deserialize_brtheader_versioned(int fd, struct rbuf *rb, struct brt_header **brt
h->count_of_optimize_in_progress_read_from_disk = h->count_of_optimize_in_progress; h->count_of_optimize_in_progress_read_from_disk = h->count_of_optimize_in_progress;
h->msn_at_start_of_last_completed_optimize = rbuf_msn(rb); h->msn_at_start_of_last_completed_optimize = rbuf_msn(rb);
} else { } else {
upgrade_subtree_estimates_to_stat64info(fd, h); e = upgrade_subtree_estimates_to_stat64info(fd, h);
if (e != DS_OK) {
goto exit;
}
h->time_of_last_optimize_begin = 0; h->time_of_last_optimize_begin = 0;
h->time_of_last_optimize_end = 0; h->time_of_last_optimize_end = 0;
h->count_of_optimize_in_progress = 0; h->count_of_optimize_in_progress = 0;
...@@ -3453,9 +3550,11 @@ deserialize_rollback_log_from_rbuf_versioned (u_int32_t version, BLOCKNUM blockn ...@@ -3453,9 +3550,11 @@ deserialize_rollback_log_from_rbuf_versioned (u_int32_t version, BLOCKNUM blockn
return r; return r;
} }
static int static enum deserialize_error_code
decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) { decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
toku_trace("decompress"); toku_trace("decompress");
int r = 0;
enum deserialize_error_code e = DS_OK;
// get the number of compressed sub blocks // get the number of compressed sub blocks
int n_sub_blocks; int n_sub_blocks;
n_sub_blocks = toku_dtoh32(*(u_int32_t*)(&raw_block[node_header_overhead])); n_sub_blocks = toku_dtoh32(*(u_int32_t*)(&raw_block[node_header_overhead]));
...@@ -3468,9 +3567,10 @@ decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size, ...@@ -3468,9 +3567,10 @@ decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size,
invariant(header_length <= raw_block_size); invariant(header_length <= raw_block_size);
u_int32_t xsum = x1764_memory(raw_block, header_length); u_int32_t xsum = x1764_memory(raw_block, header_length);
u_int32_t stored_xsum = toku_dtoh32(*(u_int32_t *)(raw_block + header_length)); u_int32_t stored_xsum = toku_dtoh32(*(u_int32_t *)(raw_block + header_length));
invariant(xsum == stored_xsum); if (xsum != stored_xsum) {
e = DS_XSUM_FAIL;
}
} }
int r;
// deserialize the sub block header // deserialize the sub block header
struct sub_block sub_block[n_sub_blocks]; struct sub_block sub_block[n_sub_blocks];
...@@ -3483,14 +3583,28 @@ decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size, ...@@ -3483,14 +3583,28 @@ decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size,
sub_block_header += 3; sub_block_header += 3;
} }
// This predicate needs to be here and instead of where it is set
// for the compiler.
if (e == DS_XSUM_FAIL) {
goto exit;
}
// verify sub block sizes // verify sub block sizes
for (int i = 0; i < n_sub_blocks; i++) { for (int i = 0; i < n_sub_blocks; i++) {
u_int32_t compressed_size = sub_block[i].compressed_size; u_int32_t compressed_size = sub_block[i].compressed_size;
if (compressed_size<=0 || compressed_size>(1<<30)) { r = toku_db_badformat(); return r; } if (compressed_size<=0 || compressed_size>(1<<30)) {
r = toku_db_badformat();
e = DS_ERRNO;
goto exit;
}
u_int32_t uncompressed_size = sub_block[i].uncompressed_size; u_int32_t uncompressed_size = sub_block[i].uncompressed_size;
if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size); if (0) printf("Block %" PRId64 " Compressed size = %u, uncompressed size=%u\n", blocknum.b, compressed_size, uncompressed_size);
if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { r = toku_db_badformat(); return r; } if (uncompressed_size<=0 || uncompressed_size>(1<<30)) {
r = toku_db_badformat();
e = DS_ERRNO;
goto exit;
}
} }
// sum up the uncompressed size of the sub blocks // sum up the uncompressed size of the sub blocks
...@@ -3516,31 +3630,33 @@ decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size, ...@@ -3516,31 +3630,33 @@ decompress_from_raw_block_into_rbuf(u_int8_t *raw_block, size_t raw_block_size,
if (r != 0) { if (r != 0) {
fprintf(stderr, "%s:%d block %"PRId64" failed %d at %p size %lu\n", __FUNCTION__, __LINE__, blocknum.b, r, raw_block, raw_block_size); fprintf(stderr, "%s:%d block %"PRId64" failed %d at %p size %lu\n", __FUNCTION__, __LINE__, blocknum.b, r, raw_block, raw_block_size);
dump_bad_block(raw_block, raw_block_size); dump_bad_block(raw_block, raw_block_size);
e = DS_ERRNO;
goto exit;
} }
lazy_assert_zero(r); lazy_assert_zero(r);
toku_trace("decompress done"); toku_trace("decompress done");
rb->ndone=0; rb->ndone=0;
exit:
return 0; return e;
} }
static int static enum deserialize_error_code
decompress_from_raw_block_into_rbuf_versioned(u_int32_t version, u_int8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) { decompress_from_raw_block_into_rbuf_versioned(u_int32_t version, u_int8_t *raw_block, size_t raw_block_size, struct rbuf *rb, BLOCKNUM blocknum) {
// This function exists solely to accomodate future changes in compression. // This function exists solely to accomodate future changes in compression.
int r; enum deserialize_error_code e = DS_OK;
switch (version) { switch (version) {
case BRT_LAYOUT_VERSION_13: case BRT_LAYOUT_VERSION_13:
case BRT_LAYOUT_VERSION_14: case BRT_LAYOUT_VERSION_14:
case BRT_LAYOUT_VERSION: case BRT_LAYOUT_VERSION:
r = decompress_from_raw_block_into_rbuf(raw_block, raw_block_size, rb, blocknum); e = decompress_from_raw_block_into_rbuf(raw_block, raw_block_size, rb, blocknum);
break; break;
default: default:
lazy_assert(FALSE); lazy_assert(FALSE);
} }
return r; return e;
} }
static int static int
...@@ -3548,7 +3664,7 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum, ...@@ -3548,7 +3664,7 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum,
struct brt_header *h, struct brt_header *h,
struct rbuf *rb, struct rbuf *rb,
/* out */ int *layout_version_p) { /* out */ int *layout_version_p) {
int r; int r = 0;
if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b); if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
if (h->panic) return h->panic; if (h->panic) return h->panic;
...@@ -3581,8 +3697,21 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum, ...@@ -3581,8 +3697,21 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum,
} }
} }
r = decompress_from_raw_block_into_rbuf_versioned(layout_version, raw_block, size, rb, blocknum); enum deserialize_error_code e = DS_OK;
if (r!=0) goto cleanup; e = decompress_from_raw_block_into_rbuf_versioned(layout_version, raw_block, size, rb, blocknum);
if (e != DS_OK) {
// We either failed the checksome, or there is a bad format in
// the buffer.
if (e == DS_XSUM_FAIL) {
fprintf(stderr,
"Checksum failure while reading raw block in file %s.\n",
toku_cachefile_fname_in_env(h->cf));
assert(false);
} else {
r = toku_db_badformat();
goto cleanup;
}
}
*layout_version_p = layout_version; *layout_version_p = layout_version;
cleanup: cleanup:
......
...@@ -798,7 +798,23 @@ int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM noden ...@@ -798,7 +798,23 @@ int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM noden
// deserialize the node, must pass the bfe in because we cannot // deserialize the node, must pass the bfe in because we cannot
// evaluate what piece of the the node is necessary until we get it at // evaluate what piece of the the node is necessary until we get it at
// least partially into memory // least partially into memory
// <CER> TODO: Use checksum error code as a return HERE!
enum deserialize_error_code e;
int r = 0;
e = toku_deserialize_brtnode_from(fd, nodename, fullhash, node, ndd, bfe);
if (e == DS_XSUM_FAIL) {
fprintf(stderr, "Checksum failure while reading node in file %s.\n", toku_cachefile_fname_in_env(cachefile));
assert(false); // make absolutely sure we crash before doing anything else
} else if (e == DS_ERRNO) {
r = errno;
} else if (e == DS_OK) {
r = 0;
} else {
assert(false);
}
/*
int r = toku_deserialize_brtnode_from(fd, nodename, fullhash, node, ndd, bfe); int r = toku_deserialize_brtnode_from(fd, nodename, fullhash, node, ndd, bfe);
*/
if (r == 0) { if (r == 0) {
(*node)->h = bfe->h; // copy reference to header from bfe (*node)->h = bfe->h; // copy reference to header from bfe
*sizep = make_brtnode_pair_attr(*node); *sizep = make_brtnode_pair_attr(*node);
...@@ -1169,6 +1185,7 @@ brt_status_update_partial_fetch_reason( ...@@ -1169,6 +1185,7 @@ brt_status_update_partial_fetch_reason(
// callback for partially reading a node // callback for partially reading a node
// could have just used toku_brtnode_fetch_callback, but wanted to separate the two cases to separate functions // could have just used toku_brtnode_fetch_callback, but wanted to separate the two cases to separate functions
int toku_brtnode_pf_callback(void* brtnode_pv, void* disk_data, void* read_extraargs, int fd, PAIR_ATTR* sizep) { int toku_brtnode_pf_callback(void* brtnode_pv, void* disk_data, void* read_extraargs, int fd, PAIR_ATTR* sizep) {
enum deserialize_error_code e = DS_OK;
BRTNODE node = brtnode_pv; BRTNODE node = brtnode_pv;
BRTNODE_DISK_DATA ndd = disk_data; BRTNODE_DISK_DATA ndd = disk_data;
struct brtnode_fetch_extra *bfe = read_extraargs; struct brtnode_fetch_extra *bfe = read_extraargs;
...@@ -1196,18 +1213,25 @@ int toku_brtnode_pf_callback(void* brtnode_pv, void* disk_data, void* read_extra ...@@ -1196,18 +1213,25 @@ int toku_brtnode_pf_callback(void* brtnode_pv, void* disk_data, void* read_extra
if ((lc <= i && i <= rc) || toku_bfe_wants_child_available(bfe, i)) { if ((lc <= i && i <= rc) || toku_bfe_wants_child_available(bfe, i)) {
brt_status_update_partial_fetch_reason(bfe, i, BP_STATE(node, i), (node->height == 0)); brt_status_update_partial_fetch_reason(bfe, i, BP_STATE(node, i), (node->height == 0));
if (BP_STATE(node,i) == PT_COMPRESSED) { if (BP_STATE(node,i) == PT_COMPRESSED) {
cilk_spawn toku_deserialize_bp_from_compressed(node, i, &bfe->h->cmp_descriptor, bfe->h->compare_fun); e = toku_deserialize_bp_from_compressed(node, i, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
} }
else if (BP_STATE(node,i) == PT_ON_DISK) { else if (BP_STATE(node,i) == PT_ON_DISK) {
cilk_spawn toku_deserialize_bp_from_disk(node, ndd, i, fd, bfe); e = toku_deserialize_bp_from_disk(node, ndd, i, fd, bfe);
} }
else { else {
assert(FALSE); assert(FALSE);
} }
} }
if (e != DS_OK) {
fprintf(stderr, "Unknown failure while reading node in file %s.\n", toku_cachefile_fname_in_env(bfe->h->cf));
assert(false);
}
} }
cilk_sync; cilk_sync;
*sizep = make_brtnode_pair_attr(node); *sizep = make_brtnode_pair_attr(node);
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment