Commit 26dec9cb authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

[t:3564] Merge in barry's stuff to the main line. {{{svn merge -r32637:32693...

[t:3564] Merge in barry's stuff to the main line. {{{svn merge -r32637:32693 ../tokudb.3564b+3312}}} Refs #3564.

git-svn-id: file:///svn/toku/tokudb@32694 c7de825b-a66e-492c-adef-691d508d4ae1
parent cfc64c2b
...@@ -50,7 +50,7 @@ enum { BUFFER_HEADER_SIZE = (4 // height// ...@@ -50,7 +50,7 @@ enum { BUFFER_HEADER_SIZE = (4 // height//
+ TREE_FANOUT * 8 // children + TREE_FANOUT * 8 // children
) }; ) };
struct subtree_estimates { struct __attribute__((__packed__)) subtree_estimates {
// estimate number of rows in the tree by counting the number of rows // estimate number of rows in the tree by counting the number of rows
// in the leaves. The stuff in the internal nodes is likely to be off O(1). // in the leaves. The stuff in the internal nodes is likely to be off O(1).
u_int64_t nkeys; // number of distinct keys (obsolete with removal of dupsort, but not worth removing) u_int64_t nkeys; // number of distinct keys (obsolete with removal of dupsort, but not worth removing)
...@@ -140,6 +140,7 @@ struct brtnode_leaf_basement_node { ...@@ -140,6 +140,7 @@ struct brtnode_leaf_basement_node {
OMT buffer; OMT buffer;
unsigned int n_bytes_in_buffer; /* How many bytes to represent the OMT (including the per-key overheads, but not including the overheads for the node. */ unsigned int n_bytes_in_buffer; /* How many bytes to represent the OMT (including the per-key overheads, but not including the overheads for the node. */
unsigned int seqinsert; /* number of sequential inserts to this leaf */ unsigned int seqinsert; /* number of sequential inserts to this leaf */
MSN max_msn_applied;
}; };
#define PT_INVALID 0 #define PT_INVALID 0
...@@ -147,8 +148,25 @@ struct brtnode_leaf_basement_node { ...@@ -147,8 +148,25 @@ struct brtnode_leaf_basement_node {
#define PT_COMPRESSED 2 #define PT_COMPRESSED 2
#define PT_AVAIL 3 #define PT_AVAIL 3
enum brtnode_child_tag {
BCT_INVALID = 0,
BCT_NULL,
BCT_SUBBLOCK,
BCT_LEAF,
BCT_NONLEAF
};
typedef struct __attribute__((__packed__)) brtnode_child_pointer {
u_int8_t tag;
union {
struct sub_block *subblock;
struct brtnode_nonleaf_childinfo *nonleaf;
struct brtnode_leaf_basement_node *leaf;
} u;
} BRTNODE_CHILD_POINTER;
// a brtnode partition represents // a brtnode partition represents
struct brtnode_partition { struct __attribute__((__packed__)) brtnode_partition {
BLOCKNUM blocknum; BLOCKNUM blocknum;
BOOL have_fullhash; // do we have the full hash? BOOL have_fullhash; // do we have the full hash?
u_int32_t fullhash; // the fullhash of the child u_int32_t fullhash; // the fullhash of the child
...@@ -176,44 +194,15 @@ struct brtnode_partition { ...@@ -176,44 +194,15 @@ struct brtnode_partition {
// a struct brtnode_nonleaf_childinfo for internal nodes, // a struct brtnode_nonleaf_childinfo for internal nodes,
// a struct brtnode_leaf_basement_node for leaf nodes // a struct brtnode_leaf_basement_node for leaf nodes
// //
void* ptr; struct brtnode_child_pointer ptr;
// clock count used to for pe_callback to determine if a node should be evicted or not // clock count used to for pe_callback to determine if a node should be evicted or not
// for now, saturating the count at 1 // for now, saturating the count at 1
u_int8_t clock_count; u_int8_t clock_count;
};
// brtnode partition macros
#define BP_BLOCKNUM(node,i) ((node)->bp[i].blocknum)
#define BP_HAVE_FULLHASH(node,i) ((node)->bp[i].have_fullhash)
#define BP_FULLHASH(node,i) ((node)->bp[i].fullhash)
#define BP_STATE(node,i) ((node)->bp[i].state)
#define BP_OFFSET(node,i) ((node)->bp[i].offset)
#define BP_SUBTREE_EST(node,i) ((node)->bp[i].subtree_estimates)
//
// macros for managing a node's clock
// Should be managed by brt.c, NOT by serialize/deserialize
//
#define BP_TOUCH_CLOCK(node, i) ((node)->bp[i].clock_count = 1)
#define BP_SWEEP_CLOCK(node, i) ((node)->bp[i].clock_count = 0)
#define BP_SHOULD_EVICT(node, i) ((node)->bp[i].clock_count == 0)
// not crazy about having these two here, one is for the case where we create new
// nodes, such as in splits and creating new roots, and the other is for when
// we are deserializing a node and not all bp's are touched
#define BP_INIT_TOUCHED_CLOCK(node, i) ((node)->bp[i].clock_count = 1)
#define BP_INIT_UNTOUCHED_CLOCK(node, i) ((node)->bp[i].clock_count = 0)
// internal node macros // How many bytes worth of work was performed by messages in each buffer.
#define BNC_BUFFER(node,i) (((struct brtnode_nonleaf_childinfo*)((node)->bp[i].ptr))->buffer) uint64_t workdone;
#define BNC_NBYTESINBUF(node,i) (((struct brtnode_nonleaf_childinfo*)((node)->bp[i].ptr))->n_bytes_in_buffer) };
// leaf node macros
#define BLB_OPTIMIZEDFORUPGRADE(node,i) (((struct brtnode_leaf_basement_node*)((node)->bp[i].ptr))->optimized_for_upgrade)
#define BLB_SOFTCOPYISUPTODATE(node,i) (((struct brtnode_leaf_basement_node*)((node)->bp[i].ptr))->soft_copy_is_up_to_date)
#define BLB_BUFFER(node,i) (((struct brtnode_leaf_basement_node*)((node)->bp[i].ptr))->buffer)
#define BLB_NBYTESINBUF(node,i) (((struct brtnode_leaf_basement_node*)((node)->bp[i].ptr))->n_bytes_in_buffer)
#define BLB_SEQINSERT(node,i) (((struct brtnode_leaf_basement_node*)((node)->bp[i].ptr))->seqinsert)
struct brtnode { struct brtnode {
MSN max_msn_applied_to_node_in_memory; // max msn that has been applied to this node (for root node, this is max msn for the tree) MSN max_msn_applied_to_node_in_memory; // max msn that has been applied to this node (for root node, this is max msn for the tree)
...@@ -241,6 +230,86 @@ struct brtnode { ...@@ -241,6 +230,86 @@ struct brtnode {
struct brtnode_partition *bp; struct brtnode_partition *bp;
}; };
// brtnode partition macros
#define BP_BLOCKNUM(node,i) ((node)->bp[i].blocknum)
#define BP_HAVE_FULLHASH(node,i) ((node)->bp[i].have_fullhash)
#define BP_FULLHASH(node,i) ((node)->bp[i].fullhash)
#define BP_STATE(node,i) ((node)->bp[i].state)
#define BP_OFFSET(node,i) ((node)->bp[i].offset)
#define BP_SUBTREE_EST(node,i) ((node)->bp[i].subtree_estimates)
#define BP_WORKDONE(node, i)((node)->bp[i].workdone)
//
// macros for managing a node's clock
// Should be managed by brt.c, NOT by serialize/deserialize
//
#define BP_TOUCH_CLOCK(node, i) ((node)->bp[i].clock_count = 1)
#define BP_SWEEP_CLOCK(node, i) ((node)->bp[i].clock_count = 0)
#define BP_SHOULD_EVICT(node, i) ((node)->bp[i].clock_count == 0)
// not crazy about having these two here, one is for the case where we create new
// nodes, such as in splits and creating new roots, and the other is for when
// we are deserializing a node and not all bp's are touched
#define BP_INIT_TOUCHED_CLOCK(node, i) ((node)->bp[i].clock_count = 1)
#define BP_INIT_UNTOUCHED_CLOCK(node, i) ((node)->bp[i].clock_count = 0)
// internal node macros
static inline void set_BNULL(BRTNODE node, int i) {
assert(0<=i && i<node->n_children);
node->bp[i].ptr.tag = BCT_NULL;
}
static inline bool is_BNULL (BRTNODE node, int i) {
assert(0<=i && i<node->n_children);
return node->bp[i].ptr.tag == BCT_NULL;
}
static inline NONLEAF_CHILDINFO BNC(BRTNODE node, int i) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER p = node->bp[i].ptr;
assert(p.tag==BCT_NONLEAF);
return p.u.nonleaf;
}
static inline void set_BNC(BRTNODE node, int i, NONLEAF_CHILDINFO nl) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER *p = &node->bp[i].ptr;
p->tag = BCT_NONLEAF;
p->u.nonleaf = nl;
}
static inline BASEMENTNODE BLB(BRTNODE node, int i) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER p = node->bp[i].ptr;
assert(p.tag==BCT_LEAF);
return p.u.leaf;
}
static inline void set_BLB(BRTNODE node, int i, BASEMENTNODE bn) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER *p = &node->bp[i].ptr;
p->tag = BCT_LEAF;
p->u.leaf = bn;
}
static inline SUB_BLOCK BSB(BRTNODE node, int i) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER p = node->bp[i].ptr;
assert(p.tag==BCT_SUBBLOCK);
return p.u.subblock;
}
static inline void set_BSB(BRTNODE node, int i, SUB_BLOCK sb) {
assert(0<=i && i<node->n_children);
BRTNODE_CHILD_POINTER *p = &node->bp[i].ptr;
p->tag = BCT_SUBBLOCK;
p->u.subblock = sb;
}
#define BNC_BUFFER(node,i) (BNC(node,i)->buffer)
#define BNC_NBYTESINBUF(node,i) (BNC(node,i)->n_bytes_in_buffer)
#define BNC_NBYTESINBUF(node,i) (BNC(node,i)->n_bytes_in_buffer)
// leaf node macros
#define BLB_OPTIMIZEDFORUPGRADE(node,i) (BLB(node,i)->optimized_for_upgrade)
#define BLB_SOFTCOPYISUPTODATE(node,i) (BLB(node,i)->soft_copy_is_up_to_date)
#define BLB_BUFFER(node,i) (BLB(node,i)->buffer)
#define BLB_NBYTESINBUF(node,i) (BLB(node,i)->n_bytes_in_buffer)
#define BLB_SEQINSERT(node,i) (BLB(node,i)->seqinsert)
/* pivot flags (must fit in 8 bits) */ /* pivot flags (must fit in 8 bits) */
enum { enum {
BRT_PIVOT_TRUNC = 4, BRT_PIVOT_TRUNC = 4,
...@@ -354,7 +423,11 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h, int6 ...@@ -354,7 +423,11 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h, int6
int toku_deserialize_brtheader_from (int fd, LSN max_acceptable_lsn, struct brt_header **brth); int toku_deserialize_brtheader_from (int fd, LSN max_acceptable_lsn, struct brt_header **brth);
int toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF offset); int toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF offset);
void toku_serialize_descriptor_contents_to_wbuf(struct wbuf *wb, const DESCRIPTOR desc); void toku_serialize_descriptor_contents_to_wbuf(struct wbuf *wb, const DESCRIPTOR desc);
void toku_setup_empty_bn(BASEMENTNODE bn); BASEMENTNODE toku_create_empty_bn(void);
BASEMENTNODE toku_create_empty_bn_no_buffer(void); // create a basement node with a null buffer.
NONLEAF_CHILDINFO toku_create_empty_nl(void);
void destroy_basement_node (BASEMENTNODE bn);
void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl);
void toku_destroy_brtnode_internals(BRTNODE node); void toku_destroy_brtnode_internals(BRTNODE node);
void toku_brtnode_free (BRTNODE *node); void toku_brtnode_free (BRTNODE *node);
void toku_assert_entire_node_in_memory(BRTNODE node); void toku_assert_entire_node_in_memory(BRTNODE node);
...@@ -420,9 +493,9 @@ struct brt_cursor { ...@@ -420,9 +493,9 @@ struct brt_cursor {
typedef struct ancestors *ANCESTORS; typedef struct ancestors *ANCESTORS;
struct ancestors { struct ancestors {
BRTNODE node; BRTNODE node; // This is the root node if next is NULL.
int childnum; // which buffer holds our ancestors. int childnum; // which buffer holds messages destined to the node whose ancestors this list represents.
ANCESTORS next; ANCESTORS next; // Parent of this node (so next->node.(next->childnum) refers to this node).
}; };
struct pivot_bounds { struct pivot_bounds {
struct kv_pair const * const lower_bound_exclusive; struct kv_pair const * const lower_bound_exclusive;
...@@ -536,11 +609,13 @@ brt_leaf_apply_cmd_once ( ...@@ -536,11 +609,13 @@ brt_leaf_apply_cmd_once (
const BRT_MSG cmd, const BRT_MSG cmd,
u_int32_t idx, u_int32_t idx,
LEAFENTRY le, LEAFENTRY le,
TOKULOGGER logger TOKULOGGER logger,
uint64_t *workdonep
); );
void brt_leaf_put_cmd (BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRT_MSG cmd, bool *made_change, uint64_t *workdonep);
void void
toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, int *made_change); toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, uint64_t *workdonep);
void toku_reset_root_xid_that_created(BRT brt, TXNID new_root_xid_that_created); void toku_reset_root_xid_that_created(BRT brt, TXNID new_root_xid_that_created);
// Reset the root_xid_that_created field to the given value. // Reset the root_xid_that_created field to the given value.
......
...@@ -372,8 +372,8 @@ serialize_brtnode_info_size(BRTNODE node) ...@@ -372,8 +372,8 @@ serialize_brtnode_info_size(BRTNODE node)
static void static void
serialize_brtnode_info( serialize_brtnode_info(
BRTNODE node, BRTNODE node,
struct sub_block *sb_parts, SUB_BLOCK sb_parts,
struct sub_block *sb // output SUB_BLOCK sb // output
) )
{ {
assert(sb->uncompressed_size == 0); assert(sb->uncompressed_size == 0);
...@@ -537,10 +537,7 @@ rebalance_brtnode_leaf(BRTNODE node) ...@@ -537,10 +537,7 @@ rebalance_brtnode_leaf(BRTNODE node)
node->n_children = num_children; node->n_children = num_children;
XMALLOC_N(num_children, node->bp); XMALLOC_N(num_children, node->bp);
for (int i = 0; i < num_children; i++) { for (int i = 0; i < num_children; i++) {
node->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node)); set_BLB(node, i, toku_create_empty_bn());
BASEMENTNODE bn = (BASEMENTNODE)node->bp[i].ptr;
memset(bn, 0, sizeof(struct brtnode_leaf_basement_node));
toku_setup_empty_bn(bn);
} }
// now we start to fill in the data // now we start to fill in the data
...@@ -615,7 +612,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node, ...@@ -615,7 +612,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
struct sub_block sb[npartitions]; struct sub_block sb[npartitions];
struct sub_block sb_node_info; struct sub_block sb_node_info;
for (int i = 0; i < npartitions; i++) { for (int i = 0; i < npartitions; i++) {
sub_block_init(&sb[i]); sub_block_init(&sb[i]);;
} }
sub_block_init(&sb_node_info); sub_block_init(&sb_node_info);
...@@ -753,6 +750,7 @@ deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf) { ...@@ -753,6 +750,7 @@ deserialize_child_buffer(BRTNODE node, int cnum, struct rbuf *rbuf) {
invariant(rbuf->ndone == rbuf->size); invariant(rbuf->ndone == rbuf->size);
BNC_NBYTESINBUF(node, cnum) = n_bytes_in_buffer; BNC_NBYTESINBUF(node, cnum) = n_bytes_in_buffer;
BP_WORKDONE(node, cnum) = 0;
} }
// dump a buffer to stderr // dump a buffer to stderr
...@@ -791,14 +789,46 @@ dump_bad_block(unsigned char *vp, u_int64_t size) { ...@@ -791,14 +789,46 @@ dump_bad_block(unsigned char *vp, u_int64_t size) {
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////
void toku_setup_empty_bn(BASEMENTNODE bn) { BASEMENTNODE toku_create_empty_bn(void) {
bn->soft_copy_is_up_to_date = TRUE; BASEMENTNODE bn = toku_create_empty_bn_no_buffer();
int r; int r;
r = toku_omt_create(&bn->buffer); r = toku_omt_create(&bn->buffer);
assert_zero(r); assert_zero(r);
return bn;
}
BASEMENTNODE toku_create_empty_bn_no_buffer(void) {
BASEMENTNODE XMALLOC(bn);
bn->soft_copy_is_up_to_date = TRUE;
bn->buffer = NULL;
bn->n_bytes_in_buffer = 0; bn->n_bytes_in_buffer = 0;
bn->seqinsert = 0; bn->seqinsert = 0;
bn->optimized_for_upgrade = 0; bn->optimized_for_upgrade = 0;
bn->max_msn_applied = ZERO_MSN;
return bn;
}
NONLEAF_CHILDINFO toku_create_empty_nl(void) {
NONLEAF_CHILDINFO XMALLOC(cn);
cn->n_bytes_in_buffer = 0;
int r = toku_fifo_create(&cn->buffer);
assert(r==0);
return cn;
}
void destroy_basement_node (BASEMENTNODE bn)
{
// The buffer may have been freed already, in some cases.
if (bn->buffer) {
toku_omt_destroy(&bn->buffer);
}
toku_free(bn);
}
void destroy_nonleaf_childinfo (NONLEAF_CHILDINFO nl)
{
toku_fifo_free(&nl->buffer);
toku_free(nl);
} }
// //
...@@ -939,6 +969,7 @@ deserialize_brtnode_info( ...@@ -939,6 +969,7 @@ deserialize_brtnode_info(
BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb); BP_BLOCKNUM(node,i) = rbuf_blocknum(&rb);
BP_HAVE_FULLHASH(node, i) = FALSE; BP_HAVE_FULLHASH(node, i) = FALSE;
BP_FULLHASH(node,i) = 0; BP_FULLHASH(node,i) = 0;
BP_WORKDONE(node, i) = 0;
} }
} }
...@@ -957,14 +988,10 @@ deserialize_brtnode_info( ...@@ -957,14 +988,10 @@ deserialize_brtnode_info(
static void static void
setup_available_brtnode_partition(BRTNODE node, int i) { setup_available_brtnode_partition(BRTNODE node, int i) {
if (node->height == 0) { if (node->height == 0) {
node->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node)); set_BLB(node, i, toku_create_empty_bn());
BASEMENTNODE bn = (BASEMENTNODE)node->bp[i].ptr;
toku_setup_empty_bn(bn);
} }
else { else {
node->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_nonleaf_childinfo)); set_BNC(node, i, toku_create_empty_nl());
int r = toku_fifo_create(&BNC_BUFFER(node,i));
assert(r == 0);
} }
} }
...@@ -995,8 +1022,7 @@ setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* bfe) { ...@@ -995,8 +1022,7 @@ setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* bfe) {
BP_TOUCH_CLOCK(node,i); BP_TOUCH_CLOCK(node,i);
} }
else if (BP_STATE(node,i) == PT_COMPRESSED) { else if (BP_STATE(node,i) == PT_COMPRESSED) {
node->bp[i].ptr = toku_xmalloc(sizeof(struct sub_block)); set_BSB(node, i, sub_block_creat());
sub_block_init((struct sub_block*)node->bp[i].ptr);
} }
else { else {
assert(FALSE); assert(FALSE);
...@@ -1131,7 +1157,7 @@ deserialize_brtnode_from_rbuf( ...@@ -1131,7 +1157,7 @@ deserialize_brtnode_from_rbuf(
rbuf_init(&curr_rbuf, rb->buf + rb->ndone + curr_offset, curr_size); rbuf_init(&curr_rbuf, rb->buf + rb->ndone + curr_offset, curr_size);
struct sub_block curr_sb; struct sub_block curr_sb;
sub_block_init(&curr_sb); sub_block_init(&curr_sb);
// //
// now we are at the point where we have: // now we are at the point where we have:
...@@ -1158,7 +1184,7 @@ deserialize_brtnode_from_rbuf( ...@@ -1158,7 +1184,7 @@ deserialize_brtnode_from_rbuf(
// case where we leave the partition in the compressed state // case where we leave the partition in the compressed state
else if (BP_STATE(node,i) == PT_COMPRESSED) { else if (BP_STATE(node,i) == PT_COMPRESSED) {
read_compressed_sub_block(&curr_rbuf, &curr_sb); read_compressed_sub_block(&curr_rbuf, &curr_sb);
struct sub_block* bp_sb = (struct sub_block*)node->bp[i].ptr; SUB_BLOCK bp_sb = BSB(node, i);
bp_sb->compressed_size = curr_sb.compressed_size; bp_sb->compressed_size = curr_sb.compressed_size;
bp_sb->uncompressed_size = curr_sb.uncompressed_size; bp_sb->uncompressed_size = curr_sb.uncompressed_size;
bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size); bp_sb->compressed_ptr = toku_xmalloc(bp_sb->compressed_size);
...@@ -1168,7 +1194,6 @@ deserialize_brtnode_from_rbuf( ...@@ -1168,7 +1194,6 @@ deserialize_brtnode_from_rbuf(
bp_sb->compressed_size bp_sb->compressed_size
); );
} }
} }
*brtnode = node; *brtnode = node;
r = 0; r = 0;
...@@ -1182,7 +1207,7 @@ deserialize_brtnode_from_rbuf( ...@@ -1182,7 +1207,7 @@ deserialize_brtnode_from_rbuf(
void void
toku_deserialize_bp_from_disk(BRTNODE node, int childnum, int fd, struct brtnode_fetch_extra* bfe) { toku_deserialize_bp_from_disk(BRTNODE node, int childnum, int fd, struct brtnode_fetch_extra* bfe) {
assert(BP_STATE(node,childnum) == PT_ON_DISK); assert(BP_STATE(node,childnum) == PT_ON_DISK);
assert(node->bp[childnum].ptr == NULL); assert(node->bp[childnum].ptr.tag == BCT_NULL);
// //
// setup the partition // setup the partition
...@@ -1229,7 +1254,7 @@ toku_deserialize_bp_from_disk(BRTNODE node, int childnum, int fd, struct brtnode ...@@ -1229,7 +1254,7 @@ toku_deserialize_bp_from_disk(BRTNODE node, int childnum, int fd, struct brtnode
void void
toku_deserialize_bp_from_compressed(BRTNODE node, int childnum) { toku_deserialize_bp_from_compressed(BRTNODE node, int childnum) {
assert(BP_STATE(node, childnum) == PT_COMPRESSED); assert(BP_STATE(node, childnum) == PT_COMPRESSED);
struct sub_block* curr_sb = (struct sub_block*)node->bp[childnum].ptr; SUB_BLOCK curr_sb = BSB(node, childnum);
assert(curr_sb->uncompressed_ptr == NULL); assert(curr_sb->uncompressed_ptr == NULL);
curr_sb->uncompressed_ptr = toku_xmalloc(curr_sb->uncompressed_size); curr_sb->uncompressed_ptr = toku_xmalloc(curr_sb->uncompressed_size);
...@@ -2012,7 +2037,7 @@ serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calc ...@@ -2012,7 +2037,7 @@ serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calc
static int static int
serialize_uncompressed_block_to_memory(char * uncompressed_buf, serialize_uncompressed_block_to_memory(char * uncompressed_buf,
int n_sub_blocks, int n_sub_blocks,
struct sub_block sub_block[n_sub_blocks], struct sub_block sub_block[/*n_sub_blocks*/],
/*out*/ size_t *n_bytes_to_write, /*out*/ size_t *n_bytes_to_write,
/*out*/ char **bytes_to_write) { /*out*/ char **bytes_to_write) {
// allocate space for the compressed uncompressed_buf // allocate space for the compressed uncompressed_buf
......
...@@ -182,7 +182,7 @@ toku_verify_brtnode (BRT brt, ...@@ -182,7 +182,7 @@ toku_verify_brtnode (BRT brt,
}); });
} }
else { else {
BASEMENTNODE bn = (BASEMENTNODE)node->bp[i].ptr; BASEMENTNODE bn = BLB(node, i);
for (u_int32_t j = 0; j < toku_omt_size(bn->buffer); j++) { for (u_int32_t j = 0; j < toku_omt_size(bn->buffer); j++) {
VERIFY_ASSERTION((rootmsn.msn >= thismsn.msn), 0, "leaf may have latest msn, but cannot be greater than root msn"); VERIFY_ASSERTION((rootmsn.msn >= thismsn.msn), 0, "leaf may have latest msn, but cannot be greater than root msn");
LEAFENTRY le = get_ith_leafentry(bn, j); LEAFENTRY le = get_ith_leafentry(bn, j);
......
...@@ -189,9 +189,12 @@ get_node_reactivity (BRTNODE node) { ...@@ -189,9 +189,12 @@ get_node_reactivity (BRTNODE node) {
return get_nonleaf_reactivity(node); return get_nonleaf_reactivity(node);
} }
static BOOL // return TRUE if the size of the buffers plus the amount of work done is large enough. (But return false if there is nothing to be flushed (the buffers empty)).
static bool
nonleaf_node_is_gorged (BRTNODE node) { nonleaf_node_is_gorged (BRTNODE node) {
BOOL buffers_are_empty = TRUE; u_int64_t size = toku_serialize_brtnode_size(node);
bool buffers_are_empty = TRUE;
toku_assert_entire_node_in_memory(node); toku_assert_entire_node_in_memory(node);
assert(node->height > 0); assert(node->height > 0);
for (int child = 0; child < node->n_children; ++child) { for (int child = 0; child < node->n_children; ++child) {
...@@ -199,10 +202,11 @@ nonleaf_node_is_gorged (BRTNODE node) { ...@@ -199,10 +202,11 @@ nonleaf_node_is_gorged (BRTNODE node) {
buffers_are_empty = FALSE; buffers_are_empty = FALSE;
break; break;
} }
size += BP_WORKDONE(node, child);
} }
return (BOOL)((toku_serialize_brtnode_size(node) > node->nodesize) return ((size > node->nodesize)
&& &&
(!buffers_are_empty)); (!buffers_are_empty));
} }
static void brtnode_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd); static void brtnode_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd);
...@@ -251,7 +255,7 @@ static u_int32_t compute_child_fullhash (CACHEFILE cf, BRTNODE node, int childnu ...@@ -251,7 +255,7 @@ static u_int32_t compute_child_fullhash (CACHEFILE cf, BRTNODE node, int childnu
abort(); return 0; abort(); return 0;
} }
static void maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds); static void maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool *made_change);
static long brtnode_memory_size (BRTNODE node); static long brtnode_memory_size (BRTNODE node);
...@@ -277,7 +281,8 @@ int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash, ...@@ -277,7 +281,8 @@ int toku_pin_brtnode (BRT brt, BLOCKNUM blocknum, u_int32_t fullhash,
unlockers); unlockers);
if (r==0) { if (r==0) {
BRTNODE node = node_v; BRTNODE node = node_v;
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds); bool made_change;
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds, &made_change);
*node_p = node; *node_p = node;
// printf("%*sPin %ld\n", 8-node->height, "", blocknum.b); // printf("%*sPin %ld\n", 8-node->height, "", blocknum.b);
} else { } else {
...@@ -308,7 +313,8 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha ...@@ -308,7 +313,8 @@ void toku_pin_brtnode_holding_lock (BRT brt, BLOCKNUM blocknum, u_int32_t fullha
); );
assert(r==0); assert(r==0);
BRTNODE node = node_v; BRTNODE node = node_v;
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds); bool made_change;
maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds, &made_change);
*node_p = node; *node_p = node;
} }
...@@ -478,18 +484,18 @@ brtnode_memory_size (BRTNODE node) ...@@ -478,18 +484,18 @@ brtnode_memory_size (BRTNODE node)
continue; continue;
} }
else if (BP_STATE(node,i) == PT_COMPRESSED) { else if (BP_STATE(node,i) == PT_COMPRESSED) {
struct sub_block* sb = node->bp[i].ptr; SUB_BLOCK sb = BSB(node, i);
retval += sizeof(*sb); retval += sizeof(*sb);
retval += sb->compressed_size; retval += sb->compressed_size;
} }
else if (BP_STATE(node,i) == PT_AVAIL) { else if (BP_STATE(node,i) == PT_AVAIL) {
if (node->height > 0) { if (node->height > 0) {
NONLEAF_CHILDINFO childinfo = node->bp[i].ptr; NONLEAF_CHILDINFO childinfo = BNC(node, i);
retval += sizeof(*childinfo); retval += sizeof(*childinfo);
retval += toku_fifo_memory_size(BNC_BUFFER(node, i)); retval += toku_fifo_memory_size(BNC_BUFFER(node, i));
} }
else { else {
BASEMENTNODE bn = node->bp[i].ptr; BASEMENTNODE bn = BLB(node, i);
retval += sizeof(*bn); retval += sizeof(*bn);
retval += BLB_NBYTESINBUF(node,i); retval += BLB_NBYTESINBUF(node,i);
OMT curr_omt = BLB_BUFFER(node, i); OMT curr_omt = BLB_BUFFER(node, i);
...@@ -513,17 +519,6 @@ next_dict_id(void) { ...@@ -513,17 +519,6 @@ next_dict_id(void) {
return d; return d;
} }
static void
destroy_basement_node (BASEMENTNODE bn)
{
// The buffer may have been freed already, in some cases.
if (bn->buffer) {
toku_omt_destroy(&bn->buffer);
bn->buffer = NULL;
}
}
u_int8_t u_int8_t
toku_brtnode_partition_state (struct brtnode_fetch_extra* bfe, int childnum) toku_brtnode_partition_state (struct brtnode_fetch_extra* bfe, int childnum)
{ {
...@@ -583,6 +578,7 @@ int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM noden ...@@ -583,6 +578,7 @@ int toku_brtnode_fetch_callback (CACHEFILE UU(cachefile), int fd, BLOCKNUM noden
*sizep = brtnode_memory_size(*result); *sizep = brtnode_memory_size(*result);
*dirtyp = (*result)->dirty; *dirtyp = (*result)->dirty;
} }
// printf("fetch node %"PRIu64"\n", nodename.b);
return r; return r;
} }
...@@ -608,22 +604,20 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_ ...@@ -608,22 +604,20 @@ int toku_brtnode_pe_callback (void *brtnode_pv, long bytes_to_free, long* bytes_
for (int i = 0; i < node->n_children; i++) { for (int i = 0; i < node->n_children; i++) {
// Get rid of compressed stuff no matter what. // Get rid of compressed stuff no matter what.
if (BP_STATE(node,i) == PT_COMPRESSED) { if (BP_STATE(node,i) == PT_COMPRESSED) {
struct sub_block* sb = node->bp[i].ptr; SUB_BLOCK sb = BSB(node, i);
toku_free(sb->compressed_ptr); toku_free(sb->compressed_ptr);
toku_free(node->bp[i].ptr); toku_free(sb);
node->bp[i].ptr = NULL; set_BNULL(node, i);
BP_STATE(node,i) = PT_ON_DISK; BP_STATE(node,i) = PT_ON_DISK;
} }
else if (BP_STATE(node,i) == PT_AVAIL) { else if (BP_STATE(node,i) == PT_AVAIL) {
if (BP_SHOULD_EVICT(node,i)) { if (BP_SHOULD_EVICT(node,i)) {
// free the basement node // free the basement node
BASEMENTNODE bn = node->bp[i].ptr; BASEMENTNODE bn = BLB(node, i);
OMT curr_omt = BLB_BUFFER(node, i); OMT curr_omt = BLB_BUFFER(node, i);
toku_omt_free_items(curr_omt); toku_omt_free_items(curr_omt);
destroy_basement_node(bn); destroy_basement_node(bn);
set_BNULL(node,i);
toku_free(node->bp[i].ptr);
node->bp[i].ptr = NULL;
BP_STATE(node,i) = PT_ON_DISK; BP_STATE(node,i) = PT_ON_DISK;
} }
else { else {
...@@ -782,24 +776,16 @@ void toku_destroy_brtnode_internals(BRTNODE node) ...@@ -782,24 +776,16 @@ void toku_destroy_brtnode_internals(BRTNODE node)
for (int i=0; i < node->n_children; i++) { for (int i=0; i < node->n_children; i++) {
if (BP_STATE(node,i) == PT_AVAIL) { if (BP_STATE(node,i) == PT_AVAIL) {
if (node->height > 0) { if (node->height > 0) {
if (BNC_BUFFER(node,i)) { destroy_nonleaf_childinfo(BNC(node,i));
toku_fifo_free(&BNC_BUFFER(node,i)); } else {
} destroy_basement_node(BLB(node, i));
}
else {
BASEMENTNODE bn = node->bp[i].ptr;
destroy_basement_node(bn);
} }
} else if (BP_STATE(node,i) == PT_COMPRESSED) {
toku_free(BSB(node,i));
} else {
assert(is_BNULL(node, i));
} }
else if (BP_STATE(node,i) == PT_COMPRESSED) { set_BNULL(node, i);
struct sub_block* sb = node->bp[i].ptr;
toku_free(sb->compressed_ptr);
}
else {
assert(node->bp[i].ptr == NULL);
}
// otherwise, there is nothing
toku_free(node->bp[i].ptr);
} }
toku_free(node->bp); toku_free(node->bp);
node->bp = NULL; node->bp = NULL;
...@@ -913,20 +899,13 @@ toku_initialize_empty_brtnode (BRTNODE n, BLOCKNUM nodename, int height, int num ...@@ -913,20 +899,13 @@ toku_initialize_empty_brtnode (BRTNODE n, BLOCKNUM nodename, int height, int num
BP_STATE(n,i) = PT_INVALID; BP_STATE(n,i) = PT_INVALID;
BP_OFFSET(n,i) = 0; BP_OFFSET(n,i) = 0;
BP_SUBTREE_EST(n,i) = zero_estimates; BP_SUBTREE_EST(n,i) = zero_estimates;
BP_WORKDONE(n,i) = 0;
BP_INIT_TOUCHED_CLOCK(n, i); BP_INIT_TOUCHED_CLOCK(n, i);
n->bp[i].ptr = NULL; set_BNULL(n,i);
if (height > 0) { if (height > 0) {
n->bp[i].ptr = toku_malloc(sizeof(struct brtnode_nonleaf_childinfo)); set_BNC(n, i, toku_create_empty_nl());
memset(n->bp[i].ptr, 0, sizeof(struct brtnode_nonleaf_childinfo)); } else {
int r = toku_fifo_create(&BNC_BUFFER(n,i)); set_BLB(n, i, toku_create_empty_bn());
assert_zero(r);
BNC_NBYTESINBUF(n,i) = 0;
}
else {
n->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node));
BASEMENTNODE bn = n->bp[i].ptr;
memset(bn, 0, sizeof(struct brtnode_leaf_basement_node));
toku_setup_empty_bn(bn);
} }
} }
} }
...@@ -953,8 +932,6 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r ...@@ -953,8 +932,6 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
newroot->totalchildkeylens=splitk.size; newroot->totalchildkeylens=splitk.size;
BP_BLOCKNUM(newroot,0)=nodea->thisnodename; BP_BLOCKNUM(newroot,0)=nodea->thisnodename;
BP_BLOCKNUM(newroot,1)=nodeb->thisnodename; BP_BLOCKNUM(newroot,1)=nodeb->thisnodename;
BP_HAVE_FULLHASH(newroot, 0) = FALSE;
BP_HAVE_FULLHASH(newroot, 1) = FALSE;
fixup_child_estimates(newroot, 0, nodea, TRUE); fixup_child_estimates(newroot, 0, nodea, TRUE);
fixup_child_estimates(newroot, 1, nodeb, TRUE); fixup_child_estimates(newroot, 1, nodeb, TRUE);
{ {
...@@ -1006,11 +983,8 @@ init_childinfo(BRTNODE node, int childnum, BRTNODE child) { ...@@ -1006,11 +983,8 @@ init_childinfo(BRTNODE node, int childnum, BRTNODE child) {
BP_STATE(node,childnum) = PT_AVAIL; BP_STATE(node,childnum) = PT_AVAIL;
BP_OFFSET(node,childnum) = 0; BP_OFFSET(node,childnum) = 0;
BP_SUBTREE_EST(node,childnum) = zero_estimates; BP_SUBTREE_EST(node,childnum) = zero_estimates;
node->bp[childnum].ptr = toku_malloc(sizeof(struct brtnode_nonleaf_childinfo)); BP_WORKDONE(node, childnum) = 0;
assert(node->bp[childnum].ptr); set_BNC(node, childnum, toku_create_empty_nl());
BNC_NBYTESINBUF(node,childnum) = 0;
int r = toku_fifo_create(&BNC_BUFFER(node,childnum));
resource_assert_zero(r);
} }
static void static void
...@@ -1040,7 +1014,7 @@ static struct pivot_bounds next_pivot_keys (BRTNODE node, int childnum, struct p ...@@ -1040,7 +1014,7 @@ static struct pivot_bounds next_pivot_keys (BRTNODE node, int childnum, struct p
return pb; return pb;
} }
// append a child node to a parent node // Used only by test programs: append a child node to a parent node
void void
toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivotkey, size_t pivotkeysize) { toku_brt_nonleaf_append_child(BRTNODE node, BRTNODE child, struct kv_pair *pivotkey, size_t pivotkeysize) {
int childnum = node->n_children; int childnum = node->n_children;
...@@ -1212,6 +1186,7 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, ...@@ -1212,6 +1186,7 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
B = *nodeb; B = *nodeb;
REALLOC_N(num_children_in_b-1, B->childkeys); REALLOC_N(num_children_in_b-1, B->childkeys);
REALLOC_N(num_children_in_b, B->bp); REALLOC_N(num_children_in_b, B->bp);
B->n_children = num_children_in_b;
for (int i = 0; i < num_children_in_b; i++) { for (int i = 0; i < num_children_in_b; i++) {
BP_STATE(B,i) = PT_AVAIL; BP_STATE(B,i) = PT_AVAIL;
BP_OFFSET(B,i) = 0; BP_OFFSET(B,i) = 0;
...@@ -1219,9 +1194,8 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, ...@@ -1219,9 +1194,8 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
BP_FULLHASH(B,i) = 0; BP_FULLHASH(B,i) = 0;
BP_HAVE_FULLHASH(B,i) = FALSE; BP_HAVE_FULLHASH(B,i) = FALSE;
BP_SUBTREE_EST(B,i)= zero_estimates; BP_SUBTREE_EST(B,i)= zero_estimates;
B->bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node)); BP_WORKDONE(B,i) = 0;
BASEMENTNODE bn = B->bp[i].ptr; set_BLB(B, i, toku_create_empty_bn());
toku_setup_empty_bn(bn);
} }
} }
// //
...@@ -1233,7 +1207,9 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, ...@@ -1233,7 +1207,9 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
BP_STATE(B,0) = PT_AVAIL; BP_STATE(B,0) = PT_AVAIL;
struct subtree_estimates se_diff = zero_estimates; struct subtree_estimates se_diff = zero_estimates;
u_int32_t diff_size = 0; u_int32_t diff_size = 0;
destroy_basement_node ((BASEMENTNODE)B->bp[0].ptr); // Destroy B's empty OMT, so I can rebuild it from an array destroy_basement_node (BLB(B, 0)); // Destroy B's empty OMT, so I can rebuild it from an array
set_BNULL(B, 0);
set_BLB(B, 0, toku_create_empty_bn_no_buffer());
move_leafentries( move_leafentries(
&BLB_BUFFER(B, 0), &BLB_BUFFER(B, 0),
BLB_BUFFER(node, split_node), BLB_BUFFER(node, split_node),
...@@ -1250,12 +1226,11 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, ...@@ -1250,12 +1226,11 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
// move the rest of the basement nodes // move the rest of the basement nodes
int curr_dest_bn_index = 1; int curr_dest_bn_index = 1;
for (int i = num_children_in_node; i < node->n_children; i++, curr_dest_bn_index++) { for (int i = num_children_in_node; i < node->n_children; i++, curr_dest_bn_index++) {
destroy_basement_node((BASEMENTNODE)B->bp[curr_dest_bn_index].ptr); destroy_basement_node(BLB(B, curr_dest_bn_index));
toku_free(B->bp[curr_dest_bn_index].ptr); set_BNULL(B, curr_dest_bn_index);
B->bp[curr_dest_bn_index] = node->bp[i]; B->bp[curr_dest_bn_index] = node->bp[i];
} }
node->n_children = num_children_in_node; node->n_children = num_children_in_node;
B->n_children = num_children_in_b;
// //
// now handle the pivots // now handle the pivots
...@@ -1274,7 +1249,6 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, ...@@ -1274,7 +1249,6 @@ brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk,
toku_brt_leaf_reset_calc_leaf_stats(node); toku_brt_leaf_reset_calc_leaf_stats(node);
toku_brt_leaf_reset_calc_leaf_stats(B); toku_brt_leaf_reset_calc_leaf_stats(B);
} }
if (splitk) { if (splitk) {
memset(splitk, 0, sizeof *splitk); memset(splitk, 0, sizeof *splitk);
OMTVALUE lev = 0; OMTVALUE lev = 0;
...@@ -1336,10 +1310,7 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl ...@@ -1336,10 +1310,7 @@ brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
// in anything for the bp's. // in anything for the bp's.
// Now we have to go free what it just created so we can // Now we have to go free what it just created so we can
// slide the bp over // slide the bp over
if (BNC_BUFFER(B,targchild)) { destroy_nonleaf_childinfo(BNC(B, targchild));
toku_fifo_free(&BNC_BUFFER(B,targchild));
}
toku_free(B->bp[targchild].ptr);
// now move the bp over // now move the bp over
B->bp[targchild] = node->bp[i]; B->bp[targchild] = node->bp[i];
memset(&node->bp[i], 0, sizeof(node->bp[0])); memset(&node->bp[i], 0, sizeof(node->bp[0]));
...@@ -1402,7 +1373,6 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -1402,7 +1373,6 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
int old_count = BNC_NBYTESINBUF(node, childnum); int old_count = BNC_NBYTESINBUF(node, childnum);
assert(old_count==0); assert(old_count==0);
int cnum; int cnum;
int r;
WHEN_NOT_GCOV( WHEN_NOT_GCOV(
if (toku_brt_debug_mode) { if (toku_brt_debug_mode) {
int i; int i;
...@@ -1433,15 +1403,13 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum, ...@@ -1433,15 +1403,13 @@ handle_split_of_child (BRT t, BRTNODE node, int childnum,
BP_HAVE_FULLHASH(node, childnum+1) = TRUE; BP_HAVE_FULLHASH(node, childnum+1) = TRUE;
BP_FULLHASH(node, childnum+1) = childb->fullhash; BP_FULLHASH(node, childnum+1) = childb->fullhash;
BP_SUBTREE_EST(node,childnum+1) = zero_estimates; BP_SUBTREE_EST(node,childnum+1) = zero_estimates;
BP_WORKDONE(node, childnum+1) = 0;
BP_STATE(node,childnum+1) = PT_AVAIL; BP_STATE(node,childnum+1) = PT_AVAIL;
BP_OFFSET(node,childnum+1) = 0; BP_OFFSET(node,childnum+1) = 0;
fixup_child_estimates(node, childnum, childa, TRUE); fixup_child_estimates(node, childnum, childa, TRUE);
fixup_child_estimates(node, childnum+1, childb, TRUE); fixup_child_estimates(node, childnum+1, childb, TRUE);
node->bp[childnum+1].ptr = toku_malloc(sizeof(struct brtnode_nonleaf_childinfo)); set_BNC(node, childnum+1, toku_create_empty_nl());
assert(node->bp[childnum+1].ptr);
r=toku_fifo_create(&BNC_BUFFER(node,childnum+1)); assert_zero(r);
BNC_NBYTESINBUF(node, childnum+1) = 0;
// Slide the keys over // Slide the keys over
{ {
...@@ -1578,16 +1546,25 @@ brt_leaf_apply_cmd_once ( ...@@ -1578,16 +1546,25 @@ brt_leaf_apply_cmd_once (
const BRT_MSG cmd, const BRT_MSG cmd,
u_int32_t idx, u_int32_t idx,
LEAFENTRY le, LEAFENTRY le,
TOKULOGGER logger TOKULOGGER logger,
uint64_t *workdonep
) )
// Effect: Apply cmd to leafentry (msn is ignored) // Effect: Apply cmd to leafentry (msn is ignored)
// Calculate work done by message on leafentry and return it to caller.
// idx is the location where it goes // idx is the location where it goes
// le is old leafentry // le is old leafentry
{ {
// brt_leaf_check_leaf_stats(node); // brt_leaf_check_leaf_stats(node);
size_t newlen=0, newdisksize=0; size_t newlen=0, newdisksize=0, oldsize=0, workdone=0;
LEAFENTRY new_le=0; LEAFENTRY new_le=0;
if (le)
oldsize = leafentry_memsize(le);
// This function may call mempool_malloc_dont_release() to allocate more space.
// That means the old pointers are guaranteed to still be good, but the data may have been copied into a new mempool.
// We'll have to release the old mempool later.
{ {
OMT snapshot_txnids = logger ? logger->snapshot_txnids : NULL; OMT snapshot_txnids = logger ? logger->snapshot_txnids : NULL;
OMT live_list_reverse = logger ? logger->live_list_reverse : NULL; OMT live_list_reverse = logger ? logger->live_list_reverse : NULL;
...@@ -1617,13 +1594,15 @@ brt_leaf_apply_cmd_once ( ...@@ -1617,13 +1594,15 @@ brt_leaf_apply_cmd_once (
{ int r = toku_omt_set_at(bn->buffer, new_le, idx); assert(r==0); } { int r = toku_omt_set_at(bn->buffer, new_le, idx); assert(r==0); }
toku_free(le); toku_free(le);
workdone = (oldsize > newlen ? oldsize : newlen); // work done is max of le size before and after message application
} else { } else {
if (le) { if (le) {
brt_leaf_delete_leafentry (bn, se, idx, le); brt_leaf_delete_leafentry (bn, se, idx, le);
toku_free(le); toku_free(le);
workdone = oldsize;
} }
if (new_le) { if (new_le) {
int r = toku_omt_insert_at(bn->buffer, new_le, idx); int r = toku_omt_insert_at(bn->buffer, new_le, idx);
assert(r==0); assert(r==0);
...@@ -1632,10 +1611,12 @@ brt_leaf_apply_cmd_once ( ...@@ -1632,10 +1611,12 @@ brt_leaf_apply_cmd_once (
se->dsize += le_latest_vallen(new_le) + le_keylen(new_le); se->dsize += le_latest_vallen(new_le) + le_keylen(new_le);
assert(se->dsize < (1U<<31)); // make sure we didn't underflow assert(se->dsize < (1U<<31)); // make sure we didn't underflow
se->ndata++; se->ndata++;
// Look at the key to the left and the one to the right. If both are different then increment nkeys. workdone = newlen;
bump_nkeys(se, +1); }
}
} }
if (workdonep) // test programs may call with NULL
*workdonep = workdone;
// brt_leaf_check_leaf_stats(node); // brt_leaf_check_leaf_stats(node);
} }
...@@ -1655,6 +1636,7 @@ struct setval_extra_s { ...@@ -1655,6 +1636,7 @@ struct setval_extra_s {
LEAFENTRY le; LEAFENTRY le;
TOKULOGGER logger; TOKULOGGER logger;
int made_change; int made_change;
uint64_t * workdonep; // set by brt_leaf_apply_cmd_once()
}; };
/* /*
...@@ -1685,7 +1667,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) { ...@@ -1685,7 +1667,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
} }
brt_leaf_apply_cmd_once(svextra->bn, svextra->se, &msg, brt_leaf_apply_cmd_once(svextra->bn, svextra->se, &msg,
svextra->idx, svextra->le, svextra->idx, svextra->le,
svextra->logger); svextra->logger, svextra->workdonep);
svextra->setval_r = 0; svextra->setval_r = 0;
} }
svextra->made_change = TRUE; svextra->made_change = TRUE;
...@@ -1703,7 +1685,7 @@ toku_update_get_status(UPDATE_STATUS s) { ...@@ -1703,7 +1685,7 @@ toku_update_get_status(UPDATE_STATUS s) {
// would be to put a dummy msn in the messages created by setval_fun(), but preserving // would be to put a dummy msn in the messages created by setval_fun(), but preserving
// the original msn seems cleaner and it preserves accountability at a lower layer. // the original msn seems cleaner and it preserves accountability at a lower layer.
static int do_update(BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRT_MSG cmd, int idx, static int do_update(BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRT_MSG cmd, int idx,
LEAFENTRY le, TOKULOGGER logger, int* made_change) { LEAFENTRY le, TOKULOGGER logger, bool* made_change, uint64_t * workdonep) {
LEAFENTRY le_for_update; LEAFENTRY le_for_update;
DBT key; DBT key;
const DBT *keyp; const DBT *keyp;
...@@ -1745,7 +1727,7 @@ static int do_update(BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRT_MSG cmd, int id ...@@ -1745,7 +1727,7 @@ static int do_update(BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRT_MSG cmd, int id
} }
struct setval_extra_s setval_extra = {setval_tag, FALSE, 0, bn, se, cmd->msn, cmd->xids, struct setval_extra_s setval_extra = {setval_tag, FALSE, 0, bn, se, cmd->msn, cmd->xids,
keyp, idx, le_for_update, logger, 0}; keyp, idx, le_for_update, logger, 0, workdonep};
// call handlerton's brt->update_fun(), which passes setval_extra to setval_fun() // call handlerton's brt->update_fun(), which passes setval_extra to setval_fun()
int r = t->update_fun(t->db, int r = t->update_fun(t->db,
keyp, keyp,
...@@ -1762,17 +1744,27 @@ static int do_update(BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRT_MSG cmd, int id ...@@ -1762,17 +1744,27 @@ static int do_update(BRT t, BASEMENTNODE bn, SUBTREE_EST se, BRT_MSG cmd, int id
} }
// should be static, but used by test program(s) // should be static, but used by test program(s)
static void void
brt_leaf_put_cmd ( brt_leaf_put_cmd (
BRT t, BRT t,
BASEMENTNODE bn, BASEMENTNODE bn,
SUBTREE_EST se, SUBTREE_EST se,
BRT_MSG cmd, BRT_MSG cmd,
int* made_change bool* made_change,
uint64_t *workdonep
) )
// Effect: Put a cmd into a leaf. // Effect: Put a cmd into a leaf.
// Return the workdone counter via workdonep
// The leaf could end up "too big" or "too small". The caller must fix that up. // The leaf could end up "too big" or "too small". The caller must fix that up.
{ {
uint64_t workdone_total = 0; // may be for one row or for many (or all) rows in leaf (if broadcast message)
// ignore messages that have already been applied to this leaf
if (cmd->msn.msn <= bn->max_msn_applied.msn) {
// TODO3514 add accountability counter here
goto exit;
}
else bn->max_msn_applied = cmd->msn;
TOKULOGGER logger = toku_cachefile_logger(t->cf); TOKULOGGER logger = toku_cachefile_logger(t->cf);
...@@ -1811,8 +1803,7 @@ brt_leaf_put_cmd ( ...@@ -1811,8 +1803,7 @@ brt_leaf_put_cmd (
assert(r==0); assert(r==0);
storeddata=storeddatav; storeddata=storeddatav;
} }
brt_leaf_apply_cmd_once(bn, se, cmd, idx, storeddata, logger, &workdone_total);
brt_leaf_apply_cmd_once(bn, se, cmd, idx, storeddata, logger);
// if the insertion point is within a window of the right edge of // if the insertion point is within a window of the right edge of
// the leaf then it is sequential // the leaf then it is sequential
...@@ -1842,9 +1833,11 @@ brt_leaf_put_cmd ( ...@@ -1842,9 +1833,11 @@ brt_leaf_put_cmd (
storeddata=storeddatav; storeddata=storeddatav;
while (1) { while (1) {
uint64_t workdone_this_le = 0;
u_int32_t num_leafentries_before = toku_omt_size(bn->buffer); u_int32_t num_leafentries_before = toku_omt_size(bn->buffer);
brt_leaf_apply_cmd_once(bn, se, cmd, idx, storeddata, logger); brt_leaf_apply_cmd_once(bn, se, cmd, idx, storeddata, logger, &workdone_this_le);
workdone_total += workdone_this_le;
*made_change = 1; *made_change = 1;
{ {
...@@ -1892,7 +1885,9 @@ brt_leaf_put_cmd ( ...@@ -1892,7 +1885,9 @@ brt_leaf_put_cmd (
storeddata=storeddatav; storeddata=storeddatav;
int deleted = 0; int deleted = 0;
if (!le_is_clean(storeddata)) { //If already clean, nothing to do. if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
brt_leaf_apply_cmd_once(bn, se, cmd, idx, storeddata, logger); uint64_t workdone_this_le = 0;
brt_leaf_apply_cmd_once(bn, se, cmd, idx, storeddata, logger, &workdone_this_le);
workdone_total += workdone_this_le;
u_int32_t new_omt_size = toku_omt_size(bn->buffer); u_int32_t new_omt_size = toku_omt_size(bn->buffer);
if (new_omt_size != omt_size) { if (new_omt_size != omt_size) {
assert(new_omt_size+1 == omt_size); assert(new_omt_size+1 == omt_size);
...@@ -1914,12 +1909,14 @@ brt_leaf_put_cmd ( ...@@ -1914,12 +1909,14 @@ brt_leaf_put_cmd (
// Apply to all leafentries if txn is represented // Apply to all leafentries if txn is represented
omt_size = toku_omt_size(bn->buffer); omt_size = toku_omt_size(bn->buffer);
for (u_int32_t idx = 0; idx < omt_size; ) { for (u_int32_t idx = 0; idx < omt_size; ) {
r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL); r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL);
assert_zero(r); assert_zero(r);
storeddata=storeddatav; storeddata=storeddatav;
int deleted = 0; int deleted = 0;
if (le_has_xids(storeddata, cmd->xids)) { if (le_has_xids(storeddata, cmd->xids)) {
brt_leaf_apply_cmd_once(bn, se, cmd, idx, storeddata, logger); uint64_t workdone_this_le;
brt_leaf_apply_cmd_once(bn, se, cmd, idx, storeddata, logger, &workdone_this_le);
workdone_total += workdone_this_le;
u_int32_t new_omt_size = toku_omt_size(bn->buffer); u_int32_t new_omt_size = toku_omt_size(bn->buffer);
if (new_omt_size != omt_size) { if (new_omt_size != omt_size) {
assert(new_omt_size+1 == omt_size); assert(new_omt_size+1 == omt_size);
...@@ -1941,10 +1938,10 @@ brt_leaf_put_cmd ( ...@@ -1941,10 +1938,10 @@ brt_leaf_put_cmd (
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be, r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx, NULL); &storeddatav, &idx, NULL);
if (r==DB_NOTFOUND) { if (r==DB_NOTFOUND) {
r = do_update(t, bn, se, cmd, idx, NULL, logger, made_change); r = do_update(t, bn, se, cmd, idx, NULL, logger, made_change, &workdone_total);
} else if (r==0) { } else if (r==0) {
storeddata=storeddatav; storeddata=storeddatav;
r = do_update(t, bn, se, cmd, idx, storeddata, logger, made_change); r = do_update(t, bn, se, cmd, idx, storeddata, logger, made_change, &workdone_total);
} // otherwise, a worse error, just return it } // otherwise, a worse error, just return it
break; break;
} }
...@@ -1953,10 +1950,12 @@ brt_leaf_put_cmd ( ...@@ -1953,10 +1950,12 @@ brt_leaf_put_cmd (
u_int32_t idx = 0; u_int32_t idx = 0;
u_int32_t num_leafentries_before; u_int32_t num_leafentries_before;
while (idx < (num_leafentries_before = toku_omt_size(bn->buffer))) { while (idx < (num_leafentries_before = toku_omt_size(bn->buffer))) {
uint64_t workdone_this_le = 0;
r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL); r = toku_omt_fetch(bn->buffer, idx, &storeddatav, NULL);
assert(r==0); assert(r==0);
storeddata=storeddatav; storeddata=storeddatav;
r = do_update(t, bn, se, cmd, idx, storeddata, logger, made_change); r = do_update(t, bn, se, cmd, idx, storeddata, logger, made_change, &workdone_this_le);
workdone_total += workdone_this_le;
// TODO(leif): This early return means get_leaf_reactivity() // TODO(leif): This early return means get_leaf_reactivity()
// and VERIFY_NODE() never get called. Is this a problem? // and VERIFY_NODE() never get called. Is this a problem?
assert(r==0); assert(r==0);
...@@ -1971,6 +1970,12 @@ brt_leaf_put_cmd ( ...@@ -1971,6 +1970,12 @@ brt_leaf_put_cmd (
case BRT_NONE: break; // don't do anything case BRT_NONE: break; // don't do anything
} }
// node->dirty = 1;
exit:
if (workdonep)
*workdonep = workdone_total;
VERIFY_NODE(t, node);
return; return;
} }
...@@ -2194,8 +2199,8 @@ merge_leaf_nodes (BRTNODE a, BRTNODE b) { ...@@ -2194,8 +2199,8 @@ merge_leaf_nodes (BRTNODE a, BRTNODE b) {
// move the estimates // move the estimates
int num_children = a->n_children + b->n_children; int num_children = a->n_children + b->n_children;
if (!a_has_tail) { if (!a_has_tail) {
destroy_basement_node((BASEMENTNODE)a->bp[a->n_children-1].ptr); destroy_basement_node(BLB(a, a->n_children-1));
toku_free(a->bp[a->n_children-1].ptr); set_BNULL(a, a->n_children-1);
num_children--; num_children--;
} }
...@@ -2472,8 +2477,8 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react, ...@@ -2472,8 +2477,8 @@ brt_merge_child (BRT t, BRTNODE node, int childnum_to_merge, BOOL *did_react,
node->totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes. node->totalchildkeylens -= deleted_size; // The key was free()'d inside the maybe_merge_pinned_nodes.
if (did_merge) { if (did_merge) {
toku_fifo_free(&BNC_BUFFER(node, childnumb)); destroy_nonleaf_childinfo(BNC(node, childnumb));
toku_free(node->bp[childnumb].ptr); set_BNULL(node, childnumb);
node->n_children--; node->n_children--;
memmove(&node->bp[childnumb], memmove(&node->bp[childnumb],
&node->bp[childnumb+1], &node->bp[childnumb+1],
...@@ -2555,13 +2560,15 @@ brt_handle_maybe_reactive_root (BRT brt, CACHEKEY *rootp, BRTNODE *nodep) { ...@@ -2555,13 +2560,15 @@ brt_handle_maybe_reactive_root (BRT brt, CACHEKEY *rootp, BRTNODE *nodep) {
static void find_heaviest_child (BRTNODE node, int *childnum) { static void find_heaviest_child (BRTNODE node, int *childnum) {
int max_child = 0; int max_child = 0;
int max_weight = BNC_NBYTESINBUF(node, 0); int max_weight = BNC_NBYTESINBUF(node, 0) + BP_WORKDONE(node, 0);
int i; int i;
if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight); if (0) printf("%s:%d weights: %d", __FILE__, __LINE__, max_weight);
assert(node->n_children>0); assert(node->n_children>0);
for (i=1; i<node->n_children; i++) { for (i=1; i<node->n_children; i++) {
int this_weight = BNC_NBYTESINBUF(node,i); if (BP_WORKDONE(node,i))
assert (BNC_NBYTESINBUF(node,i));
int this_weight = BNC_NBYTESINBUF(node,i) + BP_WORKDONE(node,i);;
if (0) printf(" %d", this_weight); if (0) printf(" %d", this_weight);
if (max_weight < this_weight) { if (max_weight < this_weight) {
max_child = i; max_child = i;
...@@ -2649,6 +2656,9 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re, ...@@ -2649,6 +2656,9 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re,
BNC_NBYTESINBUF(node, childnum) -= n_bytes_removed; BNC_NBYTESINBUF(node, childnum) -= n_bytes_removed;
} }
invariant(BNC_NBYTESINBUF(node, childnum) == 0);
BP_WORKDONE(node, childnum) = 0; // this buffer is drained, no work has been done by its contents
node->dirty=TRUE; node->dirty=TRUE;
child->dirty=TRUE; child->dirty=TRUE;
fixup_child_estimates(node, childnum, child, TRUE); fixup_child_estimates(node, childnum, child, TRUE);
...@@ -2686,6 +2696,10 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re, ...@@ -2686,6 +2696,10 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re,
node->dirty = 1; node->dirty = 1;
} }
invariant(BNC_NBYTESINBUF(node, childnum) == 0);
BP_WORKDONE(node, childnum) = 0; // this buffer is drained, no work has been done by its contents
if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__); if (0) printf("%s:%d done random picking\n", __FILE__, __LINE__);
// Having pushed all that stuff to a child, do we need to flush the child? We may have to flush it many times if there were lots of messages that just got pushed down. // Having pushed all that stuff to a child, do we need to flush the child? We may have to flush it many times if there were lots of messages that just got pushed down.
...@@ -2708,6 +2722,64 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re, ...@@ -2708,6 +2722,64 @@ flush_this_child (BRT t, BRTNODE node, int childnum, enum reactivity *child_re,
} }
} }
#ifdef FLUSH_HEIGHT1
static void
flush_this_height1_child (BRT t, BRTNODE node, int childnum, BRTNODE child)
// Effect: Push everything in the CHILDNUMth buffer of (height one) node down into the (leaf) child.
// TODO3564: The child may split or merge as a result of the activity.
// Requires:
// node is height one
// both node and relevant child are in memory and pinned
// all messages in this buffer have already been applied to leafnode, but not messages above
{
printf("Flushing height one node %"PRIu64" to child %d leaf node %"PRIu64"\n",
node->thisnodename.b, childnum, child->thisnodename.b);
assert(node->height == 1);
assert(child->height == 0);
BLOCKNUM targetchild = BNC_BLOCKNUM(node, childnum);
assert(targetchild.b == child->thisnodename.b);
toku_verify_blocknum_allocated(t->h->blocktable, targetchild);
assert(child->thisnodename.b!=0);
VERIFY_NODE(t, child);
// remove this invariant if this function can be called after messages above this node have been applied to leaf
invariant(node->max_msn_applied_to_node.msn >= child->max_msn_applied_to_node.msn);
FIFO fifo = BNC_BUFFER(node,childnum);
// The child is a leaf node.
// We must empty the fifo, and mark the node and child as dirty
bytevec key, val;
ITEMLEN keylen, vallen;
u_int32_t type;
MSN msn;
XIDS xids;
while(0==toku_fifo_peek(fifo, &key, &keylen, &val, &vallen, &type, &msn, &xids)) {
int n_bytes_removed = (keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids));
int r = toku_fifo_deq(fifo);
assert(r==0);
node->u.n.n_bytes_in_buffers -= n_bytes_removed;
BNC_NBYTESINBUF(node, childnum) -= n_bytes_removed;
}
invariant(BNC_NBYTESINBUF(node, childnum) == 0);
BP_WORKDONE(node, childnum) = 0; // this buffer is drained, no work has been done by its contents
node->dirty=TRUE;
child->dirty=TRUE;
fixup_child_estimates(node, childnum, child, TRUE);
// TODO3564 When to test and deal with a reactive leaf?
// *child_re = get_node_reactivity(child);
}
#endif
static void static void
brtnode_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd) brtnode_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd)
// Effect: Push CMD into the subtree rooted at NODE. // Effect: Push CMD into the subtree rooted at NODE.
...@@ -2749,7 +2821,7 @@ brtnode_nonleaf_put_cmd_at_root (BRT t, BRTNODE node, BRT_MSG cmd) ...@@ -2749,7 +2821,7 @@ brtnode_nonleaf_put_cmd_at_root (BRT t, BRTNODE node, BRT_MSG cmd)
// If the appropriate basement node is not in memory, then nothing gets applied // If the appropriate basement node is not in memory, then nothing gets applied
// If the appropriate basement node must be in memory, it is the caller's responsibility to ensure // If the appropriate basement node must be in memory, it is the caller's responsibility to ensure
// that it is // that it is
void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, int *made_change) { void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, bool *made_change, uint64_t *workdonep) {
VERIFY_NODE(t, node); VERIFY_NODE(t, node);
// ignore messages that have already been applied to this leaf // ignore messages that have already been applied to this leaf
if (cmd->msn.msn <= node->max_msn_applied_to_node_in_memory.msn) { if (cmd->msn.msn <= node->max_msn_applied_to_node_in_memory.msn) {
...@@ -2763,25 +2835,26 @@ void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, int *made_change) ...@@ -2763,25 +2835,26 @@ void toku_apply_cmd_to_leaf(BRT t, BRTNODE node, BRT_MSG cmd, int *made_change)
if (brt_msg_applies_once(cmd)) { if (brt_msg_applies_once(cmd)) {
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t); unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t);
if (BP_STATE(node,childnum) == PT_AVAIL) { if (BP_STATE(node,childnum) == PT_AVAIL) {
brt_leaf_put_cmd( brt_leaf_put_cmd(t,
t, BLB(node, childnum),
(BASEMENTNODE)node->bp[childnum].ptr, &BP_SUBTREE_EST(node, childnum),
&BP_SUBTREE_EST(node, childnum), cmd,
cmd, made_change,
made_change workdonep
); );
} }
} }
else if (brt_msg_applies_all(cmd)) { else if (brt_msg_applies_all(cmd)) {
int bn_made_change = 0; bool bn_made_change = false;
for (int childnum=0; childnum<node->n_children; childnum++) { for (int childnum=0; childnum<node->n_children; childnum++) {
if (BP_STATE(node,childnum) == PT_AVAIL) { if (BP_STATE(node,childnum) == PT_AVAIL) {
brt_leaf_put_cmd( brt_leaf_put_cmd(
t, t,
(BASEMENTNODE)node->bp[childnum].ptr, BLB(node, childnum),
&BP_SUBTREE_EST(node,childnum), &BP_SUBTREE_EST(node,childnum),
cmd, cmd,
&bn_made_change &bn_made_change,
workdonep
); );
if (bn_made_change) *made_change = 1; if (bn_made_change) *made_change = 1;
} }
...@@ -2818,9 +2891,10 @@ static void push_something_at_root (BRT brt, BRTNODE *nodep, BRT_MSG cmd) ...@@ -2818,9 +2891,10 @@ static void push_something_at_root (BRT brt, BRTNODE *nodep, BRT_MSG cmd)
// Must special case height 0, since brtnode_put_cmd() doesn't modify leaves. // Must special case height 0, since brtnode_put_cmd() doesn't modify leaves.
// Part of the problem is: if the node is in memory, then it was updated as part of the in-memory operation. // Part of the problem is: if the node is in memory, then it was updated as part of the in-memory operation.
// If the root node is not in memory, then we must apply it. // If the root node is not in memory, then we must apply it.
int made_dirty = 0; bool made_dirty = 0;
uint64_t workdone = 0;
// not up to date, which means the get_and_pin actually fetched it into memory. // not up to date, which means the get_and_pin actually fetched it into memory.
toku_apply_cmd_to_leaf(brt, node, cmd, &made_dirty); toku_apply_cmd_to_leaf(brt, node, cmd, &made_dirty, &workdone);
if (made_dirty) node->dirty = 1; if (made_dirty) node->dirty = 1;
} else { } else {
brtnode_nonleaf_put_cmd_at_root(brt, node, cmd); brtnode_nonleaf_put_cmd_at_root(brt, node, cmd);
...@@ -2856,6 +2930,7 @@ static u_int32_t get_roothash (BRT brt) { ...@@ -2856,6 +2930,7 @@ static u_int32_t get_roothash (BRT brt) {
return rh->fullhash; return rh->fullhash;
} }
// apply a single message, stored in root's buffer(s), to all relevant leaves that are in memory
static void apply_cmd_to_in_memory_non_root_leaves ( static void apply_cmd_to_in_memory_non_root_leaves (
BRT t, BRT t,
CACHEKEY nodenum, CACHEKEY nodenum,
...@@ -2863,7 +2938,8 @@ static void apply_cmd_to_in_memory_non_root_leaves ( ...@@ -2863,7 +2938,8 @@ static void apply_cmd_to_in_memory_non_root_leaves (
BRT_MSG cmd, BRT_MSG cmd,
BOOL is_root, BOOL is_root,
BRTNODE parent, BRTNODE parent,
int parents_childnum int parents_childnum,
uint64_t * workdone_this_childpath_p
) )
{ {
void *node_v; void *node_v;
...@@ -2876,27 +2952,22 @@ static void apply_cmd_to_in_memory_non_root_leaves ( ...@@ -2876,27 +2952,22 @@ static void apply_cmd_to_in_memory_non_root_leaves (
if (brt_msg_applies_once(cmd)) { if (brt_msg_applies_once(cmd)) {
unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t); unsigned int childnum = toku_brtnode_which_child(node, cmd->u.id.key, t);
u_int32_t child_fullhash = compute_child_fullhash(t->cf, node, childnum); u_int32_t child_fullhash = compute_child_fullhash(t->cf, node, childnum);
apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), child_fullhash, cmd, FALSE, node, childnum); apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), child_fullhash, cmd, FALSE, node, childnum, workdone_this_childpath_p);
} }
else if (brt_msg_applies_all(cmd)) { else if (brt_msg_applies_all(cmd)) {
for (int childnum=0; childnum<node->n_children; childnum++) { for (int childnum=0; childnum<node->n_children; childnum++) {
assert(BP_HAVE_FULLHASH(node, childnum)); assert(BP_HAVE_FULLHASH(node, childnum));
apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), BP_FULLHASH(node, childnum), cmd, FALSE, node, childnum); apply_cmd_to_in_memory_non_root_leaves(t, BP_BLOCKNUM(node, childnum), BP_FULLHASH(node, childnum), cmd, FALSE, node, childnum, workdone_this_childpath_p);
} }
} }
else if (brt_msg_does_nothing(cmd)) {
}
else {
assert(FALSE);
}
} }
// leaf node // leaf node
else { else {
// only apply message if this is NOT a root node, because push_something_at_root // only apply message if this is NOT a root node, because push_something_at_root
// has already applied it // has already applied it
if (!is_root) { if (!is_root) {
int made_change; bool made_change;
toku_apply_cmd_to_leaf(t, node, cmd, &made_change); toku_apply_cmd_to_leaf(t, node, cmd, &made_change, workdone_this_childpath_p);
} }
} }
...@@ -2950,7 +3021,7 @@ toku_brt_root_put_cmd (BRT brt, BRT_MSG_S * cmd) ...@@ -2950,7 +3021,7 @@ toku_brt_root_put_cmd (BRT brt, BRT_MSG_S * cmd)
// verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock) // verify that msn of latest message was captured in root node (push_something_at_root() did not release ydb lock)
invariant(cmd->msn.msn == node->max_msn_applied_to_node_in_memory.msn); invariant(cmd->msn.msn == node->max_msn_applied_to_node_in_memory.msn);
apply_cmd_to_in_memory_non_root_leaves(brt, *rootp, fullhash, cmd, TRUE, NULL, -1); apply_cmd_to_in_memory_non_root_leaves(brt, *rootp, fullhash, cmd, TRUE, NULL, -1, NULL);
if (node->height > 0 && nonleaf_node_is_gorged(node)) { if (node->height > 0 && nonleaf_node_is_gorged(node)) {
// No need for a loop here. We only inserted one message, so flushing a single child suffices. // No need for a loop here. We only inserted one message, so flushing a single child suffices.
flush_some_child(brt, node, TRUE, TRUE, flush_some_child(brt, node, TRUE, TRUE,
...@@ -4872,25 +4943,27 @@ static BOOL msg_type_has_key (enum brt_msg_type m) { ...@@ -4872,25 +4943,27 @@ static BOOL msg_type_has_key (enum brt_msg_type m) {
} }
static int static int
apply_buffer_messages_to_node ( apply_buffer_messages_to_basement_node (
BRT t, BRT t,
BASEMENTNODE bn, BASEMENTNODE bn,
SUBTREE_EST se,
BRTNODE ancestor, BRTNODE ancestor,
int childnum, int childnum,
int height, struct pivot_bounds const * const bounds,
MSN min_applied_msn, bool *made_change
struct pivot_bounds const * const bounds
) )
// Effect: For all the messages in ANCESTOR that are between lower_bound_exclusive (exclusive) and upper_bound_inclusive (inclusive), apply the message node. // Effect: For each messages in ANCESTOR that is between lower_bound_exclusive (exclusive) and upper_bound_inclusive (inclusive), apply the message to the node.
// In ANCESTOR, the relevant messages are all in the buffer for child number CHILDNUM. // In ANCESTOR, the relevant messages are all in the buffer for child number CHILDNUM.
// Treat the bounds as minus or plus infinity respectively if they are NULL. // Treat the bounds as minus or plus infinity respectively if they are NULL.
// Do not mark the node as dirty (preserve previous state of 'dirty' bit).
{ {
assert(ancestor->height==height); //F MSN start_msn = node->max_msn_applied_to_node;
assert(ancestor->height>0); //F uint64_t start_workdone = BP_WORKDONE(ancestor, childnum);
//F printf("apply_buffer_messages_to_leafnode %"PRIu64", height = %d, msn = 0x%"PRIx64", ancestor = %"PRIu64", ancestor msn = 0x%"PRIx64"\n",
// node->thisnodename.b, node->height, start_msn.msn, ancestor->thisnodename.b, ancestor->max_msn_applied_to_node.msn);
assert(0 <= childnum && childnum < ancestor->n_children); assert(0 <= childnum && childnum < ancestor->n_children);
int r = 0; int r = 0;
DBT lbe, ubi; DBT lbe, ubi; // lbe is lower bound exclusive, ubi is upper bound inclusive
DBT *lbe_ptr, *ubi_ptr; DBT *lbe_ptr, *ubi_ptr;
if (bounds->lower_bound_exclusive==NULL) { if (bounds->lower_bound_exclusive==NULL) {
lbe_ptr = NULL; lbe_ptr = NULL;
...@@ -4904,27 +4977,156 @@ apply_buffer_messages_to_node ( ...@@ -4904,27 +4977,156 @@ apply_buffer_messages_to_node (
ubi = kv_pair_key_to_dbt(bounds->upper_bound_inclusive); ubi = kv_pair_key_to_dbt(bounds->upper_bound_inclusive);
ubi_ptr = &ubi; ubi_ptr = &ubi;
} }
int made_change;
assert(BP_STATE(ancestor,childnum) == PT_AVAIL); assert(BP_STATE(ancestor,childnum) == PT_AVAIL);
uint64_t workdone_this_leaf_total = 0;
FIFO_ITERATE(BNC_BUFFER(ancestor, childnum), key, keylen, val, vallen, type, msn, xids, FIFO_ITERATE(BNC_BUFFER(ancestor, childnum), key, keylen, val, vallen, type, msn, xids,
({ ({
DBT hk; DBT hk;
toku_fill_dbt(&hk, key, keylen); toku_fill_dbt(&hk, key, keylen);
if (msn.msn > min_applied_msn.msn && (!msg_type_has_key(type) || key_is_in_leaf_range(t, &hk, lbe_ptr, ubi_ptr))) { if (msn.msn > bn->max_msn_applied.msn && (!msg_type_has_key(type) || key_is_in_leaf_range(t, &hk, lbe_ptr, ubi_ptr))) {
DBT hv; DBT hv;
BRT_MSG_S brtcmd = { (enum brt_msg_type)type, msn, xids, .u.id = {&hk, BRT_MSG_S brtcmd = { (enum brt_msg_type)type, msn, xids, .u.id = {&hk,
toku_fill_dbt(&hv, val, vallen)} }; toku_fill_dbt(&hv, val, vallen)} };
brt_leaf_put_cmd(t, bn, se, &brtcmd, &made_change); uint64_t workdone_this_leaf = 0;
brt_leaf_put_cmd(t,
bn, &BP_SUBTREE_EST(ancestor, childnum),
&brtcmd, made_change, &workdone_this_leaf);
BP_WORKDONE(ancestor, childnum) += workdone_this_leaf;
workdone_this_leaf_total += workdone_this_leaf;
} }
})); }));
//F uint64_t end_workdone = BP_WORKDONE(ancestor, childnum);
// printf(" workdone = %"PRIu64", msndiff = 0x%"PRIx64", ancestorworkdone start, end = %"PRIu64", %"PRIu64"\n",
// workdone_this_leaf_total, node->max_msn_applied_to_node.msn - start_msn.msn, start_workdone, end_workdone);
return r; return r;
} }
//###########
static void static void
maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds) maybe_flush_pinned_node(BRT t, BRTNODE node, int childnum, BRTNODE child) {
// Effect: Bring a leaf node up-to-date according to all the messages in the ancestors. If the leaf node is already up-to-date then do nothing. // Effect:
// If NODE is not a leaf node, then don't meodify it. // if some criterion is met, flush the specified buffer
// The dirtyness of the node is not changed. // Requires;
// node and specified child are pinned
if (0) {
printf("maybe_flush_pinned_node nodeid %"PRIu64" child %d, max_msn=0x%"PRIx64", workdone=%"PRIu64" bufsize=%d\n",
node->thisnodename.b, childnum, node->max_msn_applied_to_node_in_memory.msn,
BP_WORKDONE(node,childnum), BNC_NBYTESINBUF(node,childnum));
}
assert(node->height>0);
assert(child->height == node->height - 1);
BLOCKNUM targetchild = BP_BLOCKNUM(node, childnum);
toku_verify_blocknum_allocated(t->h->blocktable, targetchild);
assert(child->thisnodename.b!=0);
assert(targetchild.b == child->thisnodename.b);
VERIFY_NODE(t, child);
uint32_t threshold = node->nodesize / node->n_children;
if (BNC_NBYTESINBUF(node,childnum) + BP_WORKDONE(node,childnum) > threshold) {
if (0) {
printf("flush pinned node %"PRIu64" height %d child %d, max_msn=0x%"PRIx64", workdone=%"PRIu64" bufsize=%d, threshold = %d\n",
node->thisnodename.b, node->height, childnum, node->max_msn_applied_to_node_in_memory.msn,
BP_WORKDONE(node,childnum), BNC_NBYTESINBUF(node,childnum), threshold);
}
FIFO fifo = BNC_BUFFER(node,childnum);
if (child->height==0) {
// The child is a leaf node.
assert_leaf_up_to_date(child);
bytevec key, val;
ITEMLEN keylen, vallen;
u_int32_t type;
MSN msn;
XIDS xids;
while(0==toku_fifo_peek(fifo, &key, &keylen, &val, &vallen, &type, &msn, &xids)) {
int n_bytes_removed = (keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids));
int r = toku_fifo_deq(fifo);
assert(r==0);
BNC_NBYTESINBUF(node, childnum) -= n_bytes_removed;
}
invariant(BNC_NBYTESINBUF(node, childnum) == 0);
BP_WORKDONE(node, childnum) = 0; // this buffer is drained, no work has been done by its contents
node->dirty=TRUE;
child->dirty=TRUE;
fixup_child_estimates(node, childnum, child, TRUE);
} else {
bytevec key,val;
ITEMLEN keylen, vallen;
assert(toku_fifo_n_entries(fifo)>0);
u_int32_t type;
MSN msn;
XIDS xids;
while(0==toku_fifo_peek(fifo, &key, &keylen, &val, &vallen, &type, &msn, &xids)) {
DBT hk,hv;
//TODO: Factor out (into a function) conversion of fifo_entry to message
BRT_MSG_S brtcmd = { (enum brt_msg_type)type, msn, xids, .u.id= {toku_fill_dbt(&hk, key, keylen),
toku_fill_dbt(&hv, val, vallen)} };
int n_bytes_removed = (hk.size + hv.size + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids));
brt_nonleaf_put_cmd(t, child, &brtcmd);
{
int r = toku_fifo_deq(fifo);
assert(r==0);
}
BNC_NBYTESINBUF(node, childnum) -= n_bytes_removed;
node->dirty = 1;
}
invariant(BNC_NBYTESINBUF(node, childnum) == 0);
BP_WORKDONE(node, childnum) = 0; // this buffer is drained, no work has been done by its contents
}
fixup_child_estimates(node, childnum, child, TRUE);
}
}
static void
apply_ancestors_messages_to_leafnode_and_maybe_flush (BRT t, BASEMENTNODE bm, ANCESTORS ancestors, BRTNODE child,
const struct pivot_bounds const *bounds, bool *made_change)
// Effect: Go through ancestors list applying messages from first ancestor (height one), then next, until
// all messages have been applied.
// Then mark the node as up_to_date.
// Then maybe flush the ancestors above, starting with the root and going down.
// Arguments:
// t: the tree
// bm: the basement node to which all the messages should be applied.
// ancestors: a linked list of the ancestors (nearest first). The last in the list is the root.
// child: the child of the first ancestor. We pass this only because of a hack to flush to the child without requiring repinning. With background
// flushing the child argument will go away.
// bounds: lower and upper bounds (exclusive and inclusive resp) of the keys that belong in bm.
// made_change: (output). Set true if we actually made a change.
// Implementation note: This is a recursive function that applies messages on the way in and maybe flushes the child path on the way out.
// With background flushing we will be able to back to a simpler loop (since the recursion will be tail recursion).
{
if (ancestors) {
apply_buffer_messages_to_basement_node(t, bm, ancestors->node, ancestors->childnum, bounds, made_change);
apply_ancestors_messages_to_leafnode_and_maybe_flush(t, bm, ancestors->next, ancestors->node, bounds, made_change);
maybe_flush_pinned_node(t, ancestors->node, ancestors->childnum, child);
} else {
// have just applied messages stored in root
bm->soft_copy_is_up_to_date = true;
}
}
static void
maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, bool *made_change)
// Effect:
// Bring a leaf node up-to-date according to all the messages in the ancestors.
// If the leaf node is already up-to-date then do nothing.
// If the leaf node is not already up-to-date, then record the work done for that leaf in each ancestor.
// If workdone for any nonleaf nodes exceeds threshold then flush them, but don't do any merges or splits.
{ {
VERIFY_NODE(t, node); VERIFY_NODE(t, node);
BOOL update_stats = FALSE; BOOL update_stats = FALSE;
...@@ -4938,21 +5140,18 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors ...@@ -4938,21 +5140,18 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
} }
update_stats = TRUE; update_stats = TRUE;
int height = 0; int height = 0;
BASEMENTNODE curr_bn = (BASEMENTNODE)node->bp[i].ptr; BASEMENTNODE curr_bn = BLB(node, i);
SUBTREE_EST curr_se = &BP_SUBTREE_EST(node,i);
ANCESTORS curr_ancestors = ancestors; ANCESTORS curr_ancestors = ancestors;
struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds); struct pivot_bounds curr_bounds = next_pivot_keys(node, i, bounds);
while (curr_ancestors) { while (curr_ancestors) {
height++; height++;
apply_buffer_messages_to_node( apply_ancestors_messages_to_leafnode_and_maybe_flush(
t, t,
curr_bn, curr_bn,
curr_se, curr_ancestors,
curr_ancestors->node, node,
curr_ancestors->childnum, &curr_bounds,
height, made_change
node->max_msn_applied_to_node_on_disk,
&curr_bounds
); );
if (curr_ancestors->node->max_msn_applied_to_node_in_memory.msn > node->max_msn_applied_to_node_in_memory.msn) { if (curr_ancestors->node->max_msn_applied_to_node_in_memory.msn > node->max_msn_applied_to_node_in_memory.msn) {
node->max_msn_applied_to_node_in_memory = curr_ancestors->node->max_msn_applied_to_node_in_memory; node->max_msn_applied_to_node_in_memory = curr_ancestors->node->max_msn_applied_to_node_in_memory;
...@@ -5301,7 +5500,7 @@ brt_search_node( ...@@ -5301,7 +5500,7 @@ brt_search_node(
} }
else { else {
r = brt_search_basement_node( r = brt_search_basement_node(
(BASEMENTNODE)node->bp[child_to_search].ptr, BLB(node, child_to_search),
search, search,
getf, getf,
getf_v, getf_v,
...@@ -6035,7 +6234,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_ ...@@ -6035,7 +6234,7 @@ toku_dump_brtnode (FILE *file, BRT brt, BLOCKNUM blocknum, int depth, struct kv_
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, msn, xids, FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, msn, xids,
{ {
data=data; datalen=datalen; keylen=keylen; data=data; datalen=datalen; keylen=keylen;
fprintf(file, "%*s xid=%"PRIu64" %u (type=%d) msn=%"PRIu64"\n", depth+2, "", xids_get_innermost_xid(xids), (unsigned)toku_dtoh32(*(int*)key), type, msn.msn); fprintf(file, "%*s xid=%"PRIu64" %u (type=%d) msn=0x%"PRIu64"\n", depth+2, "", xids_get_innermost_xid(xids), (unsigned)toku_dtoh32(*(int*)key), type, msn.msn);
//assert(strlen((char*)key)+1==keylen); //assert(strlen((char*)key)+1==keylen);
//assert(strlen((char*)data)+1==datalen); //assert(strlen((char*)data)+1==datalen);
}); });
......
...@@ -2759,7 +2759,8 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int ...@@ -2759,7 +2759,8 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
DBT thekey = { .data = key, .size = keylen }; DBT thekey = { .data = key, .size = keylen };
DBT theval = { .data = val, .size = vallen }; DBT theval = { .data = val, .size = vallen };
BRT_MSG_S cmd = { BRT_INSERT, ZERO_MSN, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, ZERO_MSN, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL); uint64_t workdone=0;
brt_leaf_apply_cmd_once(BLB(leafnode,0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, &workdone);
} }
static int write_literal(struct dbout *out, void*data, size_t len) { static int write_literal(struct dbout *out, void*data, size_t len) {
...@@ -2994,11 +2995,14 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu ...@@ -2994,11 +2995,14 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
totalchildkeylens += kv_pair_keylen(childkey); totalchildkeylens += kv_pair_keylen(childkey);
} }
node->totalchildkeylens = totalchildkeylens; node->totalchildkeylens = totalchildkeylens;
XMALLOC_N(n_children, node->bp);
for (int i = 0; i < n_children; i++) { for (int i=0; i<n_children; i++) {
BP_SUBTREE_EST(node, i) = subtree_info[i].subtree_estimates; set_BNC(node, i, toku_create_empty_nl());
BP_BLOCKNUM(node, i) = make_blocknum(subtree_info[i].block); BP_BLOCKNUM(node,i)= make_blocknum(subtree_info[i].block);
BP_STATE(node, i) = PT_AVAIL; BP_SUBTREE_EST(node,i) = subtree_info[i].subtree_estimates;
BP_HAVE_FULLHASH(node,i) = FALSE;
BP_FULLHASH(node,i) = 0;
BP_STATE(node,i) = PT_AVAIL;
} }
if (result == 0) { if (result == 0) {
...@@ -3029,11 +3033,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu ...@@ -3029,11 +3033,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
toku_free(node->childkeys[i]); toku_free(node->childkeys[i]);
} }
for (int i=0; i<n_children; i++) { for (int i=0; i<n_children; i++) {
if (BNC_BUFFER(node, i)) { destroy_nonleaf_childinfo(BNC(node,i));
toku_fifo_free(&BNC_BUFFER(node, i));
BNC_BUFFER(node, i) = NULL;
}
toku_free(node->bp[i].ptr);
} }
toku_free(pivots); toku_free(pivots);
toku_free(node->bp); toku_free(node->bp);
......
...@@ -22,6 +22,7 @@ typedef struct brt *BRT; ...@@ -22,6 +22,7 @@ typedef struct brt *BRT;
typedef struct brtnode *BRTNODE; typedef struct brtnode *BRTNODE;
typedef struct brtnode_leaf_basement_node *BASEMENTNODE; typedef struct brtnode_leaf_basement_node *BASEMENTNODE;
typedef struct brtnode_nonleaf_childinfo *NONLEAF_CHILDINFO; typedef struct brtnode_nonleaf_childinfo *NONLEAF_CHILDINFO;
typedef struct sub_block *SUB_BLOCK;
typedef struct subtree_estimates *SUBTREE_EST; typedef struct subtree_estimates *SUBTREE_EST;
struct brt_header; struct brt_header;
struct wbuf; struct wbuf;
......
...@@ -13,9 +13,14 @@ ...@@ -13,9 +13,14 @@
#include "threadpool.h" #include "threadpool.h"
#include "sub_block.h" #include "sub_block.h"
#include "compress.h" #include "compress.h"
#include "memory.h"
void SUB_BLOCK sub_block_creat(void) {
sub_block_init(struct sub_block *sub_block) { SUB_BLOCK XMALLOC(sb);
sub_block_init(sb);
return sb;
}
void sub_block_init(SUB_BLOCK sub_block) {
sub_block->uncompressed_ptr = 0; sub_block->uncompressed_ptr = 0;
sub_block->uncompressed_size = 0; sub_block->uncompressed_size = 0;
...@@ -25,7 +30,7 @@ sub_block_init(struct sub_block *sub_block) { ...@@ -25,7 +30,7 @@ sub_block_init(struct sub_block *sub_block) {
sub_block->xsum = 0; sub_block->xsum = 0;
} }
// get the size of the compression header // get the size of the compression header
size_t size_t
sub_block_header_size(int n_sub_blocks) { sub_block_header_size(int n_sub_blocks) {
...@@ -204,6 +209,8 @@ compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *un ...@@ -204,6 +209,8 @@ compress_all_sub_blocks(int n_sub_blocks, struct sub_block sub_block[], char *un
char *compressed_base_ptr = compressed_ptr; char *compressed_base_ptr = compressed_ptr;
size_t compressed_len; size_t compressed_len;
// This is a complex way to write a parallel loop. Cilk would be better.
if (n_sub_blocks == 1) { if (n_sub_blocks == 1) {
// single sub-block // single sub-block
sub_block[0].uncompressed_ptr = uncompressed_ptr; sub_block[0].uncompressed_ptr = uncompressed_ptr;
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <compress.h> #include <compress.h>
#include "brttypes.h"
#if defined(__cplusplus) || defined(__cilkplusplus) #if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" { extern "C" {
...@@ -37,8 +38,8 @@ struct stored_sub_block { ...@@ -37,8 +38,8 @@ struct stored_sub_block {
u_int32_t xsum; u_int32_t xsum;
}; };
void void sub_block_init(SUB_BLOCK);
sub_block_init(struct sub_block *sub_block); SUB_BLOCK sub_block_creat(void);
// get the size of the compression header // get the size of the compression header
size_t size_t
......
...@@ -112,11 +112,11 @@ setup_dn(enum brtnode_verify_type bft, int fd, struct brt_header *brt_h, BRTNODE ...@@ -112,11 +112,11 @@ setup_dn(enum brtnode_verify_type bft, int fd, struct brt_header *brt_h, BRTNODE
for (int i = 0; i < (*dn)->n_children; i++) { for (int i = 0; i < (*dn)->n_children; i++) {
if ((*dn)->height == 0) { if ((*dn)->height == 0) {
assert(BP_STATE(*dn,i) == PT_ON_DISK); assert(BP_STATE(*dn,i) == PT_ON_DISK);
assert((*dn)->bp[i].ptr == NULL); assert(is_BNULL(*dn, i));
} }
else { else {
assert(BP_STATE(*dn,i) == PT_COMPRESSED); assert(BP_STATE(*dn,i) == PT_COMPRESSED);
assert((*dn)->bp[i].ptr != NULL); assert(is_BNULL(*dn, i));
} }
} }
} }
...@@ -175,11 +175,8 @@ test_serialize_leaf_with_large_pivots(enum brtnode_verify_type bft) { ...@@ -175,11 +175,8 @@ test_serialize_leaf_with_large_pivots(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long) random())<<32); BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long) random())<<32); BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0); BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0);
sn.bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node)); set_BLB(&sn, i, toku_create_empty_bn());
r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0);
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION; BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
} }
for (int i = 0; i < nrows; ++i) { for (int i = 0; i < nrows; ++i) {
r = toku_omt_insert(BLB_BUFFER(&sn, i), les[i], omt_cmp, les[i], NULL); assert(r==0); r = toku_omt_insert(BLB_BUFFER(&sn, i), les[i], omt_cmp, les[i], NULL); assert(r==0);
...@@ -245,15 +242,12 @@ test_serialize_leaf_with_large_pivots(enum brtnode_verify_type bft) { ...@@ -245,15 +242,12 @@ test_serialize_leaf_with_large_pivots(enum brtnode_verify_type bft) {
for (int i = 0; i < sn.n_children-1; ++i) { for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]); kv_pair_free(sn.childkeys[i]);
} }
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < nrows; ++i) { for (int i = 0; i < nrows; ++i) {
toku_free(les[i]); toku_free(les[i]);
} }
toku_free(sn.childkeys); toku_free(sn.childkeys);
for (int i = 0; i < sn.n_children; i++) { for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr); destroy_basement_node(BLB(&sn, i));
} }
toku_free(sn.bp); toku_free(sn.bp);
...@@ -299,11 +293,9 @@ test_serialize_leaf_with_many_rows(enum brtnode_verify_type bft) { ...@@ -299,11 +293,9 @@ test_serialize_leaf_with_many_rows(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long) random())<<32); BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long) random())<<32); BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0); BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0);
sn.bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node)); set_BLB(&sn, i, toku_create_empty_bn());
r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0); r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0);
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION; BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
} }
BLB_NBYTESINBUF(&sn, 0) = 0; BLB_NBYTESINBUF(&sn, 0) = 0;
for (int i = 0; i < nrows; ++i) { for (int i = 0; i < nrows; ++i) {
...@@ -366,14 +358,11 @@ test_serialize_leaf_with_many_rows(enum brtnode_verify_type bft) { ...@@ -366,14 +358,11 @@ test_serialize_leaf_with_many_rows(enum brtnode_verify_type bft) {
for (int i = 0; i < sn.n_children-1; ++i) { for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]); kv_pair_free(sn.childkeys[i]);
} }
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < nrows; ++i) { for (int i = 0; i < nrows; ++i) {
toku_free(les[i]); toku_free(les[i]);
} }
for (int i = 0; i < sn.n_children; i++) { for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr); destroy_basement_node(BLB(&sn, i));
} }
toku_free(sn.bp); toku_free(sn.bp);
toku_free(sn.childkeys); toku_free(sn.childkeys);
...@@ -425,11 +414,8 @@ test_serialize_leaf_with_large_rows(enum brtnode_verify_type bft) { ...@@ -425,11 +414,8 @@ test_serialize_leaf_with_large_rows(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long) random())<<32); BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long) random())<<32); BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long) random())<<32);
BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0); BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0);
sn.bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node)); set_BLB(&sn, i, toku_create_empty_bn());
r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0);
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION; BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
} }
BLB_NBYTESINBUF(&sn, 0) = 0; BLB_NBYTESINBUF(&sn, 0) = 0;
for (int i = 0; i < 7; ++i) { for (int i = 0; i < 7; ++i) {
...@@ -492,14 +478,11 @@ test_serialize_leaf_with_large_rows(enum brtnode_verify_type bft) { ...@@ -492,14 +478,11 @@ test_serialize_leaf_with_large_rows(enum brtnode_verify_type bft) {
for (int i = 0; i < sn.n_children-1; ++i) { for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]); kv_pair_free(sn.childkeys[i]);
} }
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < 7; ++i) { for (int i = 0; i < 7; ++i) {
toku_free(les[i]); toku_free(les[i]);
} }
for (int i = 0; i < sn.n_children; i++) { for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr); destroy_basement_node(BLB(&sn, i));
} }
toku_free(sn.bp); toku_free(sn.bp);
toku_free(sn.childkeys); toku_free(sn.childkeys);
...@@ -549,10 +532,8 @@ test_serialize_leaf_with_empty_basement_nodes(enum brtnode_verify_type bft) { ...@@ -549,10 +532,8 @@ test_serialize_leaf_with_empty_basement_nodes(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long)random())<<32); BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long)random())<<32); BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0); BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0);
sn.bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node)); set_BLB(&sn, i, toku_create_empty_bn());
r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0);
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION; BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0; BLB_SEQINSERT(&sn, i) = 0;
} }
r = toku_omt_insert(BLB_BUFFER(&sn, 1), elts[0], omt_cmp, elts[0], NULL); assert(r==0); r = toku_omt_insert(BLB_BUFFER(&sn, 1), elts[0], omt_cmp, elts[0], NULL); assert(r==0);
...@@ -622,14 +603,11 @@ test_serialize_leaf_with_empty_basement_nodes(enum brtnode_verify_type bft) { ...@@ -622,14 +603,11 @@ test_serialize_leaf_with_empty_basement_nodes(enum brtnode_verify_type bft) {
for (int i = 0; i < sn.n_children-1; ++i) { for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]); kv_pair_free(sn.childkeys[i]);
} }
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
toku_free(elts[i]); toku_free(elts[i]);
} }
for (int i = 0; i < sn.n_children; i++) { for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr); destroy_basement_node(BLB(&sn, i));
} }
toku_free(sn.bp); toku_free(sn.bp);
toku_free(sn.childkeys); toku_free(sn.childkeys);
...@@ -672,11 +650,8 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum brtnode_verify_type ...@@ -672,11 +650,8 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum brtnode_verify_type
BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long)random())<<32); BP_SUBTREE_EST(&sn,i).nkeys = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long)random())<<32); BP_SUBTREE_EST(&sn,i).dsize = random() + (((long long)random())<<32);
BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0); BP_SUBTREE_EST(&sn,i).exact = (BOOL)(random()%2 != 0);
sn.bp[i].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node)); set_BLB(&sn, i, toku_create_empty_bn());
r = toku_omt_create(&BLB_BUFFER(&sn, i)); assert(r==0);
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION; BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
} }
BLB_NBYTESINBUF(&sn, 0) = 0*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(&sn, 0)); BLB_NBYTESINBUF(&sn, 0) = 0*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(&sn, 0));
BLB_NBYTESINBUF(&sn, 1) = 0*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(&sn, 1)); BLB_NBYTESINBUF(&sn, 1) = 0*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(&sn, 1));
...@@ -739,11 +714,8 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum brtnode_verify_type ...@@ -739,11 +714,8 @@ test_serialize_leaf_with_multiple_empty_basement_nodes(enum brtnode_verify_type
for (int i = 0; i < sn.n_children-1; ++i) { for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]); kv_pair_free(sn.childkeys[i]);
} }
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < sn.n_children; i++) { for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr); destroy_basement_node(BLB(&sn, i));
} }
toku_free(sn.bp); toku_free(sn.bp);
toku_free(sn.childkeys); toku_free(sn.childkeys);
...@@ -793,10 +765,8 @@ test_serialize_leaf(enum brtnode_verify_type bft) { ...@@ -793,10 +765,8 @@ test_serialize_leaf(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,1).exact = (BOOL)(random()%2 != 0); BP_SUBTREE_EST(&sn,1).exact = (BOOL)(random()%2 != 0);
BP_STATE(&sn,0) = PT_AVAIL; BP_STATE(&sn,0) = PT_AVAIL;
BP_STATE(&sn,1) = PT_AVAIL; BP_STATE(&sn,1) = PT_AVAIL;
sn.bp[0].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node)); set_BLB(&sn, 0, toku_create_empty_bn());
sn.bp[1].ptr = toku_xmalloc(sizeof(struct brtnode_leaf_basement_node)); set_BLB(&sn, 1, toku_create_empty_bn());
r = toku_omt_create(&BLB_BUFFER(&sn, 0)); assert(r==0);
r = toku_omt_create(&BLB_BUFFER(&sn, 1)); assert(r==0);
r = toku_omt_insert(BLB_BUFFER(&sn, 0), elts[0], omt_cmp, elts[0], NULL); assert(r==0); r = toku_omt_insert(BLB_BUFFER(&sn, 0), elts[0], omt_cmp, elts[0], NULL); assert(r==0);
r = toku_omt_insert(BLB_BUFFER(&sn, 0), elts[1], omt_cmp, elts[1], NULL); assert(r==0); r = toku_omt_insert(BLB_BUFFER(&sn, 0), elts[1], omt_cmp, elts[1], NULL); assert(r==0);
r = toku_omt_insert(BLB_BUFFER(&sn, 1), elts[2], omt_cmp, elts[2], NULL); assert(r==0); r = toku_omt_insert(BLB_BUFFER(&sn, 1), elts[2], omt_cmp, elts[2], NULL); assert(r==0);
...@@ -804,8 +774,6 @@ test_serialize_leaf(enum brtnode_verify_type bft) { ...@@ -804,8 +774,6 @@ test_serialize_leaf(enum brtnode_verify_type bft) {
BLB_NBYTESINBUF(&sn, 1) = 1*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(&sn, 1)); BLB_NBYTESINBUF(&sn, 1) = 1*(KEY_VALUE_OVERHEAD+2+5) + toku_omt_size(BLB_BUFFER(&sn, 1));
for (int i = 0; i < 2; ++i) { for (int i = 0; i < 2; ++i) {
BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION; BLB_OPTIMIZEDFORUPGRADE(&sn, i) = BRT_LAYOUT_VERSION;
BLB_SOFTCOPYISUPTODATE(&sn, i) = TRUE;
BLB_SEQINSERT(&sn, i) = 0;
} }
struct brt *XMALLOC(brt); struct brt *XMALLOC(brt);
...@@ -867,14 +835,11 @@ test_serialize_leaf(enum brtnode_verify_type bft) { ...@@ -867,14 +835,11 @@ test_serialize_leaf(enum brtnode_verify_type bft) {
for (int i = 0; i < sn.n_children-1; ++i) { for (int i = 0; i < sn.n_children-1; ++i) {
kv_pair_free(sn.childkeys[i]); kv_pair_free(sn.childkeys[i]);
} }
for (int i = 0; i < sn.n_children; ++i) {
toku_omt_destroy(&BLB_BUFFER(&sn, i));
}
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
toku_free(elts[i]); toku_free(elts[i]);
} }
for (int i = 0; i < sn.n_children; i++) { for (int i = 0; i < sn.n_children; i++) {
toku_free(sn.bp[i].ptr); destroy_basement_node(BLB(&sn, i));
} }
toku_free(sn.bp); toku_free(sn.bp);
toku_free(sn.childkeys); toku_free(sn.childkeys);
...@@ -927,10 +892,8 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) { ...@@ -927,10 +892,8 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
BP_SUBTREE_EST(&sn,1).exact = (BOOL)(random()%2 != 0); BP_SUBTREE_EST(&sn,1).exact = (BOOL)(random()%2 != 0);
BP_STATE(&sn,0) = PT_AVAIL; BP_STATE(&sn,0) = PT_AVAIL;
BP_STATE(&sn,1) = PT_AVAIL; BP_STATE(&sn,1) = PT_AVAIL;
sn.bp[0].ptr = toku_xmalloc(sizeof(struct brtnode_nonleaf_childinfo)); set_BNC(&sn, 0, toku_create_empty_nl());
sn.bp[1].ptr = toku_xmalloc(sizeof(struct brtnode_nonleaf_childinfo)); set_BNC(&sn, 1, toku_create_empty_nl());
r = toku_fifo_create(&BNC_BUFFER(&sn,0)); assert(r==0);
r = toku_fifo_create(&BNC_BUFFER(&sn,1)); assert(r==0);
//Create XIDS //Create XIDS
XIDS xids_0 = xids_get_root_xids(); XIDS xids_0 = xids_get_root_xids();
XIDS xids_123; XIDS xids_123;
...@@ -1000,10 +963,8 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) { ...@@ -1000,10 +963,8 @@ test_serialize_nonleaf(enum brtnode_verify_type bft) {
kv_pair_free(sn.childkeys[0]); kv_pair_free(sn.childkeys[0]);
toku_free(hello_string); toku_free(hello_string);
toku_fifo_free(&BNC_BUFFER(&sn,0)); destroy_nonleaf_childinfo(BNC(&sn, 0));
toku_fifo_free(&BNC_BUFFER(&sn,1)); destroy_nonleaf_childinfo(BNC(&sn, 1));
toku_free(sn.bp[0].ptr);
toku_free(sn.bp[1].ptr);
toku_free(sn.bp); toku_free(sn.bp);
toku_free(sn.childkeys); toku_free(sn.childkeys);
......
...@@ -30,7 +30,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen ...@@ -30,7 +30,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL); brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -37,7 +37,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen ...@@ -37,7 +37,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node // apply an insert to the leaf node
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL); brt_leaf_apply_cmd_once(BLB(leafnode,0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
leafnode->max_msn_applied_to_node_on_disk = msn; leafnode->max_msn_applied_to_node_on_disk = msn;
leafnode->max_msn_applied_to_node_in_memory = msn; leafnode->max_msn_applied_to_node_in_memory = msn;
......
...@@ -11,6 +11,10 @@ ...@@ -11,6 +11,10 @@
// - inject message with old msn, verify that row still has value2 (verify cmd.msn < node.max_msn is rejected) // - inject message with old msn, verify that row still has value2 (verify cmd.msn < node.max_msn is rejected)
// TODO:
// - verify that no work is done by messages that should be ignored (via workdone arg to brt_leaf_put_cmd())
// - maybe get counter of messages ignored for old msn (once the counter is implemented in brt.c)
#include "brt-internal.h" #include "brt-internal.h"
#include "includes.h" #include "includes.h"
#include "test.h" #include "test.h"
...@@ -40,8 +44,9 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size ...@@ -40,8 +44,9 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
int made_change; bool made_change;
toku_apply_cmd_to_leaf(brt, leafnode, &cmd, &made_change); u_int64_t workdone=0;
toku_apply_cmd_to_leaf(brt, leafnode, &cmd, &made_change, &workdone);
{ {
int r = toku_brt_lookup(brt, &thekey, lookup_checkf, &pair); int r = toku_brt_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0); assert(r==0);
...@@ -49,7 +54,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size ...@@ -49,7 +54,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
} }
BRT_MSG_S badcmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } }; BRT_MSG_S badcmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_apply_cmd_to_leaf(brt, leafnode, &badcmd, &made_change); toku_apply_cmd_to_leaf(brt, leafnode, &badcmd, &made_change, &workdone);
// message should be rejected for duplicate msn, row should still have original val // message should be rejected for duplicate msn, row should still have original val
...@@ -62,7 +67,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size ...@@ -62,7 +67,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
// now verify that message with proper msn gets through // now verify that message with proper msn gets through
msn = next_dummymsn(); msn = next_dummymsn();
BRT_MSG_S cmd2 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &val2 } }; BRT_MSG_S cmd2 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &val2 } };
toku_apply_cmd_to_leaf(brt, leafnode, &cmd2, &made_change); toku_apply_cmd_to_leaf(brt, leafnode, &cmd2, &made_change, &workdone);
// message should be accepted, val should have new value // message should be accepted, val should have new value
{ {
...@@ -75,7 +80,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size ...@@ -75,7 +80,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
// now verify that message with lesser (older) msn is rejected // now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10; msn.msn = msn.msn - 10;
BRT_MSG_S cmd3 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } }; BRT_MSG_S cmd3 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_apply_cmd_to_leaf(brt, leafnode, &cmd3, &made_change); toku_apply_cmd_to_leaf(brt, leafnode, &cmd3, &made_change, &workdone);
// message should be rejected, val should still have value in pair2 // message should be rejected, val should still have value in pair2
{ {
......
...@@ -35,7 +35,8 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_ ...@@ -35,7 +35,8 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
if (verbose) if (verbose)
printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, sub_block_size, n_sub_blocks); printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, sub_block_size, n_sub_blocks);
struct sub_block sub_blocks[n_sub_blocks]; struct sub_block *sub_blocks[n_sub_blocks];
for (int i=0; i<n_sub_blocks; i++) sub_blocks[i] = sub_block_create();
set_all_sub_block_sizes(total_size, sub_block_size, n_sub_blocks, sub_blocks); set_all_sub_block_sizes(total_size, sub_block_size, n_sub_blocks, sub_blocks);
size_t cbuf_size_bound = get_sum_compressed_size_bound(n_sub_blocks, sub_blocks); size_t cbuf_size_bound = get_sum_compressed_size_bound(n_sub_blocks, sub_blocks);
...@@ -50,13 +51,13 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_ ...@@ -50,13 +51,13 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
for (int xidx = 0; xidx < n_sub_blocks; xidx++) { for (int xidx = 0; xidx < n_sub_blocks; xidx++) {
// corrupt a checksum // corrupt a checksum
sub_blocks[xidx].xsum += 1; sub_blocks[xidx]->xsum += 1;
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool); r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r != 0); assert(r != 0);
// reset the checksums // reset the checksums
sub_blocks[xidx].xsum -= 1; sub_blocks[xidx]->xsum -= 1;
r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool); r = decompress_all_sub_blocks(n_sub_blocks, sub_blocks, cbuf, ubuf, n_cores, pool);
assert(r == 0); assert(r == 0);
...@@ -77,7 +78,7 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_ ...@@ -77,7 +78,7 @@ test_sub_block_checksum(void *buf, int total_size, int my_max_sub_blocks, int n_
assert(r == 0); assert(r == 0);
assert(memcmp(buf, ubuf, total_size) == 0); assert(memcmp(buf, ubuf, total_size) == 0);
} }
for (int i=0; i<n_sub_blocks; i++) sub_block_destroy(sub_blocks[i]);
toku_free(ubuf); toku_free(ubuf);
toku_free(cbuf); toku_free(cbuf);
} }
......
...@@ -23,7 +23,8 @@ test_sub_block_compression(void *buf, int total_size, int my_max_sub_blocks, int ...@@ -23,7 +23,8 @@ test_sub_block_compression(void *buf, int total_size, int my_max_sub_blocks, int
if (verbose) if (verbose)
printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, sub_block_size, n_sub_blocks); printf("%s:%d %d %d\n", __FUNCTION__, __LINE__, sub_block_size, n_sub_blocks);
struct sub_block sub_blocks[n_sub_blocks]; struct sub_block *sub_blocks[n_sub_blocks];
for (int i=0; i<n_sub_blocks; i++) sub_blocks[i] = sub_block_create();
set_all_sub_block_sizes(total_size, sub_block_size, n_sub_blocks, sub_blocks); set_all_sub_block_sizes(total_size, sub_block_size, n_sub_blocks, sub_blocks);
size_t cbuf_size_bound = get_sum_compressed_size_bound(n_sub_blocks, sub_blocks); size_t cbuf_size_bound = get_sum_compressed_size_bound(n_sub_blocks, sub_blocks);
...@@ -41,6 +42,7 @@ test_sub_block_compression(void *buf, int total_size, int my_max_sub_blocks, int ...@@ -41,6 +42,7 @@ test_sub_block_compression(void *buf, int total_size, int my_max_sub_blocks, int
assert(memcmp(buf, ubuf, total_size) == 0); assert(memcmp(buf, ubuf, total_size) == 0);
for (int i=0; i<n_sub_blocks; i++) sub_block_destroy(sub_blocks[i]);
toku_free(ubuf); toku_free(ubuf);
toku_free(cbuf); toku_free(cbuf);
} }
......
...@@ -40,7 +40,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen ...@@ -40,7 +40,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node // apply an insert to the leaf node
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL); brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// Create bad tree (don't do following): // Create bad tree (don't do following):
// leafnode->max_msn_applied_to_node = msn; // leafnode->max_msn_applied_to_node = msn;
......
...@@ -28,7 +28,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen ...@@ -28,7 +28,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL); brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -29,7 +29,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen ...@@ -29,7 +29,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL); brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -28,7 +28,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen ...@@ -28,7 +28,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL); brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -29,7 +29,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen ...@@ -29,7 +29,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL); brt_leaf_apply_cmd_once(BLB(leafnode,0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -29,7 +29,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen ...@@ -29,7 +29,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL); brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
...@@ -28,7 +28,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen ...@@ -28,7 +28,7 @@ append_leaf(BRTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen
// apply an insert to the leaf node // apply an insert to the leaf node
MSN msn = next_dummymsn(); MSN msn = next_dummymsn();
BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } }; BRT_MSG_S cmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &theval } };
brt_leaf_apply_cmd_once((BASEMENTNODE)leafnode->bp[0].ptr, &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL); brt_leaf_apply_cmd_once(BLB(leafnode, 0), &BP_SUBTREE_EST(leafnode,0), &cmd, idx, NULL, NULL, NULL);
// dont forget to dirty the node // dont forget to dirty the node
leafnode->dirty = 1; leafnode->dirty = 1;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment