Commit fea0f82b authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

close[t:4077] Merge 4060 branch to main (use two reads on brtnodes).

{{{
svn merge -r35827:36428 ../tokudb.4060
}}}
Refs #4060, Fixes #4077.


git-svn-id: file:///svn/toku/tokudb@36435 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5b7415e2
...@@ -1032,6 +1032,28 @@ read_block_from_fd_into_rbuf( ...@@ -1032,6 +1032,28 @@ read_block_from_fd_into_rbuf(
return 0; return 0;
} }
// Upper bound, in bytes, on the speculative first read of a node.
static const int read_header_heuristic_max = 32*1024;
// NOTE: evaluates each argument twice; only pass side-effect-free expressions.
#define MIN(a,b) (((a)>(b)) ? (b) : (a))
static void read_brtnode_header_from_fd_into_rbuf_if_small_enough (int fd, BLOCKNUM blocknum, struct brt_header *h, struct rbuf *rb)
// Effect: Read at most read_header_heuristic_max bytes from the start of the block into a newly allocated buffer, and point rb at it.
//  The buffer is allocated with the full on-disk block size (not just the bytes read), so it is big enough in any case.
//  On return rb->size is the number of bytes actually read, which may be less than requested (short read); callers must check it
//  before parsing.  A failed read (negative return) trips the assert.
// Arguments: fd: file to read from.
//            blocknum: block to read; translated to (offset, size) through h->blocktable.
//            h: the brt header; must not be in the panic state.
//            rb: out parameter, initialized to cover the bytes read.  Ownership of the buffer passes to the caller.
{
    assert(!h->panic);
    DISKOFF offset, size;
    toku_translate_blocknum_to_offset_size(h->blocktable, blocknum, &offset, &size);
    DISKOFF read_size = MIN(read_header_heuristic_max, size);
    u_int8_t *XMALLOC_N(size, raw_block);
    // read the (prefix of the) block
    ssize_t rlen = toku_os_pread(fd, raw_block, read_size, offset);
    assert(rlen>=0);
    // Initialize rb exactly once, with the length that was really read.
    rbuf_init(rb, raw_block, rlen);
}
// //
// read the compressed partition into the sub_block, // read the compressed partition into the sub_block,
// validate the checksum of the compressed data // validate the checksum of the compressed data
...@@ -1163,8 +1185,14 @@ setup_available_brtnode_partition(BRTNODE node, int i) { ...@@ -1163,8 +1185,14 @@ setup_available_brtnode_partition(BRTNODE node, int i) {
} }
} }
static void static void setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* bfe, bool data_in_memory)
setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* bfe) { // Effect: Used when reading a brtnode into main memory, this sets up the partitions.
// We set bfe->child_to_read as well as the BP_STATE and the data pointers (e.g., with set_BSB or set_BNULL or other set_ operations).
// Arguments: Node: the node to set up.
// bfe: Describes the key range needed.
// data_in_memory: true if we have all the data (in which case we set the BP_STATE to be either PT_AVAIL or PT_COMPRESSED, depending on the bfe);
// false if we don't have the partitions in main memory (in which case we set the state to PT_ON_DISK).
{
if (bfe->type == brtnode_fetch_subset && bfe->search != NULL) { if (bfe->type == brtnode_fetch_subset && bfe->search != NULL) {
// we do not take into account prefetching yet // we do not take into account prefetching yet
// as of now, if we need a subset, the only thing // as of now, if we need a subset, the only thing
...@@ -1194,21 +1222,28 @@ setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* bfe) { ...@@ -1194,21 +1222,28 @@ setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* bfe) {
//printf("node height %d, blocknum %"PRId64", type %d lc %d rc %d\n", node->height, node->thisnodename.b, bfe->type, lc, rc); //printf("node height %d, blocknum %"PRId64", type %d lc %d rc %d\n", node->height, node->thisnodename.b, bfe->type, lc, rc);
for (int i = 0; i < node->n_children; i++) { for (int i = 0; i < node->n_children; i++) {
BP_INIT_UNTOUCHED_CLOCK(node,i); BP_INIT_UNTOUCHED_CLOCK(node,i);
BP_STATE(node, i) = ((toku_bfe_wants_child_available(bfe, i) || (lc <= i && i <= rc)) if (data_in_memory) {
? PT_AVAIL : PT_COMPRESSED); BP_STATE(node, i) = ((toku_bfe_wants_child_available(bfe, i) || (lc <= i && i <= rc))
? PT_AVAIL : PT_COMPRESSED);
} else {
BP_STATE(node, i) = PT_ON_DISK;
}
BP_WORKDONE(node,i) = 0; BP_WORKDONE(node,i) = 0;
if (BP_STATE(node,i) == PT_AVAIL) { switch (BP_STATE(node,i)) {
//printf(" %d is available\n", i); case PT_AVAIL:
setup_available_brtnode_partition(node, i); setup_available_brtnode_partition(node, i);
BP_TOUCH_CLOCK(node,i); BP_TOUCH_CLOCK(node,i);
} continue;
else if (BP_STATE(node,i) == PT_COMPRESSED) { case PT_COMPRESSED:
//printf(" %d is compressed\n", i);
set_BSB(node, i, sub_block_creat()); set_BSB(node, i, sub_block_creat());
} continue;
else { case PT_ON_DISK:
assert(FALSE); set_BNULL(node, i);
} continue;
case PT_INVALID:
break;
}
assert(FALSE);
} }
} }
...@@ -1281,9 +1316,147 @@ check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf, struct sub_blo ...@@ -1281,9 +1316,147 @@ check_and_copy_compressed_sub_block_worker(struct rbuf curr_rbuf, struct sub_blo
memcpy(bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size); memcpy(bp_sb->compressed_ptr, curr_sb.compressed_ptr, bp_sb->compressed_size);
} }
// static int deserialize_brtnode_header_from_rbuf_if_small_enough (BRTNODE *brtnode,
// deserializes a brtnode that is in rb (with pointer of rb just past the magic) into a BRTNODE BLOCKNUM blocknum,
// u_int32_t fullhash,
struct brtnode_fetch_extra *bfe,
struct rbuf *rb,
int fd)
// If we have enough information in the rbuf to construct a header, then do so.
// Also fetch in the basement node if needed.
// Return 0 if it worked. If something goes wrong (including that we are looking at some old data format that doesn't have partitions) then return nonzero.
{
int r;
BRTNODE node = toku_xmalloc(sizeof(*node));
// fill in values that are known and not stored in rb
node->fullhash = fullhash;
node->thisnodename = blocknum;
node->dirty = 0;
node->bp = NULL; // fill this in so we can free without a leak.
if (rb->size < 24) {
r = EINVAL;
goto cleanup;
}
bytevec magic;
rbuf_literal_bytes(rb, &magic, 8);
if (memcmp(magic, "tokuleaf", 8)!=0 &&
memcmp(magic, "tokunode", 8)!=0) {
r = toku_db_badformat();
goto cleanup;
}
node->layout_version_read_from_disk = rbuf_int(rb);
if (node->layout_version_read_from_disk < BRT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
// This code path doesn't have to worry about upgrade.
r = EINVAL;
goto cleanup;
}
node->layout_version = node->layout_version_read_from_disk;
node->layout_version_original = rbuf_int(rb);
node->build_id = rbuf_int(rb);
node->n_children = rbuf_int(rb);
// Guaranteed to be have been able to read up to here. If n_children is too big, we may have a problem, so check that we won't overflow while
// reading the partition locations.
unsigned int nhsize = serialize_node_header_size(node); // we can do this because n_children is filled in.
unsigned int needed_size = nhsize + 12; // we need 12 more so that we can read the compressed block size information that follows for the nodeinfo.
if (needed_size > rb->size) {
r = EINVAL;
goto cleanup;
}
XMALLOC_N(node->n_children, node->bp);
// read the partition locations
for (int i=0; i<node->n_children; i++) {
BP_START(node,i) = rbuf_int(rb);
BP_SIZE (node,i) = rbuf_int(rb);
}
u_int32_t checksum = x1764_memory(rb->buf, rb->ndone);
u_int32_t stored_checksum = rbuf_int(rb);
if (stored_checksum != checksum) {
dump_bad_block(rb->buf, rb->size);
invariant(stored_checksum == checksum);
}
// Now we want to read the pivot information.
struct sub_block sb_node_info;
sub_block_init(&sb_node_info);
sb_node_info.compressed_size = rbuf_int(rb); // we'll be able to read these because we checked the size earlier.
sb_node_info.uncompressed_size = rbuf_int(rb);
if (rb->size-rb->ndone < sb_node_info.compressed_size + 8) {
r = EINVAL; // we won't
goto cleanup;
}
// We got the entire header and node info!
// Finish reading compressed the sub_block
bytevec* cp = (bytevec*)&sb_node_info.compressed_ptr;
rbuf_literal_bytes(rb, cp, sb_node_info.compressed_size);
sb_node_info.xsum = rbuf_int(rb);
// let's check the checksum
u_int32_t actual_xsum = x1764_memory((char *)sb_node_info.compressed_ptr-8, 8+sb_node_info.compressed_size);
invariant(sb_node_info.xsum == actual_xsum);
// Now decompress the subblock
sb_node_info.uncompressed_ptr = toku_xmalloc(sb_node_info.uncompressed_size);
assert(sb_node_info.uncompressed_ptr);
toku_decompress(
sb_node_info.uncompressed_ptr,
sb_node_info.uncompressed_size,
sb_node_info.compressed_ptr,
sb_node_info.compressed_size
);
// at this point sb->uncompressed_ptr stores the serialized node info.
deserialize_brtnode_info(&sb_node_info, node);
toku_free(sb_node_info.uncompressed_ptr);
sb_node_info.uncompressed_ptr = NULL;
// Now we have the brtnode_info. We have a bunch more stuff in the rbuf, so we might be able to store the compressed data for some objects.
// We can proceed to deserialize the individual subblocks.
assert(bfe->type == brtnode_fetch_none || bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_all || bfe->type == brtnode_fetch_prefetch);
// setup the memory of the partitions
// for partitions being decompressed, create either FIFO or basement node
// for partitions staying compressed, create sub_block
setup_brtnode_partitions(node, bfe, false);
// determine the range to preetch
int lc, rc;
if (bfe->type == brtnode_fetch_subset || bfe->type == brtnode_fetch_prefetch) {
lc = toku_bfe_leftmost_child_wanted(bfe, node);
rc = toku_bfe_rightmost_child_wanted(bfe, node);
} else {
lc = -1;
rc = -1;
}
cilk_for (int i = 0; i < node->n_children; i++) {
assert(BP_STATE(node, i) == PT_ON_DISK);
if ((lc <= i && i <= rc) || toku_bfe_wants_child_available(bfe, i)) {
assert(BP_STATE(node,i) == PT_ON_DISK);
toku_deserialize_bp_from_disk(node, i, fd, bfe);
}
}
*brtnode = node;
r = 0;
cleanup:
if (r!=0) {
if (node) {
toku_free(node->bp);
toku_free(node);
}
}
return r;
}
static int static int
deserialize_brtnode_from_rbuf( deserialize_brtnode_from_rbuf(
BRTNODE *brtnode, BRTNODE *brtnode,
...@@ -1292,13 +1465,11 @@ deserialize_brtnode_from_rbuf( ...@@ -1292,13 +1465,11 @@ deserialize_brtnode_from_rbuf(
struct brtnode_fetch_extra* bfe, struct brtnode_fetch_extra* bfe,
struct rbuf *rb struct rbuf *rb
) )
// Effect: deserializes a brtnode that is in rb (with pointer of rb just past the magic) into a BRTNODE.
{ {
int r = 0; int r = 0;
BRTNODE node = NULL; BRTNODE node = toku_xmalloc(sizeof(*node));
u_int32_t stored_checksum, checksum;
struct sub_block sb_node_info; struct sub_block sb_node_info;
node = toku_xmalloc(sizeof(*node));
if (node == NULL) goto cleanup;
// fill in values that are known and not stored in rb // fill in values that are known and not stored in rb
node->fullhash = fullhash; node->fullhash = fullhash;
...@@ -1308,10 +1479,18 @@ deserialize_brtnode_from_rbuf( ...@@ -1308,10 +1479,18 @@ deserialize_brtnode_from_rbuf(
// now start reading from rbuf // now start reading from rbuf
// first thing we do is read the header information // first thing we do is read the header information
bytevec magic;
rbuf_literal_bytes(rb, &magic, 8);
if (memcmp(magic, "tokuleaf", 8)!=0 &&
memcmp(magic, "tokunode", 8)!=0) {
r = toku_db_badformat();
goto cleanup;
}
node->layout_version_read_from_disk = rbuf_int(rb); node->layout_version_read_from_disk = rbuf_int(rb);
// TODO: (Zardosht), worry about upgrade // TODO: (Zardosht), worry about upgrade
if (node->layout_version_read_from_disk != BRT_LAYOUT_VERSION) { if (node->layout_version_read_from_disk != BRT_LAYOUT_VERSION) {
r = EINVAL; r = toku_db_badformat();
goto cleanup; goto cleanup;
} }
node->layout_version = node->layout_version_read_from_disk; node->layout_version = node->layout_version_read_from_disk;
...@@ -1325,8 +1504,8 @@ deserialize_brtnode_from_rbuf( ...@@ -1325,8 +1504,8 @@ deserialize_brtnode_from_rbuf(
BP_SIZE (node,i) = rbuf_int(rb); BP_SIZE (node,i) = rbuf_int(rb);
} }
// verify checksum of header stored // verify checksum of header stored
checksum = x1764_memory(rb->buf, rb->ndone); u_int32_t checksum = x1764_memory(rb->buf, rb->ndone);
stored_checksum = rbuf_int(rb); u_int32_t stored_checksum = rbuf_int(rb);
if (stored_checksum != checksum) { if (stored_checksum != checksum) {
dump_bad_block(rb->buf, rb->size); dump_bad_block(rb->buf, rb->size);
invariant(stored_checksum == checksum); invariant(stored_checksum == checksum);
...@@ -1346,7 +1525,7 @@ deserialize_brtnode_from_rbuf( ...@@ -1346,7 +1525,7 @@ deserialize_brtnode_from_rbuf(
// setup the memory of the partitions // setup the memory of the partitions
// for partitions being decompressed, create either FIFO or basement node // for partitions being decompressed, create either FIFO or basement node
// for partitions staying compressed, create sub_block // for partitions staying compressed, create sub_block
setup_brtnode_partitions(node,bfe); setup_brtnode_partitions(node, bfe, true);
// Previously, this code was a for loop with spawns inside and a sync at the end. // Previously, this code was a for loop with spawns inside and a sync at the end.
// But now the loop is parallelizeable since we don't have a dependency on the work done so far. // But now the loop is parallelizeable since we don't have a dependency on the work done so far.
...@@ -1484,41 +1663,39 @@ toku_deserialize_bp_from_compressed(BRTNODE node, int childnum, ...@@ -1484,41 +1663,39 @@ toku_deserialize_bp_from_compressed(BRTNODE node, int childnum,
// Read brt node from file into struct. Perform version upgrade if necessary. // Read brt node from file into struct. Perform version upgrade if necessary.
int int toku_deserialize_brtnode_from (int fd,
toku_deserialize_brtnode_from ( BLOCKNUM blocknum,
int fd, u_int32_t fullhash,
BLOCKNUM blocknum, BRTNODE *brtnode,
u_int32_t fullhash, struct brtnode_fetch_extra* bfe)
BRTNODE *brtnode, // Effect: Read a node in. If possible, read just the header.
struct brtnode_fetch_extra* bfe
)
{ {
toku_trace("deserial start"); toku_trace("deserial start");
int r; struct rbuf rb = RBUF_INITIALIZER;
struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0}; read_brtnode_header_from_fd_into_rbuf_if_small_enough(fd, blocknum, bfe->h, &rb);
r = read_block_from_fd_into_rbuf(fd, blocknum, bfe->h, &rb); int r = deserialize_brtnode_header_from_rbuf_if_small_enough(brtnode, blocknum, fullhash, bfe, &rb, fd);
if (r != 0) { goto cleanup; } if (r != 0) {
toku_free(rb.buf);
rb = RBUF_INITIALIZER;
// Something went wrong, go back to doing it the old way.
bytevec magic; r = read_block_from_fd_into_rbuf(fd, blocknum, bfe->h, &rb);
rbuf_literal_bytes(&rb, &magic, 8); if (r != 0) { goto cleanup; } // if we were successful, then we are done.
if (memcmp(magic, "tokuleaf", 8)!=0 &&
memcmp(magic, "tokunode", 8)!=0) {
r = toku_db_badformat();
goto cleanup;
}
r = deserialize_brtnode_from_rbuf(brtnode, blocknum, fullhash, bfe, &rb); r = deserialize_brtnode_from_rbuf(brtnode, blocknum, fullhash, bfe, &rb);
if (r!=0) { if (r!=0) {
dump_bad_block(rb.buf,rb.size); dump_bad_block(rb.buf,rb.size);
} }
lazy_assert_zero(r); lazy_assert_zero(r);
}
toku_trace("deserial done"); toku_trace("deserial done");
cleanup: cleanup:
if (rb.buf) toku_free(rb.buf); toku_free(rb.buf);
return r; return r;
} }
......
...@@ -21,6 +21,7 @@ struct rbuf { ...@@ -21,6 +21,7 @@ struct rbuf {
unsigned int size; unsigned int size;
unsigned int ndone; unsigned int ndone;
}; };
#define RBUF_INITIALIZER ((struct rbuf){.buf = NULL, .size=0, .ndone=0})
static inline void rbuf_init(struct rbuf *r, unsigned char *buf, unsigned int size) { static inline void rbuf_init(struct rbuf *r, unsigned char *buf, unsigned int size) {
r->buf = buf; r->buf = buf;
......
...@@ -105,9 +105,9 @@ setup_dn(enum brtnode_verify_type bft, int fd, struct brt_header *brt_h, BRTNODE ...@@ -105,9 +105,9 @@ setup_dn(enum brtnode_verify_type bft, int fd, struct brt_header *brt_h, BRTNODE
fill_bfe_for_min_read(&bfe, brt_h, NULL, string_key_cmp); fill_bfe_for_min_read(&bfe, brt_h, NULL, string_key_cmp);
r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, dn, &bfe); r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, dn, &bfe);
assert(r==0); assert(r==0);
// assert all bp's are compressed // assert all bp's are compressed or on disk.
for (int i = 0; i < (*dn)->n_children; i++) { for (int i = 0; i < (*dn)->n_children; i++) {
assert(BP_STATE(*dn,i) == PT_COMPRESSED); assert(BP_STATE(*dn,i) == PT_COMPRESSED || BP_STATE(*dn, i) == PT_ON_DISK);
} }
// if read_none, get rid of the compressed bp's // if read_none, get rid of the compressed bp's
if (bft == read_none) { if (bft == read_none) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment