Commit d0b857c6 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

refs #5559 merge 5559 to main


git-svn-id: file:///svn/toku/tokudb@50812 c7de825b-a66e-492c-adef-691d508d4ae1
parent 19ebffd8
...@@ -33,7 +33,7 @@ typedef struct flusher_advice FLUSHER_ADVICE; ...@@ -33,7 +33,7 @@ typedef struct flusher_advice FLUSHER_ADVICE;
typedef int (*FA_PICK_CHILD)(FT h, FTNODE parent, void* extra); typedef int (*FA_PICK_CHILD)(FT h, FTNODE parent, void* extra);
/** /**
* Decide whether to call `flush_some_child` on the child if it is * Decide whether to call `toku_ft_flush_some_child` on the child if it is
* stable and a nonleaf node. * stable and a nonleaf node.
* *
* Flusher threads: yes if child is gorged * Flusher threads: yes if child is gorged
...@@ -76,9 +76,9 @@ typedef bool (*FA_SHOULD_DESTROY_BN)(void* extra); ...@@ -76,9 +76,9 @@ typedef bool (*FA_SHOULD_DESTROY_BN)(void* extra);
/** /**
* Update `ft_flusher_status` in whatever way necessary. Called once * Update `ft_flusher_status` in whatever way necessary. Called once
* by `flush_some_child` right before choosing what to do next (split, * by `toku_ft_flush_some_child` right before choosing what to do next (split,
* merge, recurse), with the number of nodes that were dirtied by this * merge, recurse), with the number of nodes that were dirtied by this
* execution of `flush_some_child`. * execution of `toku_ft_flush_some_child`.
*/ */
typedef void (*FA_UPDATE_STATUS)(FTNODE child, int dirtied, void* extra); typedef void (*FA_UPDATE_STATUS)(FTNODE child, int dirtied, void* extra);
...@@ -109,17 +109,6 @@ struct flusher_advice { ...@@ -109,17 +109,6 @@ struct flusher_advice {
void* extra; // parameter passed into callbacks void* extra; // parameter passed into callbacks
}; };
// FIXME all of these need the toku prefix
//
// how about:
//
// toku_ftnode_flush_some_child()
// toku_fa_flusher_advice_init()
// toku_fa_always_recursively_flush()
// toku_fa_dont_destroy_basement_nodes()
// toku_fa_default_merge_child()
// toku_fa_default_pick_child_after_split()
void void
flusher_advice_init( flusher_advice_init(
struct flusher_advice *fa, struct flusher_advice *fa,
...@@ -132,11 +121,11 @@ flusher_advice_init( ...@@ -132,11 +121,11 @@ flusher_advice_init(
void* extra void* extra
); );
void void toku_ft_flush_some_child(
flush_some_child( FT ft,
FT h,
FTNODE parent, FTNODE parent,
struct flusher_advice *fa); struct flusher_advice *fa
);
bool bool
always_recursively_flush(FTNODE child, void* extra); always_recursively_flush(FTNODE child, void* extra);
......
...@@ -288,7 +288,7 @@ flt_update_status(FTNODE child, ...@@ -288,7 +288,7 @@ flt_update_status(FTNODE child,
{ {
struct flush_status_update_extra *fste = (struct flush_status_update_extra *) extra; struct flush_status_update_extra *fste = (struct flush_status_update_extra *) extra;
update_flush_status(child, fste->cascades); update_flush_status(child, fste->cascades);
// If `flush_some_child` decides to recurse after this, we'll need // If `toku_ft_flush_some_child` decides to recurse after this, we'll need
// cascades to increase. If not it doesn't matter. // cascades to increase. If not it doesn't matter.
fste->cascades++; fste->cascades++;
} }
...@@ -420,7 +420,7 @@ ct_maybe_merge_child(struct flusher_advice *fa, ...@@ -420,7 +420,7 @@ ct_maybe_merge_child(struct flusher_advice *fa,
(void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1); (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
(void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1); (void) toku_sync_fetch_and_add(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
flush_some_child(h, root_node, &new_fa); toku_ft_flush_some_child(h, root_node, &new_fa);
(void) toku_sync_fetch_and_sub(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1); (void) toku_sync_fetch_and_sub(&STATUS_VALUE(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);
...@@ -436,7 +436,7 @@ ct_update_status(FTNODE child, ...@@ -436,7 +436,7 @@ ct_update_status(FTNODE child,
struct flush_status_update_extra* fste = (struct flush_status_update_extra *) extra; struct flush_status_update_extra* fste = (struct flush_status_update_extra *) extra;
update_flush_status(child, fste->cascades); update_flush_status(child, fste->cascades);
STATUS_VALUE(FT_FLUSHER_CLEANER_NODES_DIRTIED) += dirtied; STATUS_VALUE(FT_FLUSHER_CLEANER_NODES_DIRTIED) += dirtied;
// Incrementing this in case `flush_some_child` decides to recurse. // Incrementing this in case `toku_ft_flush_some_child` decides to recurse.
fste->cascades++; fste->cascades++;
} }
...@@ -719,6 +719,23 @@ move_leafentries( ...@@ -719,6 +719,23 @@ move_leafentries(
} }
} }
static void ftnode_finalize_split(FTNODE node, FTNODE B, MSN max_msn_applied_to_node) {
// Effect: Finalizes a split by updating some bits and dirtying both nodes
toku_assert_entire_node_in_memory(node);
toku_assert_entire_node_in_memory(B);
verify_all_in_mempool(node);
verify_all_in_mempool(B);
node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
// The new node in the split inherits the oldest known reference xid
B->oldest_known_referenced_xid = node->oldest_known_referenced_xid;
node->dirty = 1;
B->dirty = 1;
}
void void
ftleaf_split( ftleaf_split(
FT h, FT h,
...@@ -914,18 +931,9 @@ ftleaf_split( ...@@ -914,18 +931,9 @@ ftleaf_split(
REALLOC_N(num_children_in_node-1, node->childkeys); REALLOC_N(num_children_in_node-1, node->childkeys);
} }
verify_all_in_mempool(node); ftnode_finalize_split(node, B, max_msn_applied_to_node);
verify_all_in_mempool(B);
node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
node->dirty = 1;
B->dirty = 1;
*nodea = node; *nodea = node;
*nodeb = B; *nodeb = B;
} // end of ftleaf_split() } // end of ftleaf_split()
void void
...@@ -989,15 +997,7 @@ ft_nonleaf_split( ...@@ -989,15 +997,7 @@ ft_nonleaf_split(
REALLOC_N(n_children_in_a-1, node->childkeys); REALLOC_N(n_children_in_a-1, node->childkeys);
} }
node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node; ftnode_finalize_split(node, B, max_msn_applied_to_node);
B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
node->dirty = 1;
B->dirty = 1;
toku_assert_entire_node_in_memory(node);
toku_assert_entire_node_in_memory(B);
//VERIFY_NODE(t,node);
//VERIFY_NODE(t,B);
*nodea = node; *nodea = node;
*nodeb = B; *nodeb = B;
} }
...@@ -1050,12 +1050,12 @@ ft_split_child( ...@@ -1050,12 +1050,12 @@ ft_split_child(
if (picked_child == childnum || if (picked_child == childnum ||
(picked_child < 0 && nodea->height > 0 && fa->should_recursively_flush(nodea, fa->extra))) { (picked_child < 0 && nodea->height > 0 && fa->should_recursively_flush(nodea, fa->extra))) {
toku_unpin_ftnode_off_client_thread(h, nodeb); toku_unpin_ftnode_off_client_thread(h, nodeb);
flush_some_child(h, nodea, fa); toku_ft_flush_some_child(h, nodea, fa);
} }
else if (picked_child == childnum + 1 || else if (picked_child == childnum + 1 ||
(picked_child < 0 && nodeb->height > 0 && fa->should_recursively_flush(nodeb, fa->extra))) { (picked_child < 0 && nodeb->height > 0 && fa->should_recursively_flush(nodeb, fa->extra))) {
toku_unpin_ftnode_off_client_thread(h, nodea); toku_unpin_ftnode_off_client_thread(h, nodea);
flush_some_child(h, nodeb, fa); toku_ft_flush_some_child(h, nodeb, fa);
} }
else { else {
toku_unpin_ftnode_off_client_thread(h, nodea); toku_unpin_ftnode_off_client_thread(h, nodea);
...@@ -1063,6 +1063,21 @@ ft_split_child( ...@@ -1063,6 +1063,21 @@ ft_split_child(
} }
} }
static void bring_node_fully_into_memory(FTNODE node, FT ft) {
if (!is_entire_node_in_memory(node)) {
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, ft);
toku_cachetable_pf_pinned_pair(
node,
toku_ftnode_pf_callback,
&bfe,
ft->cf,
node->thisnodename,
toku_cachetable_hash(ft->cf, node->thisnodename)
);
}
}
static void static void
flush_this_child( flush_this_child(
FT h, FT h,
...@@ -1090,8 +1105,9 @@ flush_this_child( ...@@ -1090,8 +1105,9 @@ flush_this_child(
NONLEAF_CHILDINFO bnc = BNC(node, childnum); NONLEAF_CHILDINFO bnc = BNC(node, childnum);
set_BNC(node, childnum, toku_create_empty_nl()); set_BNC(node, childnum, toku_create_empty_nl());
// now we have a bnc to flush to the child // now we have a bnc to flush to the child. pass down the parent's
toku_bnc_flush_to_child(h, bnc, child); // oldest known referenced xid as we flush down to the child.
toku_bnc_flush_to_child(h, bnc, child, node->oldest_known_referenced_xid);
destroy_nonleaf_childinfo(bnc); destroy_nonleaf_childinfo(bnc);
} }
...@@ -1487,18 +1503,18 @@ ft_merge_child( ...@@ -1487,18 +1503,18 @@ ft_merge_child(
toku_unpin_ftnode_off_client_thread(h, childb); toku_unpin_ftnode_off_client_thread(h, childb);
} }
if (childa->height > 0 && fa->should_recursively_flush(childa, fa->extra)) { if (childa->height > 0 && fa->should_recursively_flush(childa, fa->extra)) {
flush_some_child(h, childa, fa); toku_ft_flush_some_child(h, childa, fa);
} }
else { else {
toku_unpin_ftnode_off_client_thread(h, childa); toku_unpin_ftnode_off_client_thread(h, childa);
} }
} }
void static void ft_flush_some_child_with_xid(
flush_some_child( FT ft,
FT h,
FTNODE parent, FTNODE parent,
struct flusher_advice *fa) struct flusher_advice *fa,
TXNID oldest_referenced_xid)
// Effect: This function does the following: // Effect: This function does the following:
// - Pick a child of parent (the heaviest child), // - Pick a child of parent (the heaviest child),
// - flush from parent to child, // - flush from parent to child,
...@@ -1514,27 +1530,27 @@ flush_some_child( ...@@ -1514,27 +1530,27 @@ flush_some_child(
toku_assert_entire_node_in_memory(parent); toku_assert_entire_node_in_memory(parent);
// pick the child we want to flush to // pick the child we want to flush to
int childnum = fa->pick_child(h, parent, fa->extra); int childnum = fa->pick_child(ft, parent, fa->extra);
// for test // for test
call_flusher_thread_callback(flt_flush_before_child_pin); call_flusher_thread_callback(flt_flush_before_child_pin);
// get the child into memory // get the child into memory
BLOCKNUM targetchild = BP_BLOCKNUM(parent, childnum); BLOCKNUM targetchild = BP_BLOCKNUM(parent, childnum);
toku_verify_blocknum_allocated(h->blocktable, targetchild); toku_verify_blocknum_allocated(ft->blocktable, targetchild);
uint32_t childfullhash = compute_child_fullhash(h->cf, parent, childnum); uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
FTNODE child; FTNODE child;
struct ftnode_fetch_extra bfe; struct ftnode_fetch_extra bfe;
// Note that we don't read the entire node into memory yet. // Note that we don't read the entire node into memory yet.
// The idea is let's try to do the minimum work before releasing the parent lock // The idea is let's try to do the minimum work before releasing the parent lock
fill_bfe_for_min_read(&bfe, h); fill_bfe_for_min_read(&bfe, ft);
toku_pin_ftnode_off_client_thread(h, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child); toku_pin_ftnode_off_client_thread(ft, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child);
// for test // for test
call_flusher_thread_callback(ft_flush_aflter_child_pin); call_flusher_thread_callback(ft_flush_aflter_child_pin);
if (fa->should_destroy_basement_nodes(fa)) { if (fa->should_destroy_basement_nodes(fa)) {
maybe_destroy_child_blbs(parent, child, h); maybe_destroy_child_blbs(parent, child, ft);
} }
//Note that at this point, we don't have the entire child in. //Note that at this point, we don't have the entire child in.
...@@ -1567,7 +1583,7 @@ flush_some_child( ...@@ -1567,7 +1583,7 @@ flush_some_child(
// reactive, we can unpin the parent // reactive, we can unpin the parent
// //
if (!may_child_be_reactive) { if (!may_child_be_reactive) {
toku_unpin_ftnode_off_client_thread(h, parent); toku_unpin_ftnode_off_client_thread(ft, parent);
parent = NULL; parent = NULL;
} }
...@@ -1575,7 +1591,7 @@ flush_some_child( ...@@ -1575,7 +1591,7 @@ flush_some_child(
// now, if necessary, read/decompress the rest of child into memory, // now, if necessary, read/decompress the rest of child into memory,
// so that we can proceed and apply the flush // so that we can proceed and apply the flush
// //
bring_node_fully_into_memory(child, h); bring_node_fully_into_memory(child, ft);
// It is possible after reading in the entire child, // It is possible after reading in the entire child,
// that we now know that the child is not reactive // that we now know that the child is not reactive
...@@ -1583,9 +1599,9 @@ flush_some_child( ...@@ -1583,9 +1599,9 @@ flush_some_child(
// we wont be splitting/merging child // we wont be splitting/merging child
// and we have already replaced the bnc // and we have already replaced the bnc
// for the root with a fresh one // for the root with a fresh one
enum reactivity child_re = get_node_reactivity(child, h->h->nodesize); enum reactivity child_re = get_node_reactivity(child, ft->h->nodesize);
if (parent && child_re == RE_STABLE) { if (parent && child_re == RE_STABLE) {
toku_unpin_ftnode_off_client_thread(h, parent); toku_unpin_ftnode_off_client_thread(ft, parent);
parent = NULL; parent = NULL;
} }
...@@ -1601,9 +1617,10 @@ flush_some_child( ...@@ -1601,9 +1617,10 @@ flush_some_child(
} }
// do the actual flush // do the actual flush
toku_bnc_flush_to_child( toku_bnc_flush_to_child(
h, ft,
bnc, bnc,
child child,
oldest_referenced_xid
); );
destroy_nonleaf_childinfo(bnc); destroy_nonleaf_childinfo(bnc);
} }
...@@ -1612,7 +1629,7 @@ flush_some_child( ...@@ -1612,7 +1629,7 @@ flush_some_child(
// let's get the reactivity of the child again, // let's get the reactivity of the child again,
// it is possible that the flush got rid of some values // it is possible that the flush got rid of some values
// and now the parent is no longer reactive // and now the parent is no longer reactive
child_re = get_node_reactivity(child, h->h->nodesize); child_re = get_node_reactivity(child, ft->h->nodesize);
// if the parent has been unpinned above, then // if the parent has been unpinned above, then
// this is our only option, even if the child is not stable // this is our only option, even if the child is not stable
// if the child is not stable, we'll handle it the next // if the child is not stable, we'll handle it the next
...@@ -1623,17 +1640,17 @@ flush_some_child( ...@@ -1623,17 +1640,17 @@ flush_some_child(
) )
{ {
if (parent) { if (parent) {
toku_unpin_ftnode_off_client_thread(h, parent); toku_unpin_ftnode_off_client_thread(ft, parent);
parent = NULL; parent = NULL;
} }
// //
// it is the responsibility of flush_some_child to unpin child // it is the responsibility of ft_flush_some_child_with_xid to unpin child
// //
if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) { if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) {
flush_some_child(h, child, fa); ft_flush_some_child_with_xid(ft, child, fa, oldest_referenced_xid);
} }
else { else {
toku_unpin_ftnode_off_client_thread(h, child); toku_unpin_ftnode_off_client_thread(ft, child);
} }
} }
else if (child_re == RE_FISSIBLE) { else if (child_re == RE_FISSIBLE) {
...@@ -1642,7 +1659,7 @@ flush_some_child( ...@@ -1642,7 +1659,7 @@ flush_some_child(
// parent and child as it sees fit // parent and child as it sees fit
// //
paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
ft_split_child(h, parent, childnum, child, SPLIT_EVENLY, fa); ft_split_child(ft, parent, childnum, child, SPLIT_EVENLY, fa);
} }
else if (child_re == RE_FUSIBLE) { else if (child_re == RE_FUSIBLE) {
// //
...@@ -1650,13 +1667,20 @@ flush_some_child( ...@@ -1650,13 +1667,20 @@ flush_some_child(
// parent and child as it sees fit // parent and child as it sees fit
// //
paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
fa->maybe_merge_child(fa, h, parent, childnum, child, fa->extra); fa->maybe_merge_child(fa, ft, parent, childnum, child, fa->extra);
} }
else { else {
abort(); abort();
} }
} }
void toku_ft_flush_some_child(FT ft, FTNODE parent, struct flusher_advice *fa) {
// Vanilla flush_some_child flushes from parent to child without
// providing a meaningful oldest_referenced_xid. No simple garbage
// collection is performed.
return ft_flush_some_child_with_xid(ft, parent, fa, TXNID_NONE);
}
static void static void
update_cleaner_status( update_cleaner_status(
FTNODE node, FTNODE node,
...@@ -1782,12 +1806,12 @@ toku_ftnode_cleaner_callback( ...@@ -1782,12 +1806,12 @@ toku_ftnode_cleaner_callback(
int childnum = find_heaviest_child(node); int childnum = find_heaviest_child(node);
update_cleaner_status(node, childnum); update_cleaner_status(node, childnum);
// Either flush_some_child will unlock the node, or we do it here. // Either toku_ft_flush_some_child will unlock the node, or we do it here.
if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) { if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) {
struct flusher_advice fa; struct flusher_advice fa;
struct flush_status_update_extra fste; struct flush_status_update_extra fste;
ct_flusher_advice_init(&fa, &fste, h->h->nodesize); ct_flusher_advice_init(&fa, &fste, h->h->nodesize);
flush_some_child(h, node, &fa); toku_ft_flush_some_child(h, node, &fa);
} else { } else {
toku_unpin_ftnode_off_client_thread(h, node); toku_unpin_ftnode_off_client_thread(h, node);
} }
...@@ -1798,6 +1822,7 @@ struct flusher_extra { ...@@ -1798,6 +1822,7 @@ struct flusher_extra {
FT h; FT h;
FTNODE node; FTNODE node;
NONLEAF_CHILDINFO bnc; NONLEAF_CHILDINFO bnc;
TXNID oldest_referenced_xid;
}; };
// //
...@@ -1834,16 +1859,17 @@ static void flush_node_fun(void *fe_v) ...@@ -1834,16 +1859,17 @@ static void flush_node_fun(void *fe_v)
toku_bnc_flush_to_child( toku_bnc_flush_to_child(
fe->h, fe->h,
fe->bnc, fe->bnc,
fe->node fe->node,
fe->oldest_referenced_xid
); );
destroy_nonleaf_childinfo(fe->bnc); destroy_nonleaf_childinfo(fe->bnc);
// after the flush has completed, now check to see if the node needs flushing // after the flush has completed, now check to see if the node needs flushing
// If so, call flush_some_child on the node, and it is the responsibility // If so, call ft_flush_some_child_with_xid on the node (because this flush intends to
// of flush_some_child to unlock the node // pass a meaningful oldest referenced xid for simple garbage collection), and it is the
// otherwise, we unlock the node here. // responsibility of the flush to unlock the node. otherwise, we unlock it here.
if (fe->node->height > 0 && toku_ft_nonleaf_is_gorged(fe->node, fe->h->h->nodesize)) { if (fe->node->height > 0 && toku_ft_nonleaf_is_gorged(fe->node, fe->h->h->nodesize)) {
flush_some_child(fe->h, fe->node, &fa); ft_flush_some_child_with_xid(fe->h, fe->node, &fa, fe->oldest_referenced_xid);
} }
else { else {
toku_unpin_ftnode_off_client_thread(fe->h,fe->node); toku_unpin_ftnode_off_client_thread(fe->h,fe->node);
...@@ -1853,8 +1879,8 @@ static void flush_node_fun(void *fe_v) ...@@ -1853,8 +1879,8 @@ static void flush_node_fun(void *fe_v)
// In this case, we were just passed a node with no // In this case, we were just passed a node with no
// bnc, which means we are tasked with flushing some // bnc, which means we are tasked with flushing some
// buffer in the node. // buffer in the node.
// It is the responsibility of flush_some_child to unlock the node // It is the responsibility of flush some child to unlock the node
flush_some_child(fe->h, fe->node, &fa); ft_flush_some_child_with_xid(fe->h, fe->node, &fa, fe->oldest_referenced_xid);
} }
remove_background_job_from_cf(fe->h->cf); remove_background_job_from_cf(fe->h->cf);
toku_free(fe); toku_free(fe);
...@@ -1864,12 +1890,14 @@ static void ...@@ -1864,12 +1890,14 @@ static void
place_node_and_bnc_on_background_thread( place_node_and_bnc_on_background_thread(
FT h, FT h,
FTNODE node, FTNODE node,
NONLEAF_CHILDINFO bnc) NONLEAF_CHILDINFO bnc,
TXNID oldest_referenced_xid)
{ {
struct flusher_extra *XMALLOC(fe); struct flusher_extra *XMALLOC(fe);
fe->h = h; fe->h = h;
fe->node = node; fe->node = node;
fe->bnc = bnc; fe->bnc = bnc;
fe->oldest_referenced_xid = oldest_referenced_xid;
cachefile_kibbutz_enq(h->cf, flush_node_fun, fe); cachefile_kibbutz_enq(h->cf, flush_node_fun, fe);
} }
...@@ -1886,9 +1914,9 @@ place_node_and_bnc_on_background_thread( ...@@ -1886,9 +1914,9 @@ place_node_and_bnc_on_background_thread(
// child needs to be split/merged), then we place the parent on the background thread. // child needs to be split/merged), then we place the parent on the background thread.
// The parent will be unlocked on the background thread // The parent will be unlocked on the background thread
// //
void void toku_ft_flush_node_on_background_thread(FT h, FTNODE parent)
flush_node_on_background_thread(FT h, FTNODE parent)
{ {
TXNID oldest_known_referenced_xid = parent->oldest_known_referenced_xid;
// //
// first let's see if we can detach buffer on client thread // first let's see if we can detach buffer on client thread
// and pick the child we want to flush to // and pick the child we want to flush to
...@@ -1903,9 +1931,9 @@ flush_node_on_background_thread(FT h, FTNODE parent) ...@@ -1903,9 +1931,9 @@ flush_node_on_background_thread(FT h, FTNODE parent)
int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child); int r = toku_maybe_pin_ftnode_clean(h, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child);
if (r != 0) { if (r != 0) {
// In this case, we could not lock the child, so just place the parent on the background thread // In this case, we could not lock the child, so just place the parent on the background thread
// In the callback, we will use flush_some_child, which checks to // In the callback, we will use toku_ft_flush_some_child, which checks to
// see if we should blow away the old basement nodes. // see if we should blow away the old basement nodes.
place_node_and_bnc_on_background_thread(h, parent, NULL); place_node_and_bnc_on_background_thread(h, parent, NULL, oldest_known_referenced_xid);
} }
else { else {
// //
...@@ -1934,7 +1962,7 @@ flush_node_on_background_thread(FT h, FTNODE parent) ...@@ -1934,7 +1962,7 @@ flush_node_on_background_thread(FT h, FTNODE parent)
// so, because we know for sure the child is not // so, because we know for sure the child is not
// reactive, we can unpin the parent // reactive, we can unpin the parent
// //
place_node_and_bnc_on_background_thread(h, child, bnc); place_node_and_bnc_on_background_thread(h, child, bnc, oldest_known_referenced_xid);
toku_unpin_ftnode(h, parent); toku_unpin_ftnode(h, parent);
} }
else { else {
...@@ -1944,7 +1972,7 @@ flush_node_on_background_thread(FT h, FTNODE parent) ...@@ -1944,7 +1972,7 @@ flush_node_on_background_thread(FT h, FTNODE parent)
toku_unpin_ftnode(h, child); toku_unpin_ftnode(h, child);
// Again, we'll have the parent on the background thread, so // Again, we'll have the parent on the background thread, so
// we don't need to destroy the basement nodes yet. // we don't need to destroy the basement nodes yet.
place_node_and_bnc_on_background_thread(h, parent, NULL); place_node_and_bnc_on_background_thread(h, parent, NULL, oldest_known_referenced_xid);
} }
} }
} }
......
...@@ -65,11 +65,11 @@ toku_flusher_thread_set_callback( ...@@ -65,11 +65,11 @@ toku_flusher_thread_set_callback(
/** /**
* Puts a workitem on the flusher thread queue, scheduling the node to be * Puts a workitem on the flusher thread queue, scheduling the node to be
* flushed by flush_some_child. * flushed by toku_ft_flush_some_child.
*/ */
void void
flush_node_on_background_thread( toku_ft_flush_node_on_background_thread(
FT h, FT ft,
FTNODE parent FTNODE parent
); );
......
...@@ -141,7 +141,7 @@ hot_update_flusher_keys(FTNODE parent, ...@@ -141,7 +141,7 @@ hot_update_flusher_keys(FTNODE parent,
} }
} }
// Picks which child flush_some_child will use for flushing and // Picks which child toku_ft_flush_some_child will use for flushing and
// recursion. // recursion.
static int static int
hot_pick_child(FT h, hot_pick_child(FT h,
...@@ -308,7 +308,7 @@ toku_ft_hot_optimize(FT_HANDLE brt, ...@@ -308,7 +308,7 @@ toku_ft_hot_optimize(FT_HANDLE brt,
// This should recurse to the bottom of the tree and then // This should recurse to the bottom of the tree and then
// return. // return.
if (root->height > 0) { if (root->height > 0) {
flush_some_child(brt->ft, root, &advice); toku_ft_flush_some_child(brt->ft, root, &advice);
} else { } else {
// Since there are no children to flush, we should abort // Since there are no children to flush, we should abort
// the HOT call. // the HOT call.
...@@ -318,7 +318,7 @@ toku_ft_hot_optimize(FT_HANDLE brt, ...@@ -318,7 +318,7 @@ toku_ft_hot_optimize(FT_HANDLE brt,
// Set the highest pivot key seen here, since the parent may // Set the highest pivot key seen here, since the parent may
// be unlocked and NULL'd later in our caller: // be unlocked and NULL'd later in our caller:
// flush_some_child(). // toku_ft_flush_some_child().
hot_set_highest_key(&flusher); hot_set_highest_key(&flusher);
// This is where we determine if the traversal is finished or // This is where we determine if the traversal is finished or
......
...@@ -137,7 +137,7 @@ long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc); ...@@ -137,7 +137,7 @@ long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc);
long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc); long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc);
void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, DESCRIPTOR desc, ft_compare_func cmp); void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, DESCRIPTOR desc, ft_compare_func cmp);
void toku_bnc_empty(NONLEAF_CHILDINFO bnc); void toku_bnc_empty(NONLEAF_CHILDINFO bnc);
void toku_bnc_flush_to_child(FT h, NONLEAF_CHILDINFO bnc, FTNODE child); void toku_bnc_flush_to_child(FT h, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID oldest_referenced_xid);
bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) __attribute__((const, nonnull)); bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) __attribute__((const, nonnull));
bool toku_ft_nonleaf_is_gorged(FTNODE node, uint32_t nodesize); bool toku_ft_nonleaf_is_gorged(FTNODE node, uint32_t nodesize);
...@@ -246,6 +246,17 @@ struct ftnode { ...@@ -246,6 +246,17 @@ struct ftnode {
unsigned int totalchildkeylens; unsigned int totalchildkeylens;
DBT *childkeys; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1]. DBT *childkeys; /* Pivot keys. Child 0's keys are <= childkeys[0]. Child 1's keys are <= childkeys[1].
Child 1's keys are > childkeys[0]. */ Child 1's keys are > childkeys[0]. */
// What's the oldest referenced xid that this node knows about? The real oldest
// referenced xid might be younger, but this is our best estimate. We use it
// as a heuristic to transition provisional mvcc entries from provisional to
// committed (from implicity committed to really committed).
//
// A better heuristic would be the oldest live txnid, but we use this since it
// still works well most of the time, and its readily available on the inject
// code path.
TXNID oldest_known_referenced_xid;
// array of size n_children, consisting of ftnode partitions // array of size n_children, consisting of ftnode partitions
// each one is associated with a child // each one is associated with a child
// for internal nodes, the ith partition corresponds to the ith message buffer // for internal nodes, the ith partition corresponds to the ith message buffer
...@@ -606,8 +617,6 @@ void toku_destroy_ftnode_internals(FTNODE node); ...@@ -606,8 +617,6 @@ void toku_destroy_ftnode_internals(FTNODE node);
void toku_ftnode_free (FTNODE *node); void toku_ftnode_free (FTNODE *node);
bool is_entire_node_in_memory(FTNODE node); bool is_entire_node_in_memory(FTNODE node);
void toku_assert_entire_node_in_memory(FTNODE node); void toku_assert_entire_node_in_memory(FTNODE node);
// FIXME needs toku prefix
void bring_node_fully_into_memory(FTNODE node, FT h);
// append a child node to a parent node // append a child node to a parent node
void toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey); void toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey);
...@@ -1092,7 +1101,6 @@ toku_ft_leaf_apply_cmd ( ...@@ -1092,7 +1101,6 @@ toku_ft_leaf_apply_cmd (
FTNODE node, FTNODE node,
int target_childnum, int target_childnum,
FT_MSG cmd, FT_MSG cmd,
TXNID oldest_referenced_xid,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
); );
...@@ -1107,7 +1115,6 @@ toku_ft_node_put_cmd ( ...@@ -1107,7 +1115,6 @@ toku_ft_node_put_cmd (
FT_MSG cmd, FT_MSG cmd,
bool is_fresh, bool is_fresh,
size_t flow_deltas[], size_t flow_deltas[],
TXNID oldest_referenced_xid,
STAT64INFO stats_to_update STAT64INFO stats_to_update
); );
......
...@@ -16,7 +16,7 @@ to insert a message at the root ...@@ -16,7 +16,7 @@ to insert a message at the root
- capture the next msn of the root node and assign it to the message - capture the next msn of the root node and assign it to the message
- split the root if it needs to be split - split the root if it needs to be split
- insert the message into the root buffer - insert the message into the root buffer
- if the root is too full, then flush_some_child() of the root on a flusher thread - if the root is too full, then toku_ft_flush_some_child() of the root on a flusher thread
flusher functions use an advice struct with provides some functions to flusher functions use an advice struct with provides some functions to
call that tell it what to do based on the context of the flush. see ft-flusher.h call that tell it what to do based on the context of the flush. see ft-flusher.h
...@@ -27,7 +27,7 @@ to flush some child, given a parent and some advice ...@@ -27,7 +27,7 @@ to flush some child, given a parent and some advice
- flush the buffer to the child - flush the buffer to the child
- if the child has stable reactivity and - if the child has stable reactivity and
advice->should_recursively_flush() is true, then advice->should_recursively_flush() is true, then
flush_some_child() of the child toku_ft_flush_some_child() of the child
- otherwise split the child if it needs to be split - otherwise split the child if it needs to be split
- otherwise maybe merge the child if it needs to be merged - otherwise maybe merge the child if it needs to be merged
...@@ -125,11 +125,13 @@ basement nodes, bulk fetch, and partial fetch: ...@@ -125,11 +125,13 @@ basement nodes, bulk fetch, and partial fetch:
#include "log-internal.h" #include "log-internal.h"
#include "sub_block.h" #include "sub_block.h"
#include "txn_manager.h" #include "txn_manager.h"
#include "ule.h" #include "leafentry.h"
#include "xids.h" #include "xids.h"
#include <toku_race_tools.h> #include <toku_race_tools.h>
#include <portability/toku_atomic.h> #include <portability/toku_atomic.h>
#include <util/mempool.h> #include <util/mempool.h>
#include <util/partitioned_counter.h> #include <util/partitioned_counter.h>
#include <util/rwlock.h> #include <util/rwlock.h>
...@@ -752,6 +754,7 @@ void toku_ftnode_clone_callback( ...@@ -752,6 +754,7 @@ void toku_ftnode_clone_callback(
rebalance_ftnode_leaf(node, ft->h->basementnodesize); rebalance_ftnode_leaf(node, ft->h->basementnodesize);
} }
cloned_node->oldest_known_referenced_xid = node->oldest_known_referenced_xid;
cloned_node->max_msn_applied_to_node_on_disk = node->max_msn_applied_to_node_on_disk; cloned_node->max_msn_applied_to_node_on_disk = node->max_msn_applied_to_node_on_disk;
cloned_node->flags = node->flags; cloned_node->flags = node->flags;
cloned_node->thisnodename = node->thisnodename; cloned_node->thisnodename = node->thisnodename;
...@@ -1374,6 +1377,7 @@ toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_c ...@@ -1374,6 +1377,7 @@ toku_initialize_empty_ftnode (FTNODE n, BLOCKNUM nodename, int height, int num_c
n->childkeys = 0; n->childkeys = 0;
n->bp = 0; n->bp = 0;
n->n_children = num_children; n->n_children = num_children;
n->oldest_known_referenced_xid = TXNID_NONE;
if (num_children > 0) { if (num_children > 0) {
XMALLOC_N(num_children-1, n->childkeys); XMALLOC_N(num_children-1, n->childkeys);
...@@ -1548,13 +1552,10 @@ toku_ft_bn_apply_cmd_once ( ...@@ -1548,13 +1552,10 @@ toku_ft_bn_apply_cmd_once (
if (le) if (le)
oldsize = leafentry_memsize(le); oldsize = leafentry_memsize(le);
// apply_msg_to_leafentry() may call mempool_malloc_from_omt() to allocate more space. // toku_le_apply_msg() may call mempool_malloc_from_omt() to allocate more space.
// That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is // That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is
// no longer in use. We'll have to release the old mempool later. // no longer in use. We'll have to release the old mempool later.
{ toku_le_apply_msg(cmd, le, oldest_referenced_xid, &newsize, &new_le, &bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta);
int r = apply_msg_to_leafentry(cmd, le, oldest_referenced_xid, &newsize, &new_le, &bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta);
invariant(r==0);
}
if (new_le) { if (new_le) {
paranoid_invariant(newsize == leafentry_disksize(new_le)); paranoid_invariant(newsize == leafentry_disksize(new_le));
...@@ -1734,7 +1735,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1734,7 +1735,7 @@ toku_ft_bn_apply_cmd (
DESCRIPTOR desc, DESCRIPTOR desc,
BASEMENTNODE bn, BASEMENTNODE bn,
FT_MSG cmd, FT_MSG cmd,
TXNID oldest_referenced_xid, TXNID oldest_known_referenced_xid,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
...@@ -1776,7 +1777,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1776,7 +1777,7 @@ toku_ft_bn_apply_cmd (
assert_zero(r); assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
} }
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
// if the insertion point is within a window of the right edge of // if the insertion point is within a window of the right edge of
// the leaf then it is sequential // the leaf then it is sequential
...@@ -1804,7 +1805,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1804,7 +1805,7 @@ toku_ft_bn_apply_cmd (
if (r == DB_NOTFOUND) break; if (r == DB_NOTFOUND) break;
assert_zero(r); assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
break; break;
} }
...@@ -1820,7 +1821,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1820,7 +1821,7 @@ toku_ft_bn_apply_cmd (
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
int deleted = 0; int deleted = 0;
if (!le_is_clean(storeddata)) { //If already clean, nothing to do. if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
uint32_t new_omt_size = toku_omt_size(bn->buffer); uint32_t new_omt_size = toku_omt_size(bn->buffer);
if (new_omt_size != omt_size) { if (new_omt_size != omt_size) {
paranoid_invariant(new_omt_size+1 == omt_size); paranoid_invariant(new_omt_size+1 == omt_size);
...@@ -1846,7 +1847,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1846,7 +1847,7 @@ toku_ft_bn_apply_cmd (
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
int deleted = 0; int deleted = 0;
if (le_has_xids(storeddata, cmd->xids)) { if (le_has_xids(storeddata, cmd->xids)) {
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update); toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
uint32_t new_omt_size = toku_omt_size(bn->buffer); uint32_t new_omt_size = toku_omt_size(bn->buffer);
if (new_omt_size != omt_size) { if (new_omt_size != omt_size) {
paranoid_invariant(new_omt_size+1 == omt_size); paranoid_invariant(new_omt_size+1 == omt_size);
...@@ -1867,10 +1868,10 @@ toku_ft_bn_apply_cmd ( ...@@ -1867,10 +1868,10 @@ toku_ft_bn_apply_cmd (
r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be, r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
&storeddatav, &idx); &storeddatav, &idx);
if (r==DB_NOTFOUND) { if (r==DB_NOTFOUND) {
r = do_update(update_fun, desc, bn, cmd, idx, NULL, oldest_referenced_xid, workdone, stats_to_update); r = do_update(update_fun, desc, bn, cmd, idx, NULL, oldest_known_referenced_xid, workdone, stats_to_update);
} else if (r==0) { } else if (r==0) {
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update); r = do_update(update_fun, desc, bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
} // otherwise, a worse error, just return it } // otherwise, a worse error, just return it
break; break;
} }
...@@ -1882,7 +1883,7 @@ toku_ft_bn_apply_cmd ( ...@@ -1882,7 +1883,7 @@ toku_ft_bn_apply_cmd (
r = toku_omt_fetch(bn->buffer, idx, &storeddatav); r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
assert_zero(r); assert_zero(r);
CAST_FROM_VOIDP(storeddata, storeddatav); CAST_FROM_VOIDP(storeddata, storeddatav);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, oldest_referenced_xid, workdone, stats_to_update); r = do_update(update_fun, desc, bn, cmd, idx, storeddata, oldest_known_referenced_xid, workdone, stats_to_update);
assert_zero(r); assert_zero(r);
if (num_leafentries_before == toku_omt_size(bn->buffer)) { if (num_leafentries_before == toku_omt_size(bn->buffer)) {
...@@ -2147,13 +2148,18 @@ ft_basement_node_gc_once(BASEMENTNODE bn, ...@@ -2147,13 +2148,18 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
const xid_omt_t &snapshot_xids, const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids, const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns, const xid_omt_t &live_root_txns,
TXNID oldest_known_referenced_xid,
STAT64INFO_S * delta) STAT64INFO_S * delta)
{ {
paranoid_invariant(leaf_entry); paranoid_invariant(leaf_entry);
// There is no point in running GC if there is only one committed // Don't run garbage collection on non-mvcc leaf entries.
// leaf entry. if (leaf_entry->type != LE_MVCC) {
if (leaf_entry->type != LE_MVCC || leaf_entry->u.mvcc.num_cxrs <= 1) { // MAKE ACCESSOR goto exit;
}
// Don't run garbage collection if this leafentry decides it's not worth it.
if (!toku_le_worth_running_garbage_collection(leaf_entry, oldest_known_referenced_xid)) {
goto exit; goto exit;
} }
...@@ -2172,7 +2178,7 @@ ft_basement_node_gc_once(BASEMENTNODE bn, ...@@ -2172,7 +2178,7 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
// Cache the size of the leaf entry. // Cache the size of the leaf entry.
oldsize = leafentry_memsize(leaf_entry); oldsize = leafentry_memsize(leaf_entry);
garbage_collect_leafentry(leaf_entry, toku_le_garbage_collect(leaf_entry,
&new_leaf_entry, &new_leaf_entry,
&newsize, &newsize,
&bn->buffer, &bn->buffer,
...@@ -2180,7 +2186,8 @@ ft_basement_node_gc_once(BASEMENTNODE bn, ...@@ -2180,7 +2186,8 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
&maybe_free, &maybe_free,
snapshot_xids, snapshot_xids,
referenced_xids, referenced_xids,
live_root_txns); live_root_txns,
oldest_known_referenced_xid);
// These will represent the number of bytes and rows changed as // These will represent the number of bytes and rows changed as
// part of the garbage collection. // part of the garbage collection.
...@@ -2225,6 +2232,7 @@ basement_node_gc_all_les(BASEMENTNODE bn, ...@@ -2225,6 +2232,7 @@ basement_node_gc_all_les(BASEMENTNODE bn,
const xid_omt_t &snapshot_xids, const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids, const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns, const xid_omt_t &live_root_txns,
TXNID oldest_known_referenced_xid,
STAT64INFO_S * delta) STAT64INFO_S * delta)
{ {
int r = 0; int r = 0;
...@@ -2236,7 +2244,7 @@ basement_node_gc_all_les(BASEMENTNODE bn, ...@@ -2236,7 +2244,7 @@ basement_node_gc_all_les(BASEMENTNODE bn,
r = toku_omt_fetch(bn->buffer, index, &storedatav); r = toku_omt_fetch(bn->buffer, index, &storedatav);
assert_zero(r); assert_zero(r);
CAST_FROM_VOIDP(leaf_entry, storedatav); CAST_FROM_VOIDP(leaf_entry, storedatav);
ft_basement_node_gc_once(bn, index, leaf_entry, snapshot_xids, referenced_xids, live_root_txns, delta); ft_basement_node_gc_once(bn, index, leaf_entry, snapshot_xids, referenced_xids, live_root_txns, oldest_known_referenced_xid, delta);
// Check if the leaf entry was deleted or not. // Check if the leaf entry was deleted or not.
if (num_leafentries_before == toku_omt_size(bn->buffer)) { if (num_leafentries_before == toku_omt_size(bn->buffer)) {
++index; ++index;
...@@ -2244,13 +2252,14 @@ basement_node_gc_all_les(BASEMENTNODE bn, ...@@ -2244,13 +2252,14 @@ basement_node_gc_all_les(BASEMENTNODE bn,
} }
} }
// Garbage collect all leaf entires. // Garbage collect all leaf entires in all basement nodes.
static void static void
ft_leaf_gc_all_les(FTNODE node, ft_leaf_gc_all_les(FTNODE node,
FT h, FT ft,
const xid_omt_t &snapshot_xids, const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids, const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns) const xid_omt_t &live_root_txns,
TXNID oldest_known_referenced_xid)
{ {
toku_assert_entire_node_in_memory(node); toku_assert_entire_node_in_memory(node);
paranoid_invariant_zero(node->height); paranoid_invariant_zero(node->height);
...@@ -2261,15 +2270,16 @@ ft_leaf_gc_all_les(FTNODE node, ...@@ -2261,15 +2270,16 @@ ft_leaf_gc_all_les(FTNODE node,
STAT64INFO_S delta; STAT64INFO_S delta;
delta.numrows = 0; delta.numrows = 0;
delta.numbytes = 0; delta.numbytes = 0;
basement_node_gc_all_les(bn, snapshot_xids, referenced_xids, live_root_txns, &delta); basement_node_gc_all_les(bn, snapshot_xids, referenced_xids, live_root_txns, oldest_known_referenced_xid, &delta);
toku_ft_update_stats(&h->in_memory_stats, delta); toku_ft_update_stats(&ft->in_memory_stats, delta);
} }
} }
void toku_bnc_flush_to_child( void toku_bnc_flush_to_child(
FT h, FT ft,
NONLEAF_CHILDINFO bnc, NONLEAF_CHILDINFO bnc,
FTNODE child FTNODE child,
TXNID oldest_known_referenced_xid
) )
{ {
paranoid_invariant(bnc); paranoid_invariant(bnc);
...@@ -2293,25 +2303,25 @@ void toku_bnc_flush_to_child( ...@@ -2293,25 +2303,25 @@ void toku_bnc_flush_to_child(
flow_deltas[1] = FIFO_CURRENT_ENTRY_MEMSIZE; flow_deltas[1] = FIFO_CURRENT_ENTRY_MEMSIZE;
} }
toku_ft_node_put_cmd( toku_ft_node_put_cmd(
h->compare_fun, ft->compare_fun,
h->update_fun, ft->update_fun,
&h->cmp_descriptor, &ft->cmp_descriptor,
child, child,
-1, -1,
&ftcmd, &ftcmd,
is_fresh, is_fresh,
flow_deltas, flow_deltas,
TXNID_NONE,
&stats_delta &stats_delta
); );
remaining_memsize -= FIFO_CURRENT_ENTRY_MEMSIZE; remaining_memsize -= FIFO_CURRENT_ENTRY_MEMSIZE;
})); }));
invariant(remaining_memsize == 0); invariant(remaining_memsize == 0);
if (stats_delta.numbytes || stats_delta.numrows) { if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&h->in_memory_stats, stats_delta); toku_ft_update_stats(&ft->in_memory_stats, stats_delta);
} }
// Run garbage collection, if we are a leaf entry. // Run garbage collection, if we are a leaf entry.
TOKULOGGER logger = toku_cachefile_logger(h->cf); TOKULOGGER logger = toku_cachefile_logger(ft->cf);
if (child->height == 0 && logger) { if (child->height == 0 && logger) {
xid_omt_t snapshot_txnids; xid_omt_t snapshot_txnids;
rx_omt_t referenced_xids; rx_omt_t referenced_xids;
...@@ -2326,8 +2336,15 @@ void toku_bnc_flush_to_child( ...@@ -2326,8 +2336,15 @@ void toku_bnc_flush_to_child(
STATUS_INC(FT_MSG_BYTES_OUT, buffsize); STATUS_INC(FT_MSG_BYTES_OUT, buffsize);
// may be misleading if there's a broadcast message in there // may be misleading if there's a broadcast message in there
STATUS_INC(FT_MSG_BYTES_CURR, -buffsize); STATUS_INC(FT_MSG_BYTES_CURR, -buffsize);
// Perform the garbage collection.
ft_leaf_gc_all_les(child, h, snapshot_txnids, referenced_xids, live_root_txns); // Perform garbage collection. Provide a full snapshot of the transaction
// system plus the oldest known referenced xid that could have had messages
// applied to this leaf.
//
// Using the oldest xid in either the referenced_xids or live_root_txns
// snapshots is not sufficient, because there could be something older that is neither
// live nor referenced, but instead aborted somewhere above us as a message in the tree.
ft_leaf_gc_all_les(child, ft, snapshot_txnids, referenced_xids, live_root_txns, oldest_known_referenced_xid);
// Free the OMT's we used for garbage collecting. // Free the OMT's we used for garbage collecting.
snapshot_txnids.destroy(); snapshot_txnids.destroy();
...@@ -2342,22 +2359,6 @@ bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) { ...@@ -2342,22 +2359,6 @@ bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) {
return bnc->flow[0] >= flow_threshold || bnc->flow[1] >= flow_threshold; return bnc->flow[0] >= flow_threshold || bnc->flow[1] >= flow_threshold;
} }
void bring_node_fully_into_memory(FTNODE node, FT h)
{
if (!is_entire_node_in_memory(node)) {
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, h);
toku_cachetable_pf_pinned_pair(
node,
toku_ftnode_pf_callback,
&bfe,
h->cf,
node->thisnodename,
toku_cachetable_hash(h->cf, node->thisnodename)
);
}
}
void void
toku_ft_node_put_cmd ( toku_ft_node_put_cmd (
ft_compare_func compare_fun, ft_compare_func compare_fun,
...@@ -2368,7 +2369,6 @@ toku_ft_node_put_cmd ( ...@@ -2368,7 +2369,6 @@ toku_ft_node_put_cmd (
FT_MSG cmd, FT_MSG cmd,
bool is_fresh, bool is_fresh,
size_t flow_deltas[], size_t flow_deltas[],
TXNID oldest_referenced_xid,
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
// Effect: Push CMD into the subtree rooted at NODE. // Effect: Push CMD into the subtree rooted at NODE.
...@@ -2385,7 +2385,7 @@ toku_ft_node_put_cmd ( ...@@ -2385,7 +2385,7 @@ toku_ft_node_put_cmd (
// and instead defer to these functions // and instead defer to these functions
// //
if (node->height==0) { if (node->height==0) {
toku_ft_leaf_apply_cmd(compare_fun, update_fun, desc, node, target_childnum, cmd, oldest_referenced_xid, nullptr, stats_to_update); toku_ft_leaf_apply_cmd(compare_fun, update_fun, desc, node, target_childnum, cmd, nullptr, stats_to_update);
} else { } else {
ft_nonleaf_put_cmd(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas); ft_nonleaf_put_cmd(compare_fun, desc, node, target_childnum, cmd, is_fresh, flow_deltas);
} }
...@@ -2405,7 +2405,6 @@ void toku_ft_leaf_apply_cmd( ...@@ -2405,7 +2405,6 @@ void toku_ft_leaf_apply_cmd(
FTNODE node, FTNODE node,
int target_childnum, // which child to inject to, or -1 if unknown int target_childnum, // which child to inject to, or -1 if unknown
FT_MSG cmd, FT_MSG cmd,
TXNID oldest_referenced_xid,
uint64_t *workdone, uint64_t *workdone,
STAT64INFO stats_to_update STAT64INFO stats_to_update
) )
...@@ -2438,6 +2437,10 @@ void toku_ft_leaf_apply_cmd( ...@@ -2438,6 +2437,10 @@ void toku_ft_leaf_apply_cmd(
node->max_msn_applied_to_node_on_disk = cmd_msn; node->max_msn_applied_to_node_on_disk = cmd_msn;
} }
// Pass the oldest possible live xid value to each basementnode
// when we apply messages to them.
TXNID oldest_known_referenced_xid = node->oldest_known_referenced_xid;
if (ft_msg_applies_once(cmd)) { if (ft_msg_applies_once(cmd)) {
unsigned int childnum = (target_childnum >= 0 unsigned int childnum = (target_childnum >= 0
? target_childnum ? target_childnum
...@@ -2450,7 +2453,7 @@ void toku_ft_leaf_apply_cmd( ...@@ -2450,7 +2453,7 @@ void toku_ft_leaf_apply_cmd(
desc, desc,
bn, bn,
cmd, cmd,
oldest_referenced_xid, oldest_known_referenced_xid,
workdone, workdone,
stats_to_update); stats_to_update);
} else { } else {
...@@ -2466,7 +2469,7 @@ void toku_ft_leaf_apply_cmd( ...@@ -2466,7 +2469,7 @@ void toku_ft_leaf_apply_cmd(
desc, desc,
BLB(node, childnum), BLB(node, childnum),
cmd, cmd,
oldest_referenced_xid, oldest_known_referenced_xid,
workdone, workdone,
stats_to_update); stats_to_update);
} else { } else {
...@@ -2495,6 +2498,14 @@ static void inject_message_in_locked_node( ...@@ -2495,6 +2498,14 @@ static void inject_message_in_locked_node(
// otherwise. // otherwise.
invariant(toku_ctpair_is_write_locked(node->ct_pair)); invariant(toku_ctpair_is_write_locked(node->ct_pair));
toku_assert_entire_node_in_memory(node); toku_assert_entire_node_in_memory(node);
// Update the oldest known referenced xid for this node if it is younger
// than the one currently known. Otherwise, it's better to keep the heurstic
// we have and ignore this one.
if (oldest_referenced_xid >= node->oldest_known_referenced_xid) {
node->oldest_known_referenced_xid = oldest_referenced_xid;
}
// Get the MSN from the header. Now that we have a write lock on the // Get the MSN from the header. Now that we have a write lock on the
// node we're injecting into, we know no other thread will get an MSN // node we're injecting into, we know no other thread will get an MSN
// after us and get that message into our subtree before us. // after us and get that message into our subtree before us.
...@@ -2510,7 +2521,6 @@ static void inject_message_in_locked_node( ...@@ -2510,7 +2521,6 @@ static void inject_message_in_locked_node(
cmd, cmd,
true, true,
flow_deltas, flow_deltas,
oldest_referenced_xid,
&stats_delta &stats_delta
); );
if (stats_delta.numbytes || stats_delta.numrows) { if (stats_delta.numbytes || stats_delta.numrows) {
...@@ -2538,10 +2548,10 @@ static void inject_message_in_locked_node( ...@@ -2538,10 +2548,10 @@ static void inject_message_in_locked_node(
// verify that msn of latest message was captured in root node // verify that msn of latest message was captured in root node
paranoid_invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn); paranoid_invariant(cmd->msn.msn == node->max_msn_applied_to_node_on_disk.msn);
// if we call flush_some_child, then that function unpins the root // if we call toku_ft_flush_some_child, then that function unpins the root
// otherwise, we unpin ourselves // otherwise, we unpin ourselves
if (node->height > 0 && toku_ft_nonleaf_is_gorged(node, ft->h->nodesize)) { if (node->height > 0 && toku_ft_nonleaf_is_gorged(node, ft->h->nodesize)) {
flush_node_on_background_thread(ft, node); toku_ft_flush_node_on_background_thread(ft, node);
} }
else { else {
toku_unpin_ftnode(ft, node); toku_unpin_ftnode(ft, node);
......
...@@ -379,6 +379,7 @@ serialize_ft_min_size (uint32_t version) { ...@@ -379,6 +379,7 @@ serialize_ft_min_size (uint32_t version) {
size_t size = 0; size_t size = 0;
switch(version) { switch(version) {
case FT_LAYOUT_VERSION_22:
case FT_LAYOUT_VERSION_21: case FT_LAYOUT_VERSION_21:
size += sizeof(MSN); // max_msn_in_ft size += sizeof(MSN); // max_msn_in_ft
case FT_LAYOUT_VERSION_20: case FT_LAYOUT_VERSION_20:
......
...@@ -146,7 +146,6 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char ...@@ -146,7 +146,6 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char
&cmd, &cmd,
true, true,
zero_flow_deltas, zero_flow_deltas,
TXNID_NONE,
NULL NULL
); );
......
...@@ -912,3 +912,64 @@ void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp ...@@ -912,3 +912,64 @@ void toku_ft_get_compression_method(FT ft, enum toku_compression_method *methodp
void toku_ft_set_blackhole(FT_HANDLE ft_handle) { void toku_ft_set_blackhole(FT_HANDLE ft_handle) {
ft_handle->ft->blackhole = true; ft_handle->ft->blackhole = true;
} }
struct garbage_helper_extra {
FT ft;
size_t total_space;
size_t used_space;
};
static int
garbage_leafentry_helper(OMTVALUE v, uint32_t UU(idx), void *extra) {
struct garbage_helper_extra *CAST_FROM_VOIDP(info, extra);
LEAFENTRY CAST_FROM_VOIDP(le, v);
info->total_space += leafentry_disksize(le);
info->used_space += LE_CLEAN_MEMSIZE(le_latest_keylen(le), le_latest_vallen(le));
return 0;
}
static int
garbage_helper(BLOCKNUM blocknum, int64_t UU(size), int64_t UU(address), void *extra) {
struct garbage_helper_extra *CAST_FROM_VOIDP(info, extra);
FTNODE node;
FTNODE_DISK_DATA ndd;
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, info->ft);
int fd = toku_cachefile_get_fd(info->ft->cf);
int r = toku_deserialize_ftnode_from(fd, blocknum, 0, &node, &ndd, &bfe);
if (r != 0) {
goto no_node;
}
if (node->height > 0) {
goto exit;
}
for (int i = 0; i < node->n_children; ++i) {
BASEMENTNODE bn = BLB(node, i);
r = toku_omt_iterate(bn->buffer, garbage_leafentry_helper, info);
if (r != 0) {
goto exit;
}
}
exit:
toku_ftnode_free(&node);
toku_free(ndd);
no_node:
return r;
}
void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space) {
// Effect: Iterates the FT's blocktable and calculates the total and used space for leaf blocks.
// Note: It is ok to call this function concurrently with reads/writes to the table since
// the blocktable lock is held, which means no new allocations or file writes can occur.
invariant_notnull(total_space);
invariant_notnull(used_space);
struct garbage_helper_extra info = {
.ft = ft,
.total_space = 0,
.used_space = 0
};
toku_blocktable_iterate(ft->blocktable, TRANSLATION_CHECKPOINTED, garbage_helper, &info, true, true);
*total_space = info.total_space;
*used_space = info.used_space;
}
...@@ -104,4 +104,8 @@ void toku_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p); ...@@ -104,4 +104,8 @@ void toku_node_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p);
// mark the ft as a blackhole. any message injections will be a no op. // mark the ft as a blackhole. any message injections will be a no op.
void toku_ft_set_blackhole(FT_HANDLE ft_handle); void toku_ft_set_blackhole(FT_HANDLE ft_handle);
// Effect: Calculates the total space and used space for a FT's leaf data.
// The difference between the two is MVCC garbage.
void toku_ft_get_garbage(FT ft, uint64_t *total_space, uint64_t *used_space);
#endif #endif
...@@ -30,6 +30,7 @@ enum ft_layout_version_e { ...@@ -30,6 +30,7 @@ enum ft_layout_version_e {
// last_xid to shutdown // last_xid to shutdown
FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header, FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header,
// Removed log suppression logentry // Removed log suppression logentry
FT_LAYOUT_VERSION_22 = 22, // Ming: Add oldest known referenced xid to each ftnode, for better garbage collection
FT_NEXT_VERSION, // the version after the current version FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line. FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
......
...@@ -367,6 +367,7 @@ serialize_ftnode_info_size(FTNODE node) ...@@ -367,6 +367,7 @@ serialize_ftnode_info_size(FTNODE node)
retval += 4; // nodesize retval += 4; // nodesize
retval += 4; // flags retval += 4; // flags
retval += 4; // height; retval += 4; // height;
retval += 8; // oldest_known_referenced_xid
retval += node->totalchildkeylens; // total length of pivots retval += node->totalchildkeylens; // total length of pivots
retval += (node->n_children-1)*4; // encode length of each pivot retval += (node->n_children-1)*4; // encode length of each pivot
if (node->height > 0) { if (node->height > 0) {
...@@ -390,6 +391,8 @@ static void serialize_ftnode_info(FTNODE node, ...@@ -390,6 +391,8 @@ static void serialize_ftnode_info(FTNODE node,
wbuf_nocrc_uint(&wb, 0); // write a dummy value for where node->nodesize used to be wbuf_nocrc_uint(&wb, 0); // write a dummy value for where node->nodesize used to be
wbuf_nocrc_uint(&wb, node->flags); wbuf_nocrc_uint(&wb, node->flags);
wbuf_nocrc_int (&wb, node->height); wbuf_nocrc_int (&wb, node->height);
wbuf_TXNID(&wb, node->oldest_known_referenced_xid);
// pivot information // pivot information
for (int i = 0; i < node->n_children-1; i++) { for (int i = 0; i < node->n_children-1; i++) {
wbuf_nocrc_bytes(&wb, node->childkeys[i].data, node->childkeys[i].size); wbuf_nocrc_bytes(&wb, node->childkeys[i].data, node->childkeys[i].size);
...@@ -1264,6 +1267,9 @@ deserialize_ftnode_info( ...@@ -1264,6 +1267,9 @@ deserialize_ftnode_info(
if (node->layout_version_read_from_disk < FT_LAYOUT_VERSION_19) { if (node->layout_version_read_from_disk < FT_LAYOUT_VERSION_19) {
(void) rbuf_int(&rb); // optimized_for_upgrade (void) rbuf_int(&rb); // optimized_for_upgrade
} }
if (node->layout_version_read_from_disk >= FT_LAYOUT_VERSION_22) {
rbuf_TXNID(&rb, &node->oldest_known_referenced_xid);
}
// now create the basement nodes or childinfos, depending on whether this is a // now create the basement nodes or childinfos, depending on whether this is a
// leaf node or internal node // leaf node or internal node
...@@ -1505,6 +1511,17 @@ exit: ...@@ -1505,6 +1511,17 @@ exit:
return r; return r;
} }
static FTNODE alloc_ftnode_for_deserialize(uint32_t fullhash, BLOCKNUM blocknum) {
// Effect: Allocate an FTNODE and fill in the values that are not read from
FTNODE XMALLOC(node);
node->fullhash = fullhash;
node->thisnodename = blocknum;
node->dirty = 0;
node->bp = nullptr;
node->oldest_known_referenced_xid = TXNID_NONE;
return node;
}
static int static int
deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode, deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
FTNODE_DISK_DATA* ndd, FTNODE_DISK_DATA* ndd,
...@@ -1518,13 +1535,7 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode, ...@@ -1518,13 +1535,7 @@ deserialize_ftnode_header_from_rbuf_if_small_enough (FTNODE *ftnode,
// Return 0 if it worked. If something goes wrong (including that we are looking at some old data format that doesn't have partitions) then return nonzero. // Return 0 if it worked. If something goes wrong (including that we are looking at some old data format that doesn't have partitions) then return nonzero.
{ {
int r = 0; int r = 0;
FTNODE XMALLOC(node); FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
// fill in values that are known and not stored in rb
node->fullhash = fullhash;
node->thisnodename = blocknum;
node->dirty = 0;
node->bp = NULL; // fill this in so we can free without a leak.
if (rb->size < 24) { if (rb->size < 24) {
// TODO: What error do we return here? // TODO: What error do we return here?
...@@ -2171,15 +2182,10 @@ deserialize_ftnode_from_rbuf( ...@@ -2171,15 +2182,10 @@ deserialize_ftnode_from_rbuf(
// Effect: deserializes a ftnode that is in rb (with pointer of rb just past the magic) into a FTNODE. // Effect: deserializes a ftnode that is in rb (with pointer of rb just past the magic) into a FTNODE.
{ {
int r = 0; int r = 0;
FTNODE XMALLOC(node);
struct sub_block sb_node_info; struct sub_block sb_node_info;
// fill in values that are known and not stored in rb FTNODE node = alloc_ftnode_for_deserialize(fullhash, blocknum);
node->fullhash = fullhash;
node->thisnodename = blocknum;
node->dirty = 0;
// now start reading from rbuf // now start reading from rbuf
// first thing we do is read the header information // first thing we do is read the header information
bytevec magic; bytevec magic;
rbuf_literal_bytes(rb, &magic, 8); rbuf_literal_bytes(rb, &magic, 8);
......
...@@ -280,61 +280,14 @@ dump_nodesizes(int f, FT h) { ...@@ -280,61 +280,14 @@ dump_nodesizes(int f, FT h) {
printf("leafsizes: %" PRIu64 "\n", info.leafsizes); printf("leafsizes: %" PRIu64 "\n", info.leafsizes);
} }
typedef struct {
int f;
FT h;
size_t total_space;
size_t used_space;
} garbage_help_extra;
static int
garbage_leafentry_helper(OMTVALUE v, uint32_t UU(idx), void *extra) {
garbage_help_extra *CAST_FROM_VOIDP(info, extra);
LEAFENTRY CAST_FROM_VOIDP(le, v);
info->total_space += leafentry_disksize(le);
info->used_space += LE_CLEAN_MEMSIZE(le_latest_keylen(le), le_latest_vallen(le));
return 0;
}
static int
garbage_helper(BLOCKNUM b, int64_t UU(size), int64_t UU(address), void *extra) {
garbage_help_extra *CAST_FROM_VOIDP(info, extra);
FTNODE n;
FTNODE_DISK_DATA ndd = NULL;
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, info->h);
int r = toku_deserialize_ftnode_from(info->f, b, 0, &n, &ndd, &bfe);
if (r != 0) {
goto no_node;
}
if (n->height > 0) {
goto exit;
}
for (int i = 0; i < n->n_children; ++i) {
BASEMENTNODE bn = BLB(n, i);
r = toku_omt_iterate(bn->buffer, garbage_leafentry_helper, info);
if (r != 0) {
goto exit;
}
}
exit:
toku_ftnode_free(&n);
toku_free(ndd);
no_node:
return r;
}
static void static void
dump_garbage_stats(int f, FT h) { dump_garbage_stats(int f, FT ft) {
garbage_help_extra info; invariant(f == toku_cachefile_get_fd(ft->cf));
memset(&info, 0, sizeof info); uint64_t total_space = 0;
info.f = f; uint64_t used_space = 0;
info.h = h; toku_ft_get_garbage(ft, &total_space, &used_space);
toku_blocktable_iterate(h->blocktable, TRANSLATION_CHECKPOINTED, printf("total_size: %zu\n", total_space);
garbage_helper, &info, true, true); printf("used_size: %zu\n", used_space);
printf("total_size: %zu\n", info.total_space);
printf("used_size: %zu\n", info.used_space);
} }
static uint32_t static uint32_t
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ifndef LEAFENTRY_H
#define LEAFENTRY_H #ifndef TOKU_LEAFENTRY_H
#define TOKU_LEAFENTRY_H
#ident "$Id$" #ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
...@@ -10,11 +12,12 @@ ...@@ -10,11 +12,12 @@
#include <util/mempool.h> #include <util/mempool.h>
#include "txn_manager.h"
#include "rbuf.h" #include "rbuf.h"
#include "x1764.h" #include "x1764.h"
#include "omt.h" #include "omt.h"
#if 0 /*
Memory format of packed leaf entry Memory format of packed leaf entry
CONSTANTS: CONSTANTS:
num_uxrs num_uxrs
...@@ -35,10 +38,7 @@ ...@@ -35,10 +38,7 @@
Run-time-constants Run-time-constants
key[] key[]
val[] val[]
#endif */
#if TOKU_WINDOWS
#pragma pack(push, 1)
#endif
// //
// enum of possible values for LEAFENTRY->type field // enum of possible values for LEAFENTRY->type field
...@@ -94,9 +94,6 @@ struct __attribute__ ((__packed__)) leafentry { ...@@ -94,9 +94,6 @@ struct __attribute__ ((__packed__)) leafentry {
}; };
static_assert(10 == sizeof(leafentry), "leafentry size is wrong"); static_assert(10 == sizeof(leafentry), "leafentry size is wrong");
static_assert(5 == __builtin_offsetof(leafentry, u), "union is in the wrong place"); static_assert(5 == __builtin_offsetof(leafentry, u), "union is in the wrong place");
#if TOKU_WINDOWS
#pragma pack(pop)
#endif
#define LE_CLEAN_MEMSIZE(_keylen, _vallen) \ #define LE_CLEAN_MEMSIZE(_keylen, _vallen) \
(sizeof(((LEAFENTRY)NULL)->type) /* type */ \ (sizeof(((LEAFENTRY)NULL)->type) /* type */ \
...@@ -123,6 +120,10 @@ static_assert(5 == __builtin_offsetof(leafentry, u), "union is in the wrong plac ...@@ -123,6 +120,10 @@ static_assert(5 == __builtin_offsetof(leafentry, u), "union is in the wrong plac
typedef struct leafentry *LEAFENTRY; typedef struct leafentry *LEAFENTRY;
typedef struct leafentry_13 *LEAFENTRY_13; typedef struct leafentry_13 *LEAFENTRY_13;
//
// TODO: consistency among names is very poor.
//
size_t leafentry_memsize (LEAFENTRY le); // the size of a leafentry in memory. size_t leafentry_memsize (LEAFENTRY le); // the size of a leafentry in memory.
size_t leafentry_disksize (LEAFENTRY le); // this is the same as logsizeof_LEAFENTRY. The size of a leafentry on disk. size_t leafentry_disksize (LEAFENTRY le); // this is the same as logsizeof_LEAFENTRY. The size of a leafentry on disk.
void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le); void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
...@@ -144,19 +145,6 @@ void* le_key_and_len (LEAFENTRY le, uint32_t *len); ...@@ -144,19 +145,6 @@ void* le_key_and_len (LEAFENTRY le, uint32_t *len);
uint64_t le_outermost_uncommitted_xid (LEAFENTRY le); uint64_t le_outermost_uncommitted_xid (LEAFENTRY le);
void
le_committed_mvcc(uint8_t *key, uint32_t keylen,
uint8_t *val, uint32_t vallen,
TXNID xid,
void (*bytes)(struct dbuf *dbuf, const void *bytes, int nbytes),
struct dbuf *d);
void
le_clean(uint8_t *key, uint32_t keylen,
uint8_t *val, uint32_t vallen,
void (*bytes)(struct dbuf *dbuf, const void *bytes, int nbytes),
struct dbuf *d);
//Callback contract: //Callback contract:
// Function checks to see if id is accepted by context. // Function checks to see if id is accepted by context.
// Returns: // Returns:
...@@ -169,9 +157,9 @@ int le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_empty, TOKUT ...@@ -169,9 +157,9 @@ int le_iterate_is_del(LEAFENTRY le, LE_ITERATE_CALLBACK f, bool *is_empty, TOKUT
int le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vallenp, TOKUTXN context); int le_iterate_val(LEAFENTRY le, LE_ITERATE_CALLBACK f, void** valpp, uint32_t *vallenp, TOKUTXN context);
size_t size_t
leafentry_disksize_13(LEAFENTRY_13 le); leafentry_disksize_13(LEAFENTRY_13 le);
int int
toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored data. toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored data.
size_t *new_leafentry_memorysize, size_t *new_leafentry_memorysize,
...@@ -179,7 +167,28 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored ...@@ -179,7 +167,28 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry, // NULL if there was no stored
OMT *omtp, OMT *omtp,
struct mempool *mp); struct mempool *mp);
void toku_le_apply_msg(FT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
TXNID oldest_referenced_xid,
size_t *new_leafentry_memorysize,
LEAFENTRY *new_leafentry_p,
OMT *omtp,
struct mempool *mp,
void **maybe_free,
int64_t * numbytes_delta_p);
bool toku_le_worth_running_garbage_collection(LEAFENTRY le, TXNID oldest_known_referenced_xid);
#endif void toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
LEAFENTRY *new_leaf_entry,
size_t *new_leaf_entry_memory_size,
OMT *omtp,
struct mempool *mp,
void **maybe_free,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns,
TXNID oldest_known_referenced_xid);
#endif /* TOKU_LEAFENTRY_H */
...@@ -48,7 +48,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va ...@@ -48,7 +48,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
brt->ft->h->max_msn_in_ft = msn; brt->ft->h->max_msn_in_ft = msn;
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} }; FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd, TXNID_NONE, nullptr, nullptr); toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd, nullptr, nullptr);
{ {
int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair); int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0); assert(r==0);
...@@ -56,7 +56,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va ...@@ -56,7 +56,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
} }
FT_MSG_S badcmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval }} }; FT_MSG_S badcmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &badcmd, TXNID_NONE, nullptr, nullptr); toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &badcmd, nullptr, nullptr);
// message should be rejected for duplicate msn, row should still have original val // message should be rejected for duplicate msn, row should still have original val
{ {
...@@ -69,7 +69,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va ...@@ -69,7 +69,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
msn = next_dummymsn(); msn = next_dummymsn();
brt->ft->h->max_msn_in_ft = msn; brt->ft->h->max_msn_in_ft = msn;
FT_MSG_S cmd2 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &val2 }} }; FT_MSG_S cmd2 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &val2 }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd2, TXNID_NONE, nullptr, nullptr); toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd2, nullptr, nullptr);
// message should be accepted, val should have new value // message should be accepted, val should have new value
{ {
...@@ -81,7 +81,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va ...@@ -81,7 +81,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
// now verify that message with lesser (older) msn is rejected // now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10; msn.msn = msn.msn - 10;
FT_MSG_S cmd3 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval } }}; FT_MSG_S cmd3 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval } }};
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd3, TXNID_NONE, nullptr, nullptr); toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd3, nullptr, nullptr);
// message should be rejected, val should still have value in pair2 // message should be rejected, val should still have value in pair2
{ {
......
...@@ -123,8 +123,7 @@ insert_random_message_to_bn(FT_HANDLE t, BASEMENTNODE blb, LEAFENTRY *save, XIDS ...@@ -123,8 +123,7 @@ insert_random_message_to_bn(FT_HANDLE t, BASEMENTNODE blb, LEAFENTRY *save, XIDS
msg.u.id.val = valdbt; msg.u.id.val = valdbt;
size_t memsize; size_t memsize;
int64_t numbytes; int64_t numbytes;
int r = apply_msg_to_leafentry(&msg, NULL, TXNID_NONE, &memsize, save, NULL, NULL, NULL, &numbytes); toku_le_apply_msg(&msg, NULL, TXNID_NONE, &memsize, save, NULL, NULL, NULL, &numbytes);
assert_zero(r);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb, &msg, TXNID_NONE, NULL, NULL); toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb, &msg, TXNID_NONE, NULL, NULL);
if (msn.msn > blb->max_msn_applied.msn) { if (msn.msn > blb->max_msn_applied.msn) {
blb->max_msn_applied = msn; blb->max_msn_applied = msn;
...@@ -164,8 +163,7 @@ insert_same_message_to_bns(FT_HANDLE t, BASEMENTNODE blb1, BASEMENTNODE blb2, LE ...@@ -164,8 +163,7 @@ insert_same_message_to_bns(FT_HANDLE t, BASEMENTNODE blb1, BASEMENTNODE blb2, LE
msg.u.id.val = valdbt; msg.u.id.val = valdbt;
size_t memsize; size_t memsize;
int64_t numbytes; int64_t numbytes;
int r = apply_msg_to_leafentry(&msg, NULL, TXNID_NONE, &memsize, save, NULL, NULL, NULL, &numbytes); toku_le_apply_msg(&msg, NULL, TXNID_NONE, &memsize, save, NULL, NULL, NULL, &numbytes);
assert_zero(r);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb1, &msg, TXNID_NONE, NULL, NULL); toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb1, &msg, TXNID_NONE, NULL, NULL);
if (msn.msn > blb1->max_msn_applied.msn) { if (msn.msn > blb1->max_msn_applied.msn) {
blb1->max_msn_applied = msn; blb1->max_msn_applied = msn;
...@@ -274,7 +272,7 @@ flush_to_internal(FT_HANDLE t) { ...@@ -274,7 +272,7 @@ flush_to_internal(FT_HANDLE t) {
set_BNC(child, 0, child_bnc); set_BNC(child, 0, child_bnc);
BP_STATE(child, 0) = PT_AVAIL; BP_STATE(child, 0) = PT_AVAIL;
toku_bnc_flush_to_child(t->ft, parent_bnc, child); toku_bnc_flush_to_child(t->ft, parent_bnc, child, TXNID_NONE);
int parent_messages_present[num_parent_messages]; int parent_messages_present[num_parent_messages];
int child_messages_present[num_child_messages]; int child_messages_present[num_child_messages];
...@@ -409,7 +407,7 @@ flush_to_internal_multiple(FT_HANDLE t) { ...@@ -409,7 +407,7 @@ flush_to_internal_multiple(FT_HANDLE t) {
} }
} }
toku_bnc_flush_to_child(t->ft, parent_bnc, child); toku_bnc_flush_to_child(t->ft, parent_bnc, child, TXNID_NONE);
int total_messages = 0; int total_messages = 0;
for (i = 0; i < 8; ++i) { for (i = 0; i < 8; ++i) {
...@@ -580,7 +578,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) { ...@@ -580,7 +578,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
if (make_leaf_up_to_date) { if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) { for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) { if (!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], TXNID_NONE, NULL, NULL); toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], NULL, NULL);
} }
} }
for (i = 0; i < 8; ++i) { for (i = 0; i < 8; ++i) {
...@@ -601,7 +599,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) { ...@@ -601,7 +599,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
} }
if (use_flush) { if (use_flush) {
toku_bnc_flush_to_child(t->ft, parent_bnc, child); toku_bnc_flush_to_child(t->ft, parent_bnc, child, TXNID_NONE);
destroy_nonleaf_childinfo(parent_bnc); destroy_nonleaf_childinfo(parent_bnc);
} else { } else {
FTNODE XMALLOC(parentnode); FTNODE XMALLOC(parentnode);
...@@ -803,7 +801,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) { ...@@ -803,7 +801,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) { for (i = 0; i < num_parent_messages; ++i) {
if (dummy_cmp(NULL, parent_messages[i]->u.id.key, &childkeys[7]) <= 0 && if (dummy_cmp(NULL, parent_messages[i]->u.id.key, &childkeys[7]) <= 0 &&
!parent_messages_is_fresh[i]) { !parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], TXNID_NONE, NULL, NULL); toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], NULL, NULL);
} }
} }
for (i = 0; i < 8; ++i) { for (i = 0; i < 8; ++i) {
...@@ -995,8 +993,8 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) { ...@@ -995,8 +993,8 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
if (make_leaf_up_to_date) { if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) { for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) { if (!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, -1, parent_messages[i], TXNID_NONE, NULL, NULL); toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, -1, parent_messages[i], NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, -1, parent_messages[i], TXNID_NONE, NULL, NULL); toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, -1, parent_messages[i], NULL, NULL);
} }
} }
for (i = 0; i < 8; ++i) { for (i = 0; i < 8; ++i) {
...@@ -1010,7 +1008,7 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) { ...@@ -1010,7 +1008,7 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
} }
} }
toku_bnc_flush_to_child(t->ft, parent_bnc, child1); toku_bnc_flush_to_child(t->ft, parent_bnc, child1, TXNID_NONE);
FTNODE XMALLOC(parentnode); FTNODE XMALLOC(parentnode);
BLOCKNUM parentblocknum = { 17 }; BLOCKNUM parentblocknum = { 17 };
......
...@@ -26,7 +26,7 @@ bool checkpoint_callback_called; ...@@ -26,7 +26,7 @@ bool checkpoint_callback_called;
toku_pthread_t checkpoint_tid; toku_pthread_t checkpoint_tid;
// callback functions for flush_some_child // callback functions for toku_ft_flush_some_child
static bool static bool
dont_destroy_bn(void* UU(extra)) dont_destroy_bn(void* UU(extra))
{ {
...@@ -160,7 +160,7 @@ doit (bool after_child_pin) { ...@@ -160,7 +160,7 @@ doit (bool after_child_pin) {
assert(toku_bnc_nbytesinbuf(BNC(node, 0)) > 0); assert(toku_bnc_nbytesinbuf(BNC(node, 0)) > 0);
// do the flush // do the flush
flush_some_child(t->ft, node, &fa); toku_ft_flush_some_child(t->ft, node, &fa);
assert(checkpoint_callback_called); assert(checkpoint_callback_called);
// now let's pin the root again and make sure it is flushed // now let's pin the root again and make sure it is flushed
......
...@@ -26,7 +26,7 @@ bool checkpoint_callback_called; ...@@ -26,7 +26,7 @@ bool checkpoint_callback_called;
toku_pthread_t checkpoint_tid; toku_pthread_t checkpoint_tid;
// callback functions for flush_some_child // callback functions for toku_ft_flush_some_child
static bool static bool
dont_destroy_bn(void* UU(extra)) dont_destroy_bn(void* UU(extra))
{ {
...@@ -177,7 +177,7 @@ doit (int state) { ...@@ -177,7 +177,7 @@ doit (int state) {
assert(node->n_children == 2); assert(node->n_children == 2);
// do the flush // do the flush
flush_some_child(t->ft, node, &fa); toku_ft_flush_some_child(t->ft, node, &fa);
assert(checkpoint_callback_called); assert(checkpoint_callback_called);
// now let's pin the root again and make sure it is has merged // now let's pin the root again and make sure it is has merged
......
...@@ -26,7 +26,7 @@ bool checkpoint_callback_called; ...@@ -26,7 +26,7 @@ bool checkpoint_callback_called;
toku_pthread_t checkpoint_tid; toku_pthread_t checkpoint_tid;
// callback functions for flush_some_child // callback functions for toku_ft_flush_some_child
static bool static bool
dont_destroy_bn(void* UU(extra)) dont_destroy_bn(void* UU(extra))
{ {
...@@ -197,7 +197,7 @@ doit (int state) { ...@@ -197,7 +197,7 @@ doit (int state) {
assert(node->n_children == 2); assert(node->n_children == 2);
// do the flush // do the flush
flush_some_child(t->ft, node, &fa); toku_ft_flush_some_child(t->ft, node, &fa);
assert(checkpoint_callback_called); assert(checkpoint_callback_called);
// now let's pin the root again and make sure it is has rebalanced // now let's pin the root again and make sure it is has rebalanced
......
...@@ -26,7 +26,7 @@ bool checkpoint_callback_called; ...@@ -26,7 +26,7 @@ bool checkpoint_callback_called;
toku_pthread_t checkpoint_tid; toku_pthread_t checkpoint_tid;
// callback functions for flush_some_child // callback functions for toku_ft_flush_some_child
static bool static bool
dont_destroy_bn(void* UU(extra)) dont_destroy_bn(void* UU(extra))
{ {
...@@ -173,7 +173,7 @@ doit (bool after_split) { ...@@ -173,7 +173,7 @@ doit (bool after_split) {
assert(node->n_children == 1); assert(node->n_children == 1);
// do the flush // do the flush
flush_some_child(t->ft, node, &fa); toku_ft_flush_some_child(t->ft, node, &fa);
assert(checkpoint_callback_called); assert(checkpoint_callback_called);
// now let's pin the root again and make sure it is has split // now let's pin the root again and make sure it is has split
......
...@@ -396,14 +396,13 @@ test_le_apply(ULE ule_initial, FT_MSG msg, ULE ule_expected) { ...@@ -396,14 +396,13 @@ test_le_apply(ULE ule_initial, FT_MSG msg, ULE ule_expected) {
size_t result_memsize; size_t result_memsize;
int64_t ignoreme; int64_t ignoreme;
r = apply_msg_to_leafentry(msg, toku_le_apply_msg(msg,
le_initial, le_initial,
TXNID_NONE, TXNID_NONE,
&result_memsize, &result_memsize,
&le_result, &le_result,
NULL, NULL,
NULL, NULL, &ignoreme); NULL, NULL, &ignoreme);
CKERR(r);
if (le_result) if (le_result)
le_verify_accessors(le_result, ule_expected, result_memsize); le_verify_accessors(le_result, ule_expected, result_memsize);
...@@ -702,6 +701,111 @@ test_le_apply_messages(void) { ...@@ -702,6 +701,111 @@ test_le_apply_messages(void) {
test_le_committed_apply(); test_le_committed_apply();
} }
// Test helper: packs the given unpacked leafentry (ULE) into its on-disk
// LEAFENTRY form, asks the quick "is garbage collection worth running?"
// heuristic about it, then frees the packed copy and returns the verdict.
static bool ule_worth_running_garbage_collection(ULE ule, TXNID oldest_known_referenced_xid) {
    size_t packed_memsize;
    LEAFENTRY packed_le;
    int r = le_pack(ule, &packed_memsize, &packed_le, nullptr, nullptr, nullptr);
    CKERR(r);
    invariant_notnull(packed_le);
    const bool worth_it = toku_le_worth_running_garbage_collection(packed_le, oldest_known_referenced_xid);
    toku_free(packed_le);
    return worth_it;
}
// Exercises toku_le_worth_running_garbage_collection()'s "worth-doing"
// heuristic across a range of committed/provisional entry layouts, using
// an oldest-known-referenced xid of 200 throughout. Each scenario fills in
// ule_initial's committed (num_cuxrs) and provisional (num_puxrs) counts
// plus the relevant transaction ids, then asserts the expected verdict.
static void test_le_garbage_collection_birdie(void) {
    DBT key;
    DBT val;
    ULE_S ule_initial;
    ULE_S ule_expected;
    uint8_t keybuf[MAX_SIZE];
    uint32_t keysize=8;
    uint8_t valbuf[MAX_SIZE];
    uint32_t valsize=8;
    // Point the ULEs at their inline static uxr arrays before use.
    ule_initial.uxrs = ule_initial.uxrs_static;
    ule_expected.uxrs = ule_expected.uxrs_static;
    memset(&key, 0, sizeof(key));
    memset(&val, 0, sizeof(val));
    bool do_garbage_collect;
    fillrandom(keybuf, keysize);
    fillrandom(valbuf, valsize);

    //
    // Test garbage collection "worth-doing" heuristic
    //

    // Garbage collection should not be worth doing on a clean leafentry.
    ule_initial.num_cuxrs = 1;
    ule_initial.num_puxrs = 0;
    ule_initial.uxrs[0].xid = TXNID_NONE;
    ule_initial.uxrs[0].type = XR_INSERT;
    do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
    invariant(!do_garbage_collect);

    // It is worth doing when there is more than one committed entry
    ule_initial.num_cuxrs = 2;
    ule_initial.num_puxrs = 1;
    ule_initial.uxrs[1].xid = 500;
    do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
    invariant(do_garbage_collect);

    // It is not worth doing when there is one of each, when the
    // provisional entry is newer than the oldest known referenced xid
    ule_initial.num_cuxrs = 1;
    ule_initial.num_puxrs = 1;
    ule_initial.uxrs[1].xid = 1500;
    do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
    invariant(!do_garbage_collect);
    // Boundary case: a provisional xid equal to the oldest known
    // referenced xid may still be live, so GC is not worthwhile either.
    ule_initial.uxrs[1].xid = 200;
    do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
    invariant(!do_garbage_collect);

    // It is not worth doing when there is only one committed entry,
    // multiple provisional entries, but the outermost entry is newer.
    ule_initial.num_cuxrs = 1;
    ule_initial.num_puxrs = 3;
    ule_initial.uxrs[1].xid = 201;
    ule_initial.uxrs[2].xid = 206;
    ule_initial.uxrs[3].xid = 215;
    do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
    invariant(!do_garbage_collect);

    // It is worth doing when the above scenario has an outermost entry
    // older than the oldest known, even if its children seem newer.
    // These children must have committed, because the parent is no longer live.
    ule_initial.num_cuxrs = 1;
    ule_initial.num_puxrs = 3;
    ule_initial.uxrs[1].xid = 190;
    ule_initial.uxrs[2].xid = 206;
    ule_initial.uxrs[3].xid = 215;
    do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
    invariant(do_garbage_collect);

    // It is worth doing when there is more than one committed entry,
    // even if a provisional entry exists that is newer than the
    // oldest known referenced xid
    ule_initial.num_cuxrs = 2;
    ule_initial.num_puxrs = 1;
    ule_initial.uxrs[1].xid = 499;
    ule_initial.uxrs[2].xid = 500;
    do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
    invariant(do_garbage_collect);

    // It is worth doing when there is one of each, and the provisional
    // entry is older than the oldest known referenced xid
    ule_initial.num_cuxrs = 1;
    ule_initial.num_puxrs = 1;
    ule_initial.uxrs[1].xid = 199;
    do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
    invariant(do_garbage_collect);

    // It is definitely worth doing when the above case is true
    // and there is more than one provisional entry.
    ule_initial.num_cuxrs = 1;
    ule_initial.num_puxrs = 2;
    ule_initial.uxrs[1].xid = 150;
    ule_initial.uxrs[2].xid = 175;
    do_garbage_collect = ule_worth_running_garbage_collection(&ule_initial, 200);
    invariant(do_garbage_collect);
}
static void test_le_optimize(void) { static void test_le_optimize(void) {
FT_MSG_S msg; FT_MSG_S msg;
...@@ -900,6 +1004,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute_ ...@@ -900,6 +1004,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute_
test_le_pack(); test_le_pack();
test_le_apply_messages(); test_le_apply_messages();
test_le_optimize(); test_le_optimize();
test_le_garbage_collection_birdie();
destroy_xids(); destroy_xids();
return 0; return 0;
} }
......
...@@ -162,7 +162,7 @@ doit (void) { ...@@ -162,7 +162,7 @@ doit (void) {
assert_zero(r); assert_zero(r);
// now with setup done, start the test // now with setup done, start the test
// test that if flush_some_child properly honors // test that if toku_ft_flush_some_child properly honors
// what we say and flushes the child we pick // what we say and flushes the child we pick
FTNODE node = NULL; FTNODE node = NULL;
toku_pin_node_with_min_bfe(&node, node_internal, t); toku_pin_node_with_min_bfe(&node, node_internal, t);
...@@ -185,7 +185,7 @@ doit (void) { ...@@ -185,7 +185,7 @@ doit (void) {
); );
curr_child_to_flush = 0; curr_child_to_flush = 0;
num_flushes_called = 0; num_flushes_called = 0;
flush_some_child(t->ft, node, &fa); toku_ft_flush_some_child(t->ft, node, &fa);
assert(num_flushes_called == 1); assert(num_flushes_called == 1);
toku_pin_node_with_min_bfe(&node, node_internal, t); toku_pin_node_with_min_bfe(&node, node_internal, t);
...@@ -203,7 +203,7 @@ doit (void) { ...@@ -203,7 +203,7 @@ doit (void) {
assert(!node->dirty); assert(!node->dirty);
curr_child_to_flush = 1; curr_child_to_flush = 1;
num_flushes_called = 0; num_flushes_called = 0;
flush_some_child(t->ft, node, &fa); toku_ft_flush_some_child(t->ft, node, &fa);
assert(num_flushes_called == 1); assert(num_flushes_called == 1);
toku_pin_node_with_min_bfe(&node, node_internal, t); toku_pin_node_with_min_bfe(&node, node_internal, t);
...@@ -221,7 +221,7 @@ doit (void) { ...@@ -221,7 +221,7 @@ doit (void) {
assert(!node->dirty); assert(!node->dirty);
curr_child_to_flush = 0; curr_child_to_flush = 0;
num_flushes_called = 0; num_flushes_called = 0;
flush_some_child(t->ft, node, &fa); toku_ft_flush_some_child(t->ft, node, &fa);
assert(num_flushes_called == 1); assert(num_flushes_called == 1);
toku_pin_node_with_min_bfe(&node, node_internal, t); toku_pin_node_with_min_bfe(&node, node_internal, t);
...@@ -250,7 +250,7 @@ doit (void) { ...@@ -250,7 +250,7 @@ doit (void) {
toku_assert_entire_node_in_memory(node); // entire root is in memory toku_assert_entire_node_in_memory(node); // entire root is in memory
curr_child_to_flush = i; curr_child_to_flush = i;
num_flushes_called = 0; num_flushes_called = 0;
flush_some_child(t->ft, node, &fa); toku_ft_flush_some_child(t->ft, node, &fa);
assert(num_flushes_called == 2); assert(num_flushes_called == 2);
toku_pin_node_with_min_bfe(&node, node_internal, t); toku_pin_node_with_min_bfe(&node, node_internal, t);
...@@ -296,7 +296,7 @@ doit (void) { ...@@ -296,7 +296,7 @@ doit (void) {
toku_assert_entire_node_in_memory(node); // entire root is in memory toku_assert_entire_node_in_memory(node); // entire root is in memory
curr_child_to_flush = 0; curr_child_to_flush = 0;
num_flushes_called = 0; num_flushes_called = 0;
flush_some_child(t->ft, node, &fa); toku_ft_flush_some_child(t->ft, node, &fa);
assert(num_flushes_called == 2); assert(num_flushes_called == 2);
r = toku_close_ft_handle_nolsn(t, 0); assert(r==0); r = toku_close_ft_handle_nolsn(t, 0); assert(r==0);
......
...@@ -88,15 +88,13 @@ void toku_ule_free(ULEHANDLE ule_p) { ...@@ -88,15 +88,13 @@ void toku_ule_free(ULEHANDLE ule_p) {
toku_free(ule_p); toku_free(ule_p);
} }
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
// //
// Question: Can any software outside this file modify or read a leafentry? // Question: Can any software outside this file modify or read a leafentry?
// If so, is it worthwhile to put it all here? // If so, is it worthwhile to put it all here?
// //
// There are two entries, one each for modification and query: // There are two entries, one each for modification and query:
// apply_msg_to_leafentry() performs all inserts/deletes/aborts // toku_le_apply_msg() performs all inserts/deletes/aborts
// //
// //
// //
...@@ -122,6 +120,7 @@ static void msg_init_empty_ule(ULE ule, FT_MSG msg); ...@@ -122,6 +120,7 @@ static void msg_init_empty_ule(ULE ule, FT_MSG msg);
static void msg_modify_ule(ULE ule, FT_MSG msg); static void msg_modify_ule(ULE ule, FT_MSG msg);
static void ule_init_empty_ule(ULE ule, uint32_t keylen, void * keyp); static void ule_init_empty_ule(ULE ule, uint32_t keylen, void * keyp);
static void ule_do_implicit_promotions(ULE ule, XIDS xids); static void ule_do_implicit_promotions(ULE ule, XIDS xids);
static void ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid);
static void ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index); static void ule_promote_provisional_innermost_to_index(ULE ule, uint32_t index);
static void ule_promote_provisional_innermost_to_committed(ULE ule); static void ule_promote_provisional_innermost_to_committed(ULE ule);
static void ule_apply_insert(ULE ule, XIDS xids, uint32_t vallen, void * valp); static void ule_apply_insert(ULE ule, XIDS xids, uint32_t vallen, void * valp);
...@@ -165,8 +164,6 @@ le_malloc(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free) ...@@ -165,8 +164,6 @@ le_malloc(OMT *omtp, struct mempool *mp, size_t size, void **maybe_free)
return rval; return rval;
} }
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
// Garbage collection related functions // Garbage collection related functions
// //
...@@ -215,7 +212,7 @@ xid_reads_committed_xid(TXNID tl1, TXNID xc, const xid_omt_t &snapshot_txnids, c ...@@ -215,7 +212,7 @@ xid_reads_committed_xid(TXNID tl1, TXNID xc, const xid_omt_t &snapshot_txnids, c
// so we get rid of them. // so we get rid of them.
// //
static void static void
simple_garbage_collection(ULE ule, TXNID oldest_referenced_xid) { ule_simple_garbage_collection(ULE ule, TXNID oldest_referenced_xid) {
uint32_t curr_index = 0; uint32_t curr_index = 0;
uint32_t num_entries; uint32_t num_entries;
if (ule->num_cuxrs == 1 || oldest_referenced_xid == TXNID_NONE) { if (ule->num_cuxrs == 1 || oldest_referenced_xid == TXNID_NONE) {
...@@ -244,7 +241,7 @@ done:; ...@@ -244,7 +241,7 @@ done:;
} }
static void static void
garbage_collection(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &referenced_xids, const xid_omt_t &live_root_txns) { ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &referenced_xids, const xid_omt_t &live_root_txns) {
if (ule->num_cuxrs == 1) goto done; if (ule->num_cuxrs == 1) goto done;
// will fail if too many num_cuxrs // will fail if too many num_cuxrs
bool necessary_static[MAX_TRANSACTION_RECORDS]; bool necessary_static[MAX_TRANSACTION_RECORDS];
...@@ -340,7 +337,6 @@ garbage_collection(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &refe ...@@ -340,7 +337,6 @@ garbage_collection(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &refe
done:; done:;
} }
///////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////
// This is the big enchilada. (Bring Tums.) Note that this level of abstraction // This is the big enchilada. (Bring Tums.) Note that this level of abstraction
// has no knowledge of the inner structure of either leafentry or msg. It makes // has no knowledge of the inner structure of either leafentry or msg. It makes
...@@ -353,8 +349,8 @@ done:; ...@@ -353,8 +349,8 @@ done:;
// If the leafentry is destroyed it sets *new_leafentry_p to NULL. // If the leafentry is destroyed it sets *new_leafentry_p to NULL.
// Otehrwise the new_leafentry_p points at the new leaf entry. // Otehrwise the new_leafentry_p points at the new leaf entry.
// As of October 2011, this function always returns 0. // As of October 2011, this function always returns 0.
int void
apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry toku_le_apply_msg(FT_MSG msg, // message to apply to leafentry
LEAFENTRY old_leafentry, // NULL if there was no stored data. LEAFENTRY old_leafentry, // NULL if there was no stored data.
TXNID oldest_referenced_xid, TXNID oldest_referenced_xid,
size_t *new_leafentry_memorysize, size_t *new_leafentry_memorysize,
...@@ -364,31 +360,49 @@ apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry ...@@ -364,31 +360,49 @@ apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry
void **maybe_free, void **maybe_free,
int64_t * numbytes_delta_p) { // change in total size of key and val, not including any overhead int64_t * numbytes_delta_p) { // change in total size of key and val, not including any overhead
ULE_S ule; ULE_S ule;
int rval;
int64_t oldnumbytes = 0; int64_t oldnumbytes = 0;
int64_t newnumbytes = 0; int64_t newnumbytes = 0;
if (old_leafentry == NULL) // if leafentry does not exist ... if (old_leafentry == NULL) {
msg_init_empty_ule(&ule, msg); // ... create empty unpacked leaf entry msg_init_empty_ule(&ule, msg);
else { } else {
le_unpack(&ule, old_leafentry); // otherwise unpack leafentry le_unpack(&ule, old_leafentry); // otherwise unpack leafentry
oldnumbytes = ule_get_innermost_numbytes(&ule); oldnumbytes = ule_get_innermost_numbytes(&ule);
} }
msg_modify_ule(&ule, msg); // modify unpacked leafentry msg_modify_ule(&ule, msg); // modify unpacked leafentry
simple_garbage_collection(&ule, oldest_referenced_xid); ule_simple_garbage_collection(&ule, oldest_referenced_xid);
rval = le_pack(&ule, // create packed leafentry int rval = le_pack(&ule, // create packed leafentry
new_leafentry_memorysize, new_leafentry_memorysize,
new_leafentry_p, new_leafentry_p,
omtp, omtp,
mp, mp,
maybe_free); maybe_free);
if (new_leafentry_p) invariant_zero(rval);
if (new_leafentry_p) {
newnumbytes = ule_get_innermost_numbytes(&ule); newnumbytes = ule_get_innermost_numbytes(&ule);
}
*numbytes_delta_p = newnumbytes - oldnumbytes; *numbytes_delta_p = newnumbytes - oldnumbytes;
ule_cleanup(&ule); ule_cleanup(&ule);
return rval;
} }
bool toku_le_worth_running_garbage_collection(LEAFENTRY le, TXNID oldest_known_referenced_xid) {
// Effect: Quickly determines if it's worth trying to run garbage collection on a leafentry
// Return: True if it makes sense to try garbage collection, false otherwise.
// Rationale: Garbage collection is likely to clean up under two circumstances:
// 1.) There are multiple committed entries. Some may never be read by new txns.
// 2.) There is only one committed entry, but the outermost provisional entry
// is older than the oldest known referenced xid, so it must have commited.
// Therefor we can promote it to committed and get rid of the old commited entry.
if (le->type != LE_MVCC) {
return false;
}
if (le->u.mvcc.num_cxrs > 1) {
return true;
} else {
paranoid_invariant(le->u.mvcc.num_cxrs == 1);
}
return le->u.mvcc.num_pxrs > 0 && le_outermost_uncommitted_xid(le) < oldest_known_referenced_xid;
}
// Garbage collect one leaf entry, using the given OMT's. // Garbage collect one leaf entry, using the given OMT's.
// Parameters: // Parameters:
...@@ -408,8 +422,8 @@ apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry ...@@ -408,8 +422,8 @@ apply_msg_to_leafentry(FT_MSG msg, // message to apply to leafentry
// -- referenced_xids : list of in memory active transactions. // -- referenced_xids : list of in memory active transactions.
// NOTE: it is not a good idea to garbage collect a leaf // NOTE: it is not a good idea to garbage collect a leaf
// entry with only one committed value. // entry with only one committed value.
int void
garbage_collect_leafentry(LEAFENTRY old_leaf_entry, toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
LEAFENTRY *new_leaf_entry, LEAFENTRY *new_leaf_entry,
size_t *new_leaf_entry_memory_size, size_t *new_leaf_entry_memory_size,
OMT *omtp, OMT *omtp,
...@@ -417,12 +431,22 @@ garbage_collect_leafentry(LEAFENTRY old_leaf_entry, ...@@ -417,12 +431,22 @@ garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
void **maybe_free, void **maybe_free,
const xid_omt_t &snapshot_xids, const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids, const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns) { const xid_omt_t &live_root_txns,
int r = 0; TXNID oldest_known_referenced_xid) {
ULE_S ule; ULE_S ule;
le_unpack(&ule, old_leaf_entry); le_unpack(&ule, old_leaf_entry);
garbage_collection(&ule, snapshot_xids, referenced_xids, live_root_txns);
r = le_pack(&ule, // Before running garbage collection, try to promote the outermost provisional
// entries to committed if its xid is older than the oldest possible live xid.
//
// The oldest known refeferenced xid is a lower bound on the oldest possible
// live xid, so we use that. It's usually close enough to get rid of most
// garbage in leafentries.
TXNID oldest_possible_live_xid = oldest_known_referenced_xid;
ule_try_promote_provisional_outermost(&ule, oldest_possible_live_xid);
ule_garbage_collect(&ule, snapshot_xids, referenced_xids, live_root_txns);
int r = le_pack(&ule,
new_leaf_entry_memory_size, new_leaf_entry_memory_size,
new_leaf_entry, new_leaf_entry,
omtp, omtp,
...@@ -430,7 +454,6 @@ garbage_collect_leafentry(LEAFENTRY old_leaf_entry, ...@@ -430,7 +454,6 @@ garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
maybe_free); maybe_free);
assert(r == 0); assert(r == 0);
ule_cleanup(&ule); ule_cleanup(&ule);
return r;
} }
///////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////
...@@ -1529,6 +1552,15 @@ ule_promote_provisional_innermost_to_committed(ULE ule) { ...@@ -1529,6 +1552,15 @@ ule_promote_provisional_innermost_to_committed(ULE ule) {
} }
} }
static void
ule_try_promote_provisional_outermost(ULE ule, TXNID oldest_possible_live_xid) {
// Effect: If there is a provisional record whose outermost xid is older than
// the oldest known referenced_xid, promote it to committed.
if (ule->num_puxrs > 0 && ule_get_xid(ule, ule->num_cuxrs) < oldest_possible_live_xid) {
ule_promote_provisional_innermost_to_committed(ule);
}
}
// Purpose is to promote the value (and type) of the innermost transaction // Purpose is to promote the value (and type) of the innermost transaction
// record to the uxr at the specified index (keeping the txnid of the uxr at // record to the uxr at the specified index (keeping the txnid of the uxr at
// specified index.) // specified index.)
......
...@@ -45,31 +45,4 @@ TXNID uxr_get_txnid(UXRHANDLE uxr); ...@@ -45,31 +45,4 @@ TXNID uxr_get_txnid(UXRHANDLE uxr);
//1 does much slower debugging //1 does much slower debugging
#define GARBAGE_COLLECTION_DEBUG 0 #define GARBAGE_COLLECTION_DEBUG 0
void fast_msg_to_leafentry(
FT_MSG msg, // message to apply to leafentry
size_t *new_leafentry_memorysize,
size_t *new_leafentry_disksize,
LEAFENTRY *new_leafentry_p) ;
int apply_msg_to_leafentry(FT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
TXNID oldest_referenced_xid,
size_t *new_leafentry_memorysize,
LEAFENTRY *new_leafentry_p,
OMT *omtp,
struct mempool *mp,
void **maybe_free,
int64_t * numbytes_delta_p);
int garbage_collect_leafentry(LEAFENTRY old_leaf_entry,
LEAFENTRY *new_leaf_entry,
size_t *new_leaf_entry_memory_size,
OMT *omtp,
struct mempool *mp,
void **maybe_free,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns);
#endif // TOKU_ULE_H #endif // TOKU_ULE_H
...@@ -740,6 +740,9 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, vo ...@@ -740,6 +740,9 @@ static int random_put_in_db(DB *db, DB_TXN *txn, ARG arg, bool ignore_errors, vo
} else { } else {
rand_key_i[0] = arg->thread_idx; rand_key_i[0] = arg->thread_idx;
} }
if (arg->cli->num_elements > 0 && arg->bounded_element_range) {
rand_key_key[0] = rand_key_key[0] % arg->cli->num_elements;
}
fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility); fill_zeroed_array(valbuf, arg->cli->val_size, arg->random_data, arg->cli->compressibility);
DBT key, val; DBT key, val;
dbt_init(&key, &rand_key_b, sizeof rand_key_b); dbt_init(&key, &rand_key_b, sizeof rand_key_b);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment