Commit c6a6cd8c authored by Leif Walsh's avatar Leif Walsh

Merge remote-tracking branch 'origin/garbage_collection_extended'

parents 054633c6 dfcffcc7
......@@ -1544,11 +1544,7 @@ ft_merge_child(
}
}
static void ft_flush_some_child(
FT ft,
FTNODE parent,
struct flusher_advice *fa
)
void toku_ft_flush_some_child(FT ft, FTNODE parent, struct flusher_advice *fa)
// Effect: This function does the following:
// - Pick a child of parent (the heaviest child),
// - flush from parent to child,
......@@ -1562,7 +1558,7 @@ static void ft_flush_some_child(
NONLEAF_CHILDINFO bnc = NULL;
paranoid_invariant(parent->height>0);
toku_assert_entire_node_in_memory(parent);
TXNID oldest_referenced_xid = parent->oldest_referenced_xid_known;
TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;
// pick the child we want to flush to
int childnum = fa->pick_child(ft, parent, fa->extra);
......@@ -1655,7 +1651,7 @@ static void ft_flush_some_child(
ft,
bnc,
child,
oldest_referenced_xid
parent_oldest_referenced_xid_known
);
destroy_nonleaf_childinfo(bnc);
}
......@@ -1679,10 +1675,10 @@ static void ft_flush_some_child(
parent = NULL;
}
//
// it is the responsibility of ft_flush_some_child to unpin child
// it is the responsibility of toku_ft_flush_some_child to unpin child
//
if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) {
ft_flush_some_child(ft, child, fa);
toku_ft_flush_some_child(ft, child, fa);
}
else {
toku_unpin_ftnode_off_client_thread(ft, child);
......@@ -1709,13 +1705,6 @@ static void ft_flush_some_child(
}
}
void toku_ft_flush_some_child(FT ft, FTNODE parent, struct flusher_advice *fa) {
// Vanilla flush_some_child flushes from parent to child without
// providing a meaningful oldest_referenced_xid. No simple garbage
// collection is performed.
return ft_flush_some_child(ft, parent, fa);
}
static void
update_cleaner_status(
FTNODE node,
......@@ -1857,7 +1846,7 @@ struct flusher_extra {
FT h;
FTNODE node;
NONLEAF_CHILDINFO bnc;
TXNID oldest_referenced_xid;
TXNID parent_oldest_referenced_xid_known;
};
//
......@@ -1896,16 +1885,16 @@ static void flush_node_fun(void *fe_v)
fe->h,
fe->bnc,
fe->node,
fe->oldest_referenced_xid
fe->parent_oldest_referenced_xid_known
);
destroy_nonleaf_childinfo(fe->bnc);
// after the flush has completed, now check to see if the node needs flushing
// If so, call ft_flush_some_child on the node (because this flush intends to
// If so, call toku_ft_flush_some_child on the node (because this flush intends to
// pass a meaningful oldest referenced xid for simple garbage collection), and it is the
// responsibility of the flush to unlock the node. otherwise, we unlock it here.
if (fe->node->height > 0 && toku_ft_nonleaf_is_gorged(fe->node, fe->h->h->nodesize)) {
ft_flush_some_child(fe->h, fe->node, &fa);
toku_ft_flush_some_child(fe->h, fe->node, &fa);
}
else {
toku_unpin_ftnode_off_client_thread(fe->h,fe->node);
......@@ -1916,7 +1905,7 @@ static void flush_node_fun(void *fe_v)
// bnc, which means we are tasked with flushing some
// buffer in the node.
// It is the responsibility of flush some child to unlock the node
ft_flush_some_child(fe->h, fe->node, &fa);
toku_ft_flush_some_child(fe->h, fe->node, &fa);
}
remove_background_job_from_cf(fe->h->cf);
toku_free(fe);
......@@ -1927,13 +1916,13 @@ place_node_and_bnc_on_background_thread(
FT h,
FTNODE node,
NONLEAF_CHILDINFO bnc,
TXNID oldest_referenced_xid)
TXNID parent_oldest_referenced_xid_known)
{
struct flusher_extra *XMALLOC(fe);
fe->h = h;
fe->node = node;
fe->bnc = bnc;
fe->oldest_referenced_xid = oldest_referenced_xid;
fe->parent_oldest_referenced_xid_known = parent_oldest_referenced_xid_known;
cachefile_kibbutz_enq(h->cf, flush_node_fun, fe);
}
......@@ -1953,7 +1942,7 @@ place_node_and_bnc_on_background_thread(
void toku_ft_flush_node_on_background_thread(FT h, FTNODE parent)
{
toku::context flush_ctx(CTX_FLUSH);
TXNID oldest_referenced_xid_known = parent->oldest_referenced_xid_known;
TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;
//
// first let's see if we can detach buffer on client thread
// and pick the child we want to flush to
......@@ -1970,7 +1959,7 @@ void toku_ft_flush_node_on_background_thread(FT h, FTNODE parent)
// In this case, we could not lock the child, so just place the parent on the background thread
// In the callback, we will use toku_ft_flush_some_child, which checks to
// see if we should blow away the old basement nodes.
place_node_and_bnc_on_background_thread(h, parent, NULL, oldest_referenced_xid_known);
place_node_and_bnc_on_background_thread(h, parent, NULL, parent_oldest_referenced_xid_known);
}
else {
//
......@@ -1999,7 +1988,7 @@ void toku_ft_flush_node_on_background_thread(FT h, FTNODE parent)
// so, because we know for sure the child is not
// reactive, we can unpin the parent
//
place_node_and_bnc_on_background_thread(h, child, bnc, oldest_referenced_xid_known);
place_node_and_bnc_on_background_thread(h, child, bnc, parent_oldest_referenced_xid_known);
toku_unpin_ftnode(h, parent);
}
else {
......@@ -2009,7 +1998,7 @@ void toku_ft_flush_node_on_background_thread(FT h, FTNODE parent)
toku_unpin_ftnode(h, child);
// Again, we'll have the parent on the background thread, so
// we don't need to destroy the basement nodes yet.
place_node_and_bnc_on_background_thread(h, parent, NULL, oldest_referenced_xid_known);
place_node_and_bnc_on_background_thread(h, parent, NULL, parent_oldest_referenced_xid_known);
}
}
}
......
......@@ -229,7 +229,7 @@ long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc);
long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc);
void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, DESCRIPTOR desc, ft_compare_func cmp);
void toku_bnc_empty(NONLEAF_CHILDINFO bnc);
void toku_bnc_flush_to_child(FT h, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID oldest_referenced_xid);
void toku_bnc_flush_to_child(FT h, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID parent_oldest_referenced_xid_known);
bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) __attribute__((const, nonnull));
bool toku_ft_nonleaf_is_gorged(FTNODE node, uint32_t nodesize);
......@@ -1027,7 +1027,7 @@ int toku_testsetup_insert_to_nonleaf (FT_HANDLE brt, BLOCKNUM, enum ft_msg_type,
void toku_pin_node_with_min_bfe(FTNODE* node, BLOCKNUM b, FT_HANDLE t);
// toku_ft_root_put_cmd() accepts non-constant cmd because this is where we set the msn
void toku_ft_root_put_cmd(FT h, FT_MSG_S * cmd, TXNID oldest_referenced_xid, GC_INFO gc_info);
void toku_ft_root_put_cmd(FT h, FT_MSG_S * cmd, txn_gc_info *gc_info);
void
toku_get_node_for_verify(
......@@ -1065,6 +1065,10 @@ typedef enum {
LE_MAX_PROVISIONAL_XR,
LE_EXPANDED,
LE_MAX_MEMSIZE,
LE_APPLY_GC_BYTES_IN,
LE_APPLY_GC_BYTES_OUT,
LE_NORMAL_GC_BYTES_IN,
LE_NORMAL_GC_BYTES_OUT,
LE_STATUS_NUM_ROWS
} le_status_entry;
......@@ -1197,8 +1201,7 @@ toku_ft_bn_apply_cmd_once (
const FT_MSG cmd,
uint32_t idx,
LEAFENTRY le,
TXNID oldest_referenced_xid,
GC_INFO gc_info,
txn_gc_info *gc_info,
uint64_t *workdonep,
STAT64INFO stats_to_update
);
......@@ -1210,8 +1213,7 @@ toku_ft_bn_apply_cmd (
DESCRIPTOR desc,
BASEMENTNODE bn,
FT_MSG cmd,
TXNID oldest_referenced_xid,
GC_INFO gc_info,
txn_gc_info *gc_info,
uint64_t *workdone,
STAT64INFO stats_to_update
);
......@@ -1224,7 +1226,7 @@ toku_ft_leaf_apply_cmd (
FTNODE node,
int target_childnum,
FT_MSG cmd,
GC_INFO gc_info,
txn_gc_info *gc_info,
uint64_t *workdone,
STAT64INFO stats_to_update
);
......@@ -1238,7 +1240,7 @@ toku_ft_node_put_cmd (
int target_childnum,
FT_MSG cmd,
bool is_fresh,
GC_INFO gc_info,
txn_gc_info *gc_info,
size_t flow_deltas[],
STAT64INFO stats_to_update
);
......
......@@ -898,7 +898,7 @@ void toku_ftnode_clone_callback(
*cloned_value_data = cloned_node;
}
static void ft_leaf_run_gc(FTNODE node, FT ft);
static void ft_leaf_run_gc(FT ft, FTNODE node);
void toku_ftnode_flush_callback(
CACHEFILE UU(cachefile),
......@@ -923,7 +923,7 @@ void toku_ftnode_flush_callback(
if (write_me) {
toku_assert_entire_node_in_memory(ftnode);
if (height == 0) {
ft_leaf_run_gc(ftnode, h);
ft_leaf_run_gc(h, ftnode);
}
if (height == 0 && !is_clone) {
ftnode_update_disk_stats(ftnode, h, for_checkpoint);
......@@ -1720,8 +1720,7 @@ toku_ft_bn_apply_cmd_once (
const FT_MSG cmd,
uint32_t idx,
LEAFENTRY le,
TXNID oldest_referenced_xid,
GC_INFO gc_info,
txn_gc_info *gc_info,
uint64_t *workdone,
STAT64INFO stats_to_update
)
......@@ -1747,7 +1746,6 @@ toku_ft_bn_apply_cmd_once (
le,
&bn->data_buffer,
idx,
oldest_referenced_xid,
gc_info,
&new_le,
&numbytes_delta
......@@ -1796,8 +1794,7 @@ struct setval_extra_s {
const DBT *key;
uint32_t idx;
LEAFENTRY le;
TXNID oldest_referenced_xid;
GC_INFO gc_info;
txn_gc_info *gc_info;
uint64_t * workdone; // set by toku_ft_bn_apply_cmd_once()
STAT64INFO stats_to_update;
};
......@@ -1830,7 +1827,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
}
toku_ft_bn_apply_cmd_once(svextra->bn, &msg,
svextra->idx, svextra->le,
svextra->oldest_referenced_xid, svextra->gc_info,
svextra->gc_info,
svextra->workdone, svextra->stats_to_update);
svextra->setval_r = 0;
}
......@@ -1844,8 +1841,7 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn
LEAFENTRY le,
void* keydata,
uint32_t keylen,
TXNID oldest_referenced_xid,
GC_INFO gc_info,
txn_gc_info *gc_info,
uint64_t * workdone,
STAT64INFO stats_to_update) {
LEAFENTRY le_for_update;
......@@ -1890,7 +1886,8 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn
le_for_update = le;
struct setval_extra_s setval_extra = {setval_tag, false, 0, bn, cmd->msn, cmd->xids,
keyp, idx, le_for_update, oldest_referenced_xid, gc_info, workdone, stats_to_update};
keyp, idx, le_for_update, gc_info,
workdone, stats_to_update};
// call handlerton's brt->update_fun(), which passes setval_extra to setval_fun()
FAKE_DB(db, desc);
int r = update_fun(
......@@ -1913,8 +1910,7 @@ toku_ft_bn_apply_cmd (
DESCRIPTOR desc,
BASEMENTNODE bn,
FT_MSG cmd,
TXNID oldest_referenced_xid_known,
GC_INFO gc_info,
txn_gc_info *gc_info,
uint64_t *workdone,
STAT64INFO stats_to_update
)
......@@ -1961,7 +1957,7 @@ toku_ft_bn_apply_cmd (
} else {
assert_zero(r);
}
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, workdone, stats_to_update);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, gc_info, workdone, stats_to_update);
// if the insertion point is within a window of the right edge of
// the leaf then it is sequential
......@@ -1993,7 +1989,7 @@ toku_ft_bn_apply_cmd (
);
if (r == DB_NOTFOUND) break;
assert_zero(r);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, workdone, stats_to_update);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, gc_info, workdone, stats_to_update);
break;
}
......@@ -2015,7 +2011,7 @@ toku_ft_bn_apply_cmd (
cmd->u.id.key = &curr_keydbt;
int deleted = 0;
if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, workdone, stats_to_update);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, gc_info, workdone, stats_to_update);
uint32_t new_omt_size = bn->data_buffer.omt_size();
if (new_omt_size != omt_size) {
paranoid_invariant(new_omt_size+1 == omt_size);
......@@ -2047,7 +2043,7 @@ toku_ft_bn_apply_cmd (
cmd->u.id.key = &curr_keydbt;
int deleted = 0;
if (le_has_xids(storeddata, cmd->xids)) {
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, oldest_referenced_xid_known, gc_info, workdone, stats_to_update);
toku_ft_bn_apply_cmd_once(bn, cmd, idx, storeddata, gc_info, workdone, stats_to_update);
uint32_t new_omt_size = bn->data_buffer.omt_size();
if (new_omt_size != omt_size) {
paranoid_invariant(new_omt_size+1 == omt_size);
......@@ -2079,9 +2075,9 @@ toku_ft_bn_apply_cmd (
key = cmd->u.id.key->data;
keylen = cmd->u.id.key->size;
}
r = do_update(update_fun, desc, bn, cmd, idx, NULL, NULL, 0, oldest_referenced_xid_known, gc_info, workdone, stats_to_update);
r = do_update(update_fun, desc, bn, cmd, idx, NULL, NULL, 0, gc_info, workdone, stats_to_update);
} else if (r==0) {
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, key, keylen, oldest_referenced_xid_known, gc_info, workdone, stats_to_update);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, key, keylen, gc_info, workdone, stats_to_update);
} // otherwise, a worse error, just return it
break;
}
......@@ -2104,7 +2100,7 @@ toku_ft_bn_apply_cmd (
// This is broken below. Have a compilation error checked
// in as a reminder
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, curr_key, curr_keylen, oldest_referenced_xid_known, gc_info, workdone, stats_to_update);
r = do_update(update_fun, desc, bn, cmd, idx, storeddata, curr_key, curr_keylen, gc_info, workdone, stats_to_update);
assert_zero(r);
if (num_leafentries_before == bn->data_buffer.omt_size()) {
......@@ -2342,10 +2338,7 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
void* keyp,
uint32_t keylen,
LEAFENTRY leaf_entry,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns,
TXNID oldest_referenced_xid_known,
txn_gc_info *gc_info,
STAT64INFO_S * delta)
{
paranoid_invariant(leaf_entry);
......@@ -2356,7 +2349,7 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
}
// Don't run garbage collection if this leafentry decides it's not worth it.
if (!toku_le_worth_running_garbage_collection(leaf_entry, oldest_referenced_xid_known)) {
if (!toku_le_worth_running_garbage_collection(leaf_entry, gc_info)) {
goto exit;
}
......@@ -2378,11 +2371,8 @@ ft_basement_node_gc_once(BASEMENTNODE bn,
index,
keyp,
keylen,
gc_info,
&new_leaf_entry,
snapshot_xids,
referenced_xids,
live_root_txns,
oldest_referenced_xid_known,
&numbytes_delta);
numrows_delta = 0;
......@@ -2411,10 +2401,7 @@ exit:
// Garbage collect all leaf entries for a given basement node.
static void
basement_node_gc_all_les(BASEMENTNODE bn,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns,
TXNID oldest_referenced_xid_known,
txn_gc_info *gc_info,
STAT64INFO_S * delta)
{
int r = 0;
......@@ -2432,10 +2419,7 @@ basement_node_gc_all_les(BASEMENTNODE bn,
keyp,
keylen,
leaf_entry,
snapshot_xids,
referenced_xids,
live_root_txns,
oldest_referenced_xid_known,
gc_info,
delta
);
// Check if the leaf entry was deleted or not.
......@@ -2447,12 +2431,7 @@ basement_node_gc_all_les(BASEMENTNODE bn,
// Garbage collect all leaf entires in all basement nodes.
static void
ft_leaf_gc_all_les(FTNODE node,
FT ft,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns,
TXNID oldest_referenced_xid_known)
ft_leaf_gc_all_les(FT ft, FTNODE node, txn_gc_info *gc_info)
{
toku_assert_entire_node_in_memory(node);
paranoid_invariant_zero(node->height);
......@@ -2463,38 +2442,40 @@ ft_leaf_gc_all_les(FTNODE node,
STAT64INFO_S delta;
delta.numrows = 0;
delta.numbytes = 0;
basement_node_gc_all_les(bn, snapshot_xids, referenced_xids, live_root_txns, oldest_referenced_xid_known, &delta);
basement_node_gc_all_les(bn, gc_info, &delta);
toku_ft_update_stats(&ft->in_memory_stats, delta);
}
}
static void
ft_leaf_run_gc(FTNODE node, FT ft) {
ft_leaf_run_gc(FT ft, FTNODE node) {
TOKULOGGER logger = toku_cachefile_logger(ft->cf);
if (logger) {
xid_omt_t snapshot_txnids;
rx_omt_t referenced_xids;
xid_omt_t live_root_txns;
toku_txn_manager_clone_state_for_gc(
logger->txn_manager,
&snapshot_txnids,
&referenced_xids,
&live_root_txns
);
TXN_MANAGER txn_manager = toku_logger_get_txn_manager(logger);
txn_manager_state txn_state_for_gc(txn_manager);
txn_state_for_gc.init();
TXNID oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
// Perform garbage collection. Provide a full snapshot of the transaction
// system plus the oldest known referenced xid that could have had messages
// applied to this leaf.
// Perform full garbage collection.
//
// Using the oldest xid in either the referenced_xids or live_root_txns
// snapshots is not sufficient, because there could be something older that is neither
// live nor referenced, but instead aborted somewhere above us as a message in the tree.
ft_leaf_gc_all_les(node, ft, snapshot_txnids, referenced_xids, live_root_txns, node->oldest_referenced_xid_known);
// Free the OMT's we used for garbage collecting.
snapshot_txnids.destroy();
referenced_xids.destroy();
live_root_txns.destroy();
// - txn_state_for_gc
// a fresh snapshot of the transaction system.
// - oldest_referenced_xid_for_simple_gc
// the oldest xid in any live list as of right now - suitible for simple gc
// - node->oldest_referenced_xid_known
// the last known oldest referenced xid for this node and any unapplied messages.
// it is a lower bound on the actual oldest referenced xid - but becasue there
// may be abort messages above us, we need to be careful to only use this value
// for implicit promotion (as opposed to the oldest referenced xid for simple gc)
//
// The node has its own oldest referenced xid because it must be careful not to implicitly promote
// provisional entries for transactions that are no longer live, but may have abort messages
// somewhere above us in the tree.
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_for_simple_gc,
node->oldest_referenced_xid_known,
true);
ft_leaf_gc_all_les(ft, node, &gc_info);
}
}
......@@ -2502,12 +2483,27 @@ void toku_bnc_flush_to_child(
FT ft,
NONLEAF_CHILDINFO bnc,
FTNODE child,
TXNID oldest_referenced_xid_known
TXNID parent_oldest_referenced_xid_known
)
{
paranoid_invariant(bnc);
STAT64INFO_S stats_delta = {0,0};
size_t remaining_memsize = toku_fifo_buffer_size_in_use(bnc->buffer);
TOKULOGGER logger = toku_cachefile_logger(ft->cf);
TXN_MANAGER txn_manager = logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr;
TXNID oldest_referenced_xid_for_simple_gc = TXNID_NONE;
txn_manager_state txn_state_for_gc(txn_manager);
bool do_garbage_collection = child->height == 0 && txn_manager != nullptr;
if (do_garbage_collection) {
txn_state_for_gc.init();
oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
}
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_for_simple_gc,
child->oldest_referenced_xid_known,
true);
FIFO_ITERATE(
bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
({
......@@ -2532,20 +2528,19 @@ void toku_bnc_flush_to_child(
-1,
&ftcmd,
is_fresh,
make_gc_info(true), // mvcc_needed
&gc_info,
flow_deltas,
&stats_delta
);
remaining_memsize -= FIFO_CURRENT_ENTRY_MEMSIZE;
}));
child->oldest_referenced_xid_known = oldest_referenced_xid_known;
child->oldest_referenced_xid_known = parent_oldest_referenced_xid_known;
invariant(remaining_memsize == 0);
if (stats_delta.numbytes || stats_delta.numrows) {
toku_ft_update_stats(&ft->in_memory_stats, stats_delta);
}
if (child->height == 0) {
ft_leaf_run_gc(child, ft);
if (do_garbage_collection) {
size_t buffsize = toku_fifo_buffer_size_in_use(bnc->buffer);
STATUS_INC(FT_MSG_BYTES_OUT, buffsize);
// may be misleading if there's a broadcast message in there
......@@ -2568,7 +2563,7 @@ toku_ft_node_put_cmd (
int target_childnum,
FT_MSG cmd,
bool is_fresh,
GC_INFO gc_info,
txn_gc_info *gc_info,
size_t flow_deltas[],
STAT64INFO stats_to_update
)
......@@ -2606,7 +2601,7 @@ void toku_ft_leaf_apply_cmd(
FTNODE node,
int target_childnum, // which child to inject to, or -1 if unknown
FT_MSG cmd,
GC_INFO gc_info,
txn_gc_info *gc_info,
uint64_t *workdone,
STAT64INFO stats_to_update
)
......@@ -2639,10 +2634,6 @@ void toku_ft_leaf_apply_cmd(
node->max_msn_applied_to_node_on_disk = cmd_msn;
}
// Pass the oldest possible live xid value to each basementnode
// when we apply messages to them.
TXNID oldest_referenced_xid_known = node->oldest_referenced_xid_known;
if (ft_msg_applies_once(cmd)) {
unsigned int childnum = (target_childnum >= 0
? target_childnum
......@@ -2655,7 +2646,6 @@ void toku_ft_leaf_apply_cmd(
desc,
bn,
cmd,
oldest_referenced_xid_known,
gc_info,
workdone,
stats_to_update);
......@@ -2672,7 +2662,6 @@ void toku_ft_leaf_apply_cmd(
desc,
BLB(node, childnum),
cmd,
oldest_referenced_xid_known,
gc_info,
workdone,
stats_to_update);
......@@ -2693,8 +2682,7 @@ static void inject_message_in_locked_node(
int childnum,
FT_MSG_S *cmd,
size_t flow_deltas[],
TXNID oldest_referenced_xid,
GC_INFO gc_info
txn_gc_info *gc_info
)
{
// No guarantee that we're the writer, but oh well.
......@@ -2704,11 +2692,14 @@ static void inject_message_in_locked_node(
invariant(toku_ctpair_is_write_locked(node->ct_pair));
toku_assert_entire_node_in_memory(node);
// Update the oldest known referenced xid for this node if it is younger
// than the one currently known. Otherwise, it's better to keep the heurstic
// we have and ignore this one.
if (oldest_referenced_xid >= node->oldest_referenced_xid_known) {
node->oldest_referenced_xid_known = oldest_referenced_xid;
// Take the newer of the two oldest referenced xid values from the node and gc_info.
// The gc_info usually has a newer value, because we got it at the top of this call
// stack from the txn manager. But sometimes the node has a newer value, if some
// other thread sees a newer value and writes to this node before we got the lock.
if (gc_info->oldest_referenced_xid_for_implicit_promotion > node->oldest_referenced_xid_known) {
node->oldest_referenced_xid_known = gc_info->oldest_referenced_xid_for_implicit_promotion;
} else if (gc_info->oldest_referenced_xid_for_implicit_promotion < node->oldest_referenced_xid_known) {
gc_info->oldest_referenced_xid_for_implicit_promotion = node->oldest_referenced_xid_known;
}
// Get the MSN from the header. Now that we have a write lock on the
......@@ -2760,13 +2751,6 @@ static void inject_message_in_locked_node(
toku_ft_flush_node_on_background_thread(ft, node);
}
else {
// Garbage collect in-memory leaf nodes that appear to be very overfull.
//
// This mechanism prevents direct leaf injections from producing an arbitrary amount
// of MVCC garbage if they never get evicted.
if (node->height == 0 && toku_serialize_ftnode_size(node) > (ft->h->nodesize * 8)) {
ft_leaf_run_gc(node, ft);
}
toku_unpin_ftnode(ft, node);
}
}
......@@ -2892,7 +2876,7 @@ static bool process_maybe_reactive_child(FT ft, FTNODE parent, FTNODE child, int
abort();
}
static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t fullhash, FT_MSG_S *cmd, size_t flow_deltas[], TXNID oldest_referenced_xid, GC_INFO gc_info)
static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t fullhash, FT_MSG_S *cmd, size_t flow_deltas[], txn_gc_info *gc_info)
// Effect:
// Inject cmd into the node at this blocknum (cachekey).
// Gets a write lock on the node for you.
......@@ -2905,7 +2889,7 @@ static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t f
toku_assert_entire_node_in_memory(node);
paranoid_invariant(node->fullhash==fullhash);
ft_verify_flags(ft, node);
inject_message_in_locked_node(ft, node, -1, cmd, flow_deltas, oldest_referenced_xid, gc_info);
inject_message_in_locked_node(ft, node, -1, cmd, flow_deltas, gc_info);
}
__attribute__((const))
......@@ -2924,8 +2908,7 @@ static void push_something_in_subtree(
int target_childnum,
FT_MSG_S *cmd,
size_t flow_deltas[],
TXNID oldest_referenced_xid,
GC_INFO gc_info,
txn_gc_info *gc_info,
int depth,
seqinsert_loc loc,
bool just_did_split_or_merge
......@@ -2966,7 +2949,7 @@ static void push_something_in_subtree(
default:
STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break;
}
inject_message_in_locked_node(ft, subtree_root, target_childnum, cmd, flow_deltas, oldest_referenced_xid, gc_info);
inject_message_in_locked_node(ft, subtree_root, target_childnum, cmd, flow_deltas, gc_info);
} else {
int r;
int childnum;
......@@ -3063,13 +3046,13 @@ static void push_something_in_subtree(
struct ftnode_fetch_extra bfe;
fill_bfe_for_full_read(&bfe, ft); // should be fully in memory, we just split it
toku_pin_ftnode_off_client_thread_batched(ft, subtree_root_blocknum, subtree_root_fullhash, &bfe, PL_READ, 0, nullptr, &newparent);
push_something_in_subtree(ft, newparent, -1, cmd, flow_deltas, oldest_referenced_xid, gc_info, depth, loc, true);
push_something_in_subtree(ft, newparent, -1, cmd, flow_deltas, gc_info, depth, loc, true);
return;
}
}
if (next_loc != NEITHER_EXTREME || child->dirty || toku_bnc_should_promote(ft, bnc)) {
push_something_in_subtree(ft, child, -1, cmd, flow_deltas, oldest_referenced_xid, gc_info, depth + 1, next_loc, false);
push_something_in_subtree(ft, child, -1, cmd, flow_deltas, gc_info, depth + 1, next_loc, false);
toku_sync_fetch_and_add(&bnc->flow[0], flow_deltas[0]);
// The recursive call unpinned the child, but
// we're responsible for unpinning subtree_root.
......@@ -3105,7 +3088,7 @@ static void push_something_in_subtree(
default:
STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break;
}
inject_message_at_this_blocknum(ft, subtree_root_blocknum, subtree_root_fullhash, cmd, flow_deltas, oldest_referenced_xid, gc_info);
inject_message_at_this_blocknum(ft, subtree_root_blocknum, subtree_root_fullhash, cmd, flow_deltas, gc_info);
}
}
}
......@@ -3113,8 +3096,7 @@ static void push_something_in_subtree(
void toku_ft_root_put_cmd(
FT ft,
FT_MSG_S *cmd,
TXNID oldest_referenced_xid,
GC_INFO gc_info
txn_gc_info *gc_info
)
// Effect:
// - assign msn to cmd and update msn in the header
......@@ -3217,22 +3199,22 @@ void toku_ft_root_put_cmd(
// If the root's a leaf or we're injecting a broadcast, drop the read lock and inject here.
toku_unpin_ftnode_read_only(ft, node);
STATUS_INC(FT_PRO_NUM_ROOT_H0_INJECT, 1);
inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas, oldest_referenced_xid, gc_info);
inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas, gc_info);
} else if (node->height > 1) {
// If the root's above height 1, we are definitely eligible for promotion.
push_something_in_subtree(ft, node, -1, cmd, flow_deltas, oldest_referenced_xid, gc_info, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
push_something_in_subtree(ft, node, -1, cmd, flow_deltas, gc_info, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
} else {
// The root's height 1. We may be eligible for promotion here.
// On the extremes, we want to promote, in the middle, we don't.
int childnum = toku_ftnode_which_child(node, cmd->u.id.key, &ft->cmp_descriptor, ft->compare_fun);
if (childnum == 0 || childnum == node->n_children - 1) {
// On the extremes, promote. We know which childnum we're going to, so pass that down too.
push_something_in_subtree(ft, node, childnum, cmd, flow_deltas, oldest_referenced_xid, gc_info, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
push_something_in_subtree(ft, node, childnum, cmd, flow_deltas, gc_info, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
} else {
// At height 1 in the middle, don't promote, drop the read lock and inject here.
toku_unpin_ftnode_read_only(ft, node);
STATUS_INC(FT_PRO_NUM_ROOT_H1_INJECT, 1);
inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas, oldest_referenced_xid, gc_info);
inject_message_at_this_blocknum(ft, root_key, fullhash, cmd, flow_deltas, gc_info);
}
}
}
......@@ -3275,8 +3257,8 @@ void toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, in
}
// Effect: Optimize the ft.
void toku_ft_optimize (FT_HANDLE brt) {
TOKULOGGER logger = toku_cachefile_logger(brt->ft->cf);
void toku_ft_optimize (FT_HANDLE ft_h) {
TOKULOGGER logger = toku_cachefile_logger(ft_h->ft->cf);
if (logger) {
TXNID oldest = toku_txn_manager_get_oldest_living_xid(logger->txn_manager);
......@@ -3295,7 +3277,17 @@ void toku_ft_optimize (FT_HANDLE brt) {
toku_init_dbt(&key);
toku_init_dbt(&val);
FT_MSG_S ftcmd = { FT_OPTIMIZE, ZERO_MSN, message_xids, .u = { .id = {&key,&val} } };
toku_ft_root_put_cmd(brt->ft, &ftcmd, TXNID_NONE, make_gc_info(true));
TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
txn_manager_state txn_state_for_gc(txn_manager);
TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_estimate,
// no messages above us, we can implicitly promote uxrs based on this xid
oldest_referenced_xid_estimate,
true);
toku_ft_root_put_cmd(ft_h->ft, &ftcmd, &gc_info);
xids_destroy(&message_xids);
}
}
......@@ -3343,6 +3335,16 @@ toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, uint32
}
}
TXN_MANAGER toku_ft_get_txn_manager(FT_HANDLE ft_h) {
TOKULOGGER logger = toku_cachefile_logger(ft_h->ft->cf);
return logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr;
}
TXNID toku_ft_get_oldest_referenced_xid_estimate(FT_HANDLE ft_h) {
TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
return txn_manager != nullptr ? toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager) : TXNID_NONE;
}
void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging, enum ft_msg_type type) {
paranoid_invariant(type==FT_INSERT || type==FT_INSERT_NO_OVERWRITE);
XIDS message_xids = xids_get_root_xids(); //By default use committed messages
......@@ -3369,19 +3371,35 @@ void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
// do nothing
} else {
TXNID oldest_referenced_xid = (txn) ? txn->oldest_referenced_xid : TXNID_NONE;
toku_ft_send_insert(ft_h, key, val, message_xids, type, oldest_referenced_xid, make_gc_info(txn ? !txn->for_recovery : false));
TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
txn_manager_state txn_state_for_gc(txn_manager);
TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_estimate,
// no messages above us, we can implicitly promote uxrs based on this xid
oldest_referenced_xid_estimate,
txn != nullptr ? !txn->for_recovery : false);
toku_ft_send_insert(ft_h, key, val, message_xids, type, &gc_info);
}
}
static void
ft_send_update_msg(FT_HANDLE brt, FT_MSG_S *msg, TOKUTXN txn) {
ft_send_update_msg(FT_HANDLE ft_h, FT_MSG_S *msg, TOKUTXN txn) {
msg->xids = (txn
? toku_txn_get_xids(txn)
: xids_get_root_xids());
TXNID oldest_referenced_xid = (txn) ? txn->oldest_referenced_xid : TXNID_NONE;
toku_ft_root_put_cmd(brt->ft, msg, oldest_referenced_xid, make_gc_info(txn ? !txn->for_recovery : false));
TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
txn_manager_state txn_state_for_gc(txn_manager);
TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_estimate,
// no messages above us, we can implicitly promote uxrs based on this xid
oldest_referenced_xid_estimate,
txn != nullptr ? !txn->for_recovery : false);
toku_ft_root_put_cmd(ft_h->ft, msg, &gc_info);
}
void toku_ft_maybe_update(FT_HANDLE ft_h, const DBT *key, const DBT *update_function_extra,
......@@ -3450,15 +3468,15 @@ void toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_e
}
}
void toku_ft_send_insert(FT_HANDLE brt, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type, TXNID oldest_referenced_xid, GC_INFO gc_info) {
void toku_ft_send_insert(FT_HANDLE brt, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type, txn_gc_info *gc_info) {
FT_MSG_S ftcmd = { type, ZERO_MSN, xids, .u = { .id = { key, val } } };
toku_ft_root_put_cmd(brt->ft, &ftcmd, oldest_referenced_xid, gc_info);
toku_ft_root_put_cmd(brt->ft, &ftcmd, gc_info);
}
void toku_ft_send_commit_any(FT_HANDLE brt, DBT *key, XIDS xids, TXNID oldest_referenced_xid, GC_INFO gc_info) {
void toku_ft_send_commit_any(FT_HANDLE brt, DBT *key, XIDS xids, txn_gc_info *gc_info) {
DBT val;
FT_MSG_S ftcmd = { FT_COMMIT_ANY, ZERO_MSN, xids, .u = { .id = { key, toku_init_dbt(&val) } } };
toku_ft_root_put_cmd(brt->ft, &ftcmd, oldest_referenced_xid, gc_info);
toku_ft_root_put_cmd(brt->ft, &ftcmd, gc_info);
}
void toku_ft_delete(FT_HANDLE brt, DBT *key, TOKUTXN txn) {
......@@ -3514,15 +3532,23 @@ void toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_vali
if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
// do nothing
} else {
TXNID oldest_referenced_xid = (txn) ? txn->oldest_referenced_xid : TXNID_NONE;
toku_ft_send_delete(ft_h, key, message_xids, oldest_referenced_xid, make_gc_info(txn ? !txn->for_recovery : false));
TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
txn_manager_state txn_state_for_gc(txn_manager);
TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_estimate,
// no messages above us, we can implicitly promote uxrs based on this xid
oldest_referenced_xid_estimate,
txn != nullptr ? !txn->for_recovery : false);
toku_ft_send_delete(ft_h, key, message_xids, &gc_info);
}
}
void toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids, TXNID oldest_referenced_xid, GC_INFO gc_info) {
void toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids, txn_gc_info *gc_info) {
DBT val; toku_init_dbt(&val);
FT_MSG_S ftcmd = { FT_DELETE_ANY, ZERO_MSN, xids, .u = { .id = { key, &val } } };
toku_ft_root_put_cmd(brt->ft, &ftcmd, oldest_referenced_xid, gc_info);
toku_ft_root_put_cmd(brt->ft, &ftcmd, gc_info);
}
/* ******************** open,close and create ********************** */
......@@ -4334,7 +4360,7 @@ int fifo_offset_msn_cmp(FIFO &fifo, const int32_t &ao, const int32_t &bo)
* basement node.
*/
static void
do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, TXNID oldest_referenced_xid, uint64_t *workdone, STAT64INFO stats_to_update)
do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, txn_gc_info *gc_info, uint64_t *workdone, STAT64INFO stats_to_update)
{
// The messages are being iterated over in (key,msn) order or just in
// msn order, so all the messages for one key, from one buffer, are in
......@@ -4359,8 +4385,7 @@ do_bn_apply_cmd(FT_HANDLE t, BASEMENTNODE bn, struct fifo_entry *entry, TXNID ol
&t->ft->cmp_descriptor,
bn,
&ftcmd,
oldest_referenced_xid,
make_gc_info(true), //mvcc is needed
gc_info,
workdone,
stats_to_update
);
......@@ -4378,7 +4403,7 @@ struct iterate_do_bn_apply_cmd_extra {
FT_HANDLE t;
BASEMENTNODE bn;
NONLEAF_CHILDINFO bnc;
TXNID oldest_referenced_xid;
txn_gc_info *gc_info;
uint64_t *workdone;
STAT64INFO stats_to_update;
};
......@@ -4387,7 +4412,7 @@ int iterate_do_bn_apply_cmd(const int32_t &offset, const uint32_t UU(idx), struc
int iterate_do_bn_apply_cmd(const int32_t &offset, const uint32_t UU(idx), struct iterate_do_bn_apply_cmd_extra *const e)
{
struct fifo_entry *entry = toku_fifo_get_entry(e->bnc->buffer, offset);
do_bn_apply_cmd(e->t, e->bn, entry, e->oldest_referenced_xid, e->workdone, e->stats_to_update);
do_bn_apply_cmd(e->t, e->bn, entry, e->gc_info, e->workdone, e->stats_to_update);
return 0;
}
......@@ -4509,7 +4534,7 @@ bnc_apply_messages_to_basement_node(
FTNODE ancestor, // the ancestor node where we can find messages to apply
int childnum, // which child buffer of ancestor contains messages we want
struct pivot_bounds const * const bounds, // contains pivot key bounds of this basement node
TXNID oldest_referenced_xid, // may be younger than what's in ancestor, we should grab the value from the highest node we have
txn_gc_info *gc_info,
bool* msgs_applied
)
{
......@@ -4569,11 +4594,11 @@ bnc_apply_messages_to_basement_node(
for (int i = 0; i < buffer_size; ++i) {
*msgs_applied = true;
struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]);
do_bn_apply_cmd(t, bn, entry, oldest_referenced_xid, &workdone_this_ancestor, &stats_delta);
do_bn_apply_cmd(t, bn, entry, gc_info, &workdone_this_ancestor, &stats_delta);
}
} else if (stale_lbi == stale_ube) {
// No stale messages to apply, we just apply fresh messages, and mark them to be moved to stale later.
struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .oldest_referenced_xid = oldest_referenced_xid, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta };
struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .gc_info = gc_info, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta };
if (fresh_ube - fresh_lbi > 0) *msgs_applied = true;
r = bnc->fresh_message_tree.iterate_and_mark_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(fresh_lbi, fresh_ube, &iter_extra);
assert_zero(r);
......@@ -4582,7 +4607,7 @@ bnc_apply_messages_to_basement_node(
// No fresh messages to apply, we just apply stale messages.
if (stale_ube - stale_lbi > 0) *msgs_applied = true;
struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .oldest_referenced_xid = oldest_referenced_xid, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta };
struct iterate_do_bn_apply_cmd_extra iter_extra = { .t = t, .bn = bn, .bnc = bnc, .gc_info = gc_info, .workdone = &workdone_this_ancestor, .stats_to_update = &stats_delta };
r = bnc->stale_message_tree.iterate_on_range<struct iterate_do_bn_apply_cmd_extra, iterate_do_bn_apply_cmd>(stale_lbi, stale_ube, &iter_extra);
assert_zero(r);
......@@ -4605,7 +4630,7 @@ apply_ancestors_messages_to_bn(
int childnum,
ANCESTORS ancestors,
struct pivot_bounds const * const bounds,
TXNID oldest_referenced_xid,
txn_gc_info *gc_info,
bool* msgs_applied
)
{
......@@ -4620,7 +4645,7 @@ apply_ancestors_messages_to_bn(
curr_ancestors->node,
curr_ancestors->childnum,
&curr_bounds,
oldest_referenced_xid,
gc_info,
msgs_applied
);
// We don't want to check this ancestor node again if the
......@@ -4657,13 +4682,14 @@ toku_apply_ancestors_messages_to_node (
VERIFY_NODE(t, node);
paranoid_invariant(node->height == 0);
TXNID oldest_referenced_xid = ancestors->node->oldest_referenced_xid_known;
for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
if (curr_ancestors->node->oldest_referenced_xid_known > oldest_referenced_xid) {
oldest_referenced_xid = curr_ancestors->node->oldest_referenced_xid_known;
}
}
TXN_MANAGER txn_manager = toku_ft_get_txn_manager(t);
txn_manager_state txn_state_for_gc(txn_manager);
TXNID oldest_referenced_xid_for_simple_gc = toku_ft_get_oldest_referenced_xid_estimate(t);
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_for_simple_gc,
node->oldest_referenced_xid_known,
true);
if (!node->dirty && child_to_read >= 0) {
paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
apply_ancestors_messages_to_bn(
......@@ -4672,7 +4698,7 @@ toku_apply_ancestors_messages_to_node (
child_to_read,
ancestors,
bounds,
oldest_referenced_xid,
&gc_info,
msgs_applied
);
}
......@@ -4691,7 +4717,7 @@ toku_apply_ancestors_messages_to_node (
i,
ancestors,
bounds,
oldest_referenced_xid,
&gc_info,
msgs_applied
);
}
......
......@@ -243,9 +243,12 @@ void toku_ft_delete (FT_HANDLE brt, DBT *k, TOKUTXN txn);
// Effect: Delete a key from a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
void toku_ft_maybe_delete (FT_HANDLE brt, DBT *k, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging);
void toku_ft_send_insert(FT_HANDLE brt, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type, TXNID oldest_referenced_xid, GC_INFO gc_info);
void toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids, TXNID oldest_referenced_xid, GC_INFO gc_info);
void toku_ft_send_commit_any(FT_HANDLE brt, DBT *key, XIDS xids, TXNID oldest_referenced_xids, GC_INFO gc_info);
TXNID toku_ft_get_oldest_referenced_xid_estimate(FT_HANDLE ft_h);
TXN_MANAGER toku_ft_get_txn_manager(FT_HANDLE ft_h);
void toku_ft_send_insert(FT_HANDLE brt, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type, txn_gc_info *gc_info);
void toku_ft_send_delete(FT_HANDLE brt, DBT *key, XIDS xids, txn_gc_info *gc_info);
void toku_ft_send_commit_any(FT_HANDLE brt, DBT *key, XIDS xids, txn_gc_info *gc_info);
int toku_close_ft_handle_nolsn (FT_HANDLE, char **error_string) __attribute__ ((warn_unused_result));
......
......@@ -221,6 +221,7 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char
toku_fill_dbt(&valdbt, val, vallen) } } };
static size_t zero_flow_deltas[] = { 0, 0 };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
toku_ft_node_put_cmd (
brt->ft->compare_fun,
brt->ft->update_fun,
......@@ -229,7 +230,7 @@ int toku_testsetup_insert_to_leaf (FT_HANDLE brt, BLOCKNUM blocknum, const char
-1,
&cmd,
true,
make_gc_info(true),
&gc_info,
zero_flow_deltas,
NULL
);
......
......@@ -2925,7 +2925,9 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
.xids = lbuf->xids,
.u = { .id = { &thekey, &theval } } };
uint64_t workdone=0;
toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(true), &workdone, stats_to_update);
// there's no mvcc garbage in a bulk-loaded FT, so there's no need to pass useful gc info
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, &gc_info, &workdone, stats_to_update);
}
static int write_literal(struct dbout *out, void*data, size_t len) {
......
......@@ -143,10 +143,6 @@ typedef TOKU_XA_XID *XIDP; // this is the type that's passed to the logger code
static inline BLOCKNUM make_blocknum(int64_t b) { BLOCKNUM result={b}; return result; }
typedef struct gc_info_s { bool mvcc_needed; } GC_INFO;
static inline GC_INFO make_gc_info(bool mvcc_needed) { GC_INFO result = {mvcc_needed}; return result; }
// This struct hold information about values stored in the cachetable.
// As one can tell from the names, we are probably violating an
// abstraction layer by placing names.
......
......@@ -247,12 +247,11 @@ toku_le_apply_msg(FT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data
uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced
TXNID oldest_referenced_xid,
GC_INFO gc_info,
txn_gc_info *gc_info,
LEAFENTRY *new_leafentry_p,
int64_t * numbytes_delta_p);
bool toku_le_worth_running_garbage_collection(LEAFENTRY le, TXNID oldest_referenced_xid_known);
bool toku_le_worth_running_garbage_collection(LEAFENTRY le, txn_gc_info *gc_info);
void
toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
......@@ -260,11 +259,8 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
uint32_t idx,
void* keyp,
uint32_t keylen,
txn_gc_info *gc_info,
LEAFENTRY *new_leaf_entry,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns,
TXNID oldest_referenced_xid_known,
int64_t * numbytes_delta_p);
#endif /* TOKU_LEAFENTRY_H */
......
......@@ -247,7 +247,6 @@ struct tokutxn {
DB_TXN *container_db_txn; // reference to DB_TXN that contains this tokutxn
xid_omt_t *live_root_txn_list; // the root txns live when the root ancestor (self if a root) started.
XIDS xids; // Represents the xid list
TXNID oldest_referenced_xid;
TOKUTXN snapshot_next;
TOKUTXN snapshot_prev;
......
......@@ -96,6 +96,7 @@ PATENT RIGHTS GRANT:
#include "ft.h"
#include "ft-ops.h"
#include "log-internal.h"
//#include "txn_manager.h"
#include "xids.h"
#include "rollback-apply.h"
......@@ -265,7 +266,16 @@ static int do_insertion (enum ft_msg_type type, FILENUM filenum, BYTESTRING key,
? toku_fill_dbt(&data_dbt, data->data, data->len)
: toku_init_dbt(&data_dbt) } } };
toku_ft_root_put_cmd(h, &ftcmd, txn->oldest_referenced_xid, make_gc_info(!txn->for_recovery));
TXN_MANAGER txn_manager = toku_logger_get_txn_manager(txn->logger);
txn_manager_state txn_state_for_gc(txn_manager);
TXNID oldest_referenced_xid_estimate = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_estimate,
// no messages above us, we can implicitly promote uxrs based on this xid
oldest_referenced_xid_estimate,
!txn->for_recovery);
toku_ft_root_put_cmd(h, &ftcmd, &gc_info);
if (reset_root_xid_that_created) {
TXNID new_root_xid_that_created = xids_get_outermost_xid(xids);
toku_reset_root_xid_that_created(h, new_root_xid_that_created);
......
......@@ -124,8 +124,9 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
// apply an insert to the leaf node
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u = {.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(false), NULL, NULL);
toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, &gc_info, NULL, NULL);
leafnode->max_msn_applied_to_node_on_disk = msn;
......
......@@ -132,8 +132,9 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
MSN msn = next_dummymsn();
brt->ft->h->max_msn_in_ft = msn;
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd, make_gc_info(false), nullptr, nullptr);
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd, &gc_info, nullptr, nullptr);
{
int r = toku_ft_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0);
......@@ -141,7 +142,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
}
FT_MSG_S badcmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &badcmd, make_gc_info(false), nullptr, nullptr);
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &badcmd, &gc_info, nullptr, nullptr);
// message should be rejected for duplicate msn, row should still have original val
{
......@@ -154,7 +155,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
msn = next_dummymsn();
brt->ft->h->max_msn_in_ft = msn;
FT_MSG_S cmd2 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &val2 }} };
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd2, make_gc_info(false), nullptr, nullptr);
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd2, &gc_info, nullptr, nullptr);
// message should be accepted, val should have new value
{
......@@ -166,7 +167,7 @@ append_leaf(FT_HANDLE brt, FTNODE leafnode, void *key, uint32_t keylen, void *va
// now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10;
FT_MSG_S cmd3 = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &badval } }};
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd3, make_gc_info(false), nullptr, nullptr);
toku_ft_leaf_apply_cmd(brt->ft->compare_fun, brt->ft->update_fun, &brt->ft->cmp_descriptor, leafnode, -1, &cmd3, &gc_info, nullptr, nullptr);
// message should be rejected, val should still have value in pair2
{
......
......@@ -96,6 +96,7 @@ PATENT RIGHTS GRANT:
static TOKUTXN const null_txn = 0;
static DB * const null_db = 0;
static const char *fname = TOKU_TEST_FILENAME;
static txn_gc_info non_mvcc_gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
static int dummy_cmp(DB *db __attribute__((unused)),
const DBT *a, const DBT *b) {
......@@ -217,8 +218,8 @@ insert_random_message_to_bn(
*keylenp = keydbt->size;
*keyp = toku_xmemdup(keydbt->data, keydbt->size);
int64_t numbytes;
toku_le_apply_msg(&msg, NULL, NULL, 0, TXNID_NONE, make_gc_info(false), save, &numbytes);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb, &msg, TXNID_NONE, make_gc_info(false), NULL, NULL);
toku_le_apply_msg(&msg, NULL, NULL, 0, &non_mvcc_gc_info, save, &numbytes);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb, &msg, &non_mvcc_gc_info, NULL, NULL);
if (msn.msn > blb->max_msn_applied.msn) {
blb->max_msn_applied = msn;
}
......@@ -267,12 +268,12 @@ insert_same_message_to_bns(
*keylenp = keydbt->size;
*keyp = toku_xmemdup(keydbt->data, keydbt->size);
int64_t numbytes;
toku_le_apply_msg(&msg, NULL, NULL, 0, TXNID_NONE, make_gc_info(false), save, &numbytes);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb1, &msg, TXNID_NONE, make_gc_info(false), NULL, NULL);
toku_le_apply_msg(&msg, NULL, NULL, 0, &non_mvcc_gc_info, save, &numbytes);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb1, &msg, &non_mvcc_gc_info, NULL, NULL);
if (msn.msn > blb1->max_msn_applied.msn) {
blb1->max_msn_applied = msn;
}
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb2, &msg, TXNID_NONE, make_gc_info(false), NULL, NULL);
toku_ft_bn_apply_cmd(t->ft->compare_fun, t->ft->update_fun, NULL, blb2, &msg, &non_mvcc_gc_info, NULL, NULL);
if (msn.msn > blb2->max_msn_applied.msn) {
blb2->max_msn_applied = msn;
}
......@@ -684,7 +685,7 @@ flush_to_leaf(FT_HANDLE t, bool make_leaf_up_to_date, bool use_flush) {
if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], make_gc_info(false), NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], &non_mvcc_gc_info, NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
......@@ -908,7 +909,7 @@ flush_to_leaf_with_keyrange(FT_HANDLE t, bool make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (dummy_cmp(NULL, parent_messages[i]->u.id.key, &childkeys[7]) <= 0 &&
!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], make_gc_info(false), NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child, -1, parent_messages[i], &non_mvcc_gc_info, NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
......@@ -1104,8 +1105,8 @@ compare_apply_and_flush(FT_HANDLE t, bool make_leaf_up_to_date) {
if (make_leaf_up_to_date) {
for (i = 0; i < num_parent_messages; ++i) {
if (!parent_messages_is_fresh[i]) {
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, -1, parent_messages[i], make_gc_info(false), NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, -1, parent_messages[i], make_gc_info(false), NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child1, -1, parent_messages[i], &non_mvcc_gc_info, NULL, NULL);
toku_ft_leaf_apply_cmd(t->ft->compare_fun, t->ft->update_fun, &t->ft->descriptor, child2, -1, parent_messages[i], &non_mvcc_gc_info, NULL, NULL);
}
}
for (i = 0; i < 8; ++i) {
......
......@@ -453,12 +453,12 @@ test_le_apply(ULE ule_initial, FT_MSG msg, ULE ule_expected) {
size_t result_memsize = 0;
int64_t ignoreme;
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
toku_le_apply_msg(msg,
le_initial,
nullptr,
0,
TXNID_NONE,
make_gc_info(true),
&gc_info,
&le_result,
&ignoreme);
if (le_result) {
......@@ -751,7 +751,8 @@ static bool ule_worth_running_garbage_collection(ULE ule, TXNID oldest_reference
LEAFENTRY le;
int r = le_pack(ule, nullptr, 0, nullptr, 0, 0, &le); CKERR(r);
invariant_notnull(le);
bool worth_running = toku_le_worth_running_garbage_collection(le, oldest_referenced_xid_known);
txn_gc_info gc_info(nullptr, oldest_referenced_xid_known, oldest_referenced_xid_known, true);
bool worth_running = toku_le_worth_running_garbage_collection(le, &gc_info);
toku_free(le);
return worth_running;
}
......
......@@ -128,7 +128,8 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(false), NULL, NULL);
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, &gc_info, NULL, NULL);
// Create bad tree (don't do following):
// leafnode->max_msn_applied_to_node = msn;
......
......@@ -116,7 +116,8 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(false), NULL, NULL);
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -117,7 +117,8 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(false), NULL, NULL);
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -116,7 +116,8 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(false), NULL, NULL);
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -117,7 +117,8 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(false), NULL, NULL);
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_cmd_once(BLB(leafnode,0), &cmd, idx, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -119,7 +119,8 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(false), NULL, NULL);
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -116,7 +116,8 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
MSN msn = next_dummymsn();
FT_MSG_S cmd = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, TXNID_NONE, make_gc_info(false), NULL, NULL);
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_cmd_once(BLB(leafnode, 0), &cmd, idx, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -239,7 +239,6 @@ toku_txn_begin_with_xid (
}
else {
parent->child_manager->start_child_txn_for_recovery(txn, parent, xid);
txn->oldest_referenced_xid = parent->oldest_referenced_xid;
}
}
else {
......@@ -255,7 +254,6 @@ toku_txn_begin_with_xid (
}
else {
parent->child_manager->start_child_txn(txn, parent);
txn->oldest_referenced_xid = parent->oldest_referenced_xid;
toku_txn_manager_handle_snapshot_create_for_child_txn(
txn,
logger->txn_manager,
......@@ -327,7 +325,6 @@ static txn_child_manager tcm;
.container_db_txn = container_db_txn,
.live_root_txn_list = nullptr,
.xids = NULL,
.oldest_referenced_xid = TXNID_NONE,
.snapshot_next = NULL,
.snapshot_prev = NULL,
.begin_was_logged = false,
......
......@@ -291,6 +291,7 @@ void toku_txn_manager_init(TXN_MANAGER* txn_managerp) {
txn_manager->last_xid = 0;
txn_manager->last_xid_seen_for_recover = TXNID_NONE;
txn_manager->last_calculated_oldest_referenced_xid = TXNID_NONE;
*txn_managerp = txn_manager;
}
......@@ -324,6 +325,10 @@ toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager) {
return rval;
}
TXNID toku_txn_manager_get_oldest_referenced_xid_estimate(TXN_MANAGER txn_manager) {
return txn_manager->last_calculated_oldest_referenced_xid;
}
int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids);
int live_root_txn_list_iter(const TOKUTXN &live_xid, const uint32_t UU(index), TXNID **const referenced_xids){
(*referenced_xids)[index] = live_xid->txnid.parent_id64;
......@@ -371,7 +376,7 @@ max_xid(TXNID a, TXNID b) {
return a < b ? b : a;
}
static TXNID get_oldest_referenced_xid_unlocked(TXN_MANAGER txn_manager) {
static void set_oldest_referenced_xid(TXN_MANAGER txn_manager) {
TXNID oldest_referenced_xid = TXNID_MAX;
int r;
if (txn_manager->live_root_ids.size() > 0) {
......@@ -397,8 +402,8 @@ static TXNID get_oldest_referenced_xid_unlocked(TXN_MANAGER txn_manager) {
if (txn_manager->last_xid < oldest_referenced_xid) {
oldest_referenced_xid = txn_manager->last_xid;
}
paranoid_invariant(oldest_referenced_xid != TXNID_MAX);
return oldest_referenced_xid;
invariant(oldest_referenced_xid != TXNID_MAX);
txn_manager->last_calculated_oldest_referenced_xid = oldest_referenced_xid;
}
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
......@@ -610,7 +615,6 @@ void toku_txn_manager_start_txn_for_recovery(
// using xid that is passed in
txn_manager->last_xid = max_xid(txn_manager->last_xid, xid);
toku_txn_update_xids_in_txn(txn, xid);
txn->oldest_referenced_xid = TXNID_NONE;
uint32_t idx;
int r = txn_manager->live_root_txns.find_zero<TOKUTXN, find_xid>(txn, nullptr, &idx);
......@@ -672,7 +676,7 @@ void toku_txn_manager_start_txn(
r = txn_manager->live_root_ids.insert_at(txn->txnid.parent_id64, idx);
invariant_zero(r);
}
txn->oldest_referenced_xid = get_oldest_referenced_xid_unlocked(txn_manager);
set_oldest_referenced_xid(txn_manager);
if (needs_snapshot) {
txn_manager_create_snapshot_unlocked(
......@@ -825,7 +829,17 @@ void toku_txn_manager_clone_state_for_gc(
txn_manager_unlock(txn_manager);
}
void txn_manager_state::init() {
invariant(!initialized);
invariant_notnull(txn_manager);
toku_txn_manager_clone_state_for_gc(
txn_manager,
&snapshot_xids,
&referenced_xids,
&live_root_txns
);
initialized = true;
}
void toku_txn_manager_id2txn_unlocked(TXN_MANAGER txn_manager, TXNID_PAIR txnid, TOKUTXN *result) {
TOKUTXN txn;
......
......@@ -121,14 +121,74 @@ struct txn_manager {
TXNID last_xid;
TXNID last_xid_seen_for_recover;
TXNID last_calculated_oldest_referenced_xid;
};
struct txn_manager_state {
txn_manager_state(TXN_MANAGER mgr) :
txn_manager(mgr),
initialized(false) {
snapshot_xids.create_no_array();
referenced_xids.create_no_array();
live_root_txns.create_no_array();
}
// should not copy construct
txn_manager_state &operator=(txn_manager_state &rhs) = delete;
txn_manager_state(txn_manager_state &rhs) = delete;
~txn_manager_state() {
snapshot_xids.destroy();
referenced_xids.destroy();
live_root_txns.destroy();
}
void init();
TXN_MANAGER txn_manager;
bool initialized;
// a snapshot of the txn manager's mvcc state
// only valid if initialized = true
xid_omt_t snapshot_xids;
rx_omt_t referenced_xids;
xid_omt_t live_root_txns;
};
// represents all of the information needed to run garbage collection
struct txn_gc_info {
txn_gc_info(txn_manager_state *st, TXNID xid_sgc, TXNID xid_ip, bool mvcc)
: txn_state_for_gc(st),
oldest_referenced_xid_for_simple_gc(xid_sgc),
oldest_referenced_xid_for_implicit_promotion(xid_ip),
mvcc_needed(mvcc) {
}
// a snapshot of the transcation system. may be null.
txn_manager_state *txn_state_for_gc;
// the oldest xid in any live list
//
// suitible for simple garbage collection that cleans up multiple committed
// transaction records into one. not suitible for implicit promotions, which
// must be correct in the face of abort messages - see ftnode->oldest_referenced_xid
TXNID oldest_referenced_xid_for_simple_gc;
// lower bound on the oldest xid in any live when the messages to be cleaned
// had no messages above them. suitable for implicitly promoting a provisonal uxr.
TXNID oldest_referenced_xid_for_implicit_promotion;
// whether or not mvcc is actually needed - false during recovery and non-transactional systems
const bool mvcc_needed;
};
void toku_txn_manager_init(TXN_MANAGER* txn_manager);
void toku_txn_manager_destroy(TXN_MANAGER txn_manager);
TXNID toku_txn_manager_get_oldest_living_xid(TXN_MANAGER txn_manager);
TXNID toku_txn_manager_get_oldest_referenced_xid_estimate(TXN_MANAGER txn_manager);
void toku_txn_manager_handle_snapshot_create_for_child_txn(
TOKUTXN txn,
TXN_MANAGER txn_manager,
......
......@@ -116,7 +116,7 @@ PATENT RIGHTS GRANT:
#include "ule-internal.h"
#include <util/status.h>
#include <util/scoped_malloc.h>
#include <util/partitioned_counter.h>
#define ULE_DEBUG 0
......@@ -141,6 +141,10 @@ status_init(void) {
STATUS_INIT(LE_MAX_PROVISIONAL_XR, nullptr, UINT64, "max provisional xr", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_EXPANDED, nullptr, UINT64, "expanded", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_MAX_MEMSIZE, nullptr, UINT64, "max memsize", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_APPLY_GC_BYTES_IN, nullptr, PARCOUNT, "size of leafentries before garbage collection (during message application)", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_APPLY_GC_BYTES_OUT, nullptr, PARCOUNT, "size of leafentries after garbage collection (during message application)", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_NORMAL_GC_BYTES_IN, nullptr, PARCOUNT, "size of leafentries before garbage collection (outside message application)", TOKU_ENGINE_STATUS);
STATUS_INIT(LE_NORMAL_GC_BYTES_OUT,nullptr, PARCOUNT, "size of leafentries after garbage collection (outside message application)", TOKU_ENGINE_STATUS);
le_status.initialized = true;
}
#undef STATUS_INIT
......@@ -153,6 +157,14 @@ toku_le_get_status(LE_STATUS statp) {
}
#define STATUS_VALUE(x) le_status.status[x].value.num
#define STATUS_INC(x, d) \
do { \
if (le_status.status[x].type == PARCOUNT) { \
increment_partitioned_counter(le_status.status[x].value.parcount, d); \
} else { \
toku_sync_fetch_and_add(&le_status.status[x].value.num, d); \
} \
} while (0)
///////////////////////////////////////////////////////////////////////////////////
......@@ -309,18 +321,18 @@ xid_reads_committed_xid(TXNID tl1, TXNID xc, const xid_omt_t &snapshot_txnids, c
// so we get rid of them.
//
static void
ule_simple_garbage_collection(ULE ule, TXNID oldest_referenced_xid, GC_INFO gc_info) {
ule_simple_garbage_collection(ULE ule, txn_gc_info *gc_info) {
uint32_t curr_index = 0;
uint32_t num_entries;
if (ule->num_cuxrs == 1) {
goto done;
}
if (gc_info.mvcc_needed) {
if (gc_info->mvcc_needed) {
// starting at the top of the committed stack, find the first
// uxr with a txnid that is less than oldest_referenced_xid
for (uint32_t i = 0; i < ule->num_cuxrs; i++) {
curr_index = ule->num_cuxrs - i - 1;
if (ule->uxrs[curr_index].xid < oldest_referenced_xid) {
if (ule->uxrs[curr_index].xid < gc_info->oldest_referenced_xid_for_simple_gc) {
break;
}
}
......@@ -441,6 +453,25 @@ ule_garbage_collect(ULE ule, const xid_omt_t &snapshot_xids, const rx_omt_t &ref
done:;
}
static size_t ule_packed_memsize(ULE ule) {
// Returns: The size 'ule' would be when packed into a leafentry, or 0 if the
// topmost committed value is a delete.
if (ule->num_cuxrs == 1 && ule->num_puxrs == 0) {
UXR uxr = ule_get_innermost_uxr(ule);
if (uxr_is_delete(uxr)) {
return 0;
}
}
return le_memsize_from_ule(ule);
}
// Heuristics to control when we decide to initialize
// txn manager state (possibly expensive) and run gc.
enum {
ULE_MIN_STACK_SIZE_TO_FORCE_GC = 5,
ULE_MIN_MEMSIZE_TO_FORCE_GC = 1024 * 1024
};
/////////////////////////////////////////////////////////////////////////////////
// This is the big enchilada. (Bring Tums.) Note that this level of abstraction
// has no knowledge of the inner structure of either leafentry or msg. It makes
......@@ -460,10 +491,10 @@ toku_le_apply_msg(FT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data
uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced
TXNID oldest_referenced_xid,
GC_INFO gc_info,
txn_gc_info *gc_info,
LEAFENTRY *new_leafentry_p,
int64_t * numbytes_delta_p) { // change in total size of key and val, not including any overhead
invariant_notnull(gc_info);
paranoid_invariant_notnull(new_leafentry_p);
ULE_S ule;
int64_t oldnumbytes = 0;
......@@ -486,7 +517,35 @@ toku_le_apply_msg(FT_MSG msg,
oldnumbytes = ule_get_innermost_numbytes(&ule, keylen);
}
msg_modify_ule(&ule, msg); // modify unpacked leafentry
ule_simple_garbage_collection(&ule, oldest_referenced_xid, gc_info);
// - we may be able to immediately promote the newly-apllied outermost provisonal uxr
// - either way, run simple gc first, and then full gc if there are still some committed uxrs.
ule_try_promote_provisional_outermost(&ule, gc_info->oldest_referenced_xid_for_implicit_promotion);
ule_simple_garbage_collection(&ule, gc_info);
txn_manager_state *txn_state_for_gc = gc_info->txn_state_for_gc;
size_t size_before_gc = 0;
if (ule.num_cuxrs > 1 && txn_state_for_gc != nullptr && // there is garbage to clean, and our caller gave us state..
// ..and either the state is pre-initialized, or the committed stack is large enough
(txn_state_for_gc->initialized || ule.num_cuxrs >= ULE_MIN_STACK_SIZE_TO_FORCE_GC ||
// ..or the ule's raw memsize is sufficiently large
(size_before_gc = ule_packed_memsize(&ule)) >= ULE_MIN_MEMSIZE_TO_FORCE_GC)) {
// ..then it's worth running gc, possibly initializing the txn manager state, if it isn't already
if (!txn_state_for_gc->initialized) {
txn_state_for_gc->init();
}
size_before_gc = size_before_gc != 0 ? size_before_gc : // it's already been calculated above
ule_packed_memsize(&ule);
ule_garbage_collect(&ule,
txn_state_for_gc->snapshot_xids,
txn_state_for_gc->referenced_xids,
txn_state_for_gc->live_root_txns
);
size_t size_after_gc = ule_packed_memsize(&ule);
STATUS_INC(LE_APPLY_GC_BYTES_IN, size_before_gc);
STATUS_INC(LE_APPLY_GC_BYTES_OUT, size_after_gc);
}
int rval = le_pack(
&ule, // create packed leafentry
data_buffer,
......@@ -504,7 +563,7 @@ toku_le_apply_msg(FT_MSG msg,
ule_cleanup(&ule);
}
bool toku_le_worth_running_garbage_collection(LEAFENTRY le, TXNID oldest_referenced_xid_known) {
bool toku_le_worth_running_garbage_collection(LEAFENTRY le, txn_gc_info *gc_info) {
// Effect: Quickly determines if it's worth trying to run garbage collection on a leafentry
// Return: True if it makes sense to try garbage collection, false otherwise.
// Rationale: Garbage collection is likely to clean up under two circumstances:
......@@ -520,7 +579,8 @@ bool toku_le_worth_running_garbage_collection(LEAFENTRY le, TXNID oldest_referen
} else {
paranoid_invariant(le->u.mvcc.num_cxrs == 1);
}
return le->u.mvcc.num_pxrs > 0 && le_outermost_uncommitted_xid(le) < oldest_referenced_xid_known;
return le->u.mvcc.num_pxrs > 0 &&
le_outermost_uncommitted_xid(le) < gc_info->oldest_referenced_xid_for_implicit_promotion;
}
// Garbage collect one leaf entry, using the given OMT's.
......@@ -547,12 +607,12 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
uint32_t idx,
void* keyp,
uint32_t keylen,
txn_gc_info *gc_info,
LEAFENTRY *new_leaf_entry,
const xid_omt_t &snapshot_xids,
const rx_omt_t &referenced_xids,
const xid_omt_t &live_root_txns,
TXNID oldest_referenced_xid_known,
int64_t * numbytes_delta_p) {
// We shouldn't want to run gc without having provided a snapshot of the txn system.
invariant_notnull(gc_info);
invariant_notnull(gc_info->txn_state_for_gc);
paranoid_invariant_notnull(new_leaf_entry);
ULE_S ule;
int64_t oldnumbytes = 0;
......@@ -576,9 +636,19 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
// The oldest known refeferenced xid is a lower bound on the oldest possible
// live xid, so we use that. It's usually close enough to get rid of most
// garbage in leafentries.
TXNID oldest_possible_live_xid = oldest_referenced_xid_known;
ule_try_promote_provisional_outermost(&ule, oldest_possible_live_xid);
ule_garbage_collect(&ule, snapshot_xids, referenced_xids, live_root_txns);
ule_try_promote_provisional_outermost(&ule, gc_info->oldest_referenced_xid_for_implicit_promotion);
// No need to run simple gc here if we're going straight for full gc.
if (ule.num_cuxrs > 1) {
size_t size_before_gc = ule_packed_memsize(&ule);
ule_garbage_collect(&ule,
gc_info->txn_state_for_gc->snapshot_xids,
gc_info->txn_state_for_gc->referenced_xids,
gc_info->txn_state_for_gc->live_root_txns);
size_t size_after_gc = ule_packed_memsize(&ule);
STATUS_INC(LE_APPLY_GC_BYTES_IN, size_before_gc);
STATUS_INC(LE_APPLY_GC_BYTES_OUT, size_after_gc);
}
int r = le_pack(
&ule,
......
......@@ -615,7 +615,16 @@ indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xi
} else {
result = toku_ydb_check_avail_fs_space(indexer->i->env);
if (result == 0) {
toku_ft_send_delete(db_struct_i(hotdb)->ft_handle, hotkey, xids, TXNID_NONE, make_gc_info(true));
FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle;
TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
txn_manager_state txn_state_for_gc(txn_manager);
TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_estimate,
oldest_referenced_xid_estimate,
true);
toku_ft_send_delete(db_struct_i(hotdb)->ft_handle, hotkey, xids, &gc_info);
}
}
return result;
......@@ -651,7 +660,16 @@ indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *ho
} else {
result = toku_ydb_check_avail_fs_space(indexer->i->env);
if (result == 0) {
toku_ft_send_insert(db_struct_i(hotdb)->ft_handle, hotkey, hotval, xids, FT_INSERT, TXNID_NONE, make_gc_info(true));
FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle;
TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
txn_manager_state txn_state_for_gc(txn_manager);
TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_estimate,
oldest_referenced_xid_estimate,
true);
toku_ft_send_insert(db_struct_i(hotdb)->ft_handle, hotkey, hotval, xids, FT_INSERT, &gc_info);
}
}
return result;
......@@ -670,8 +688,18 @@ indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) {
result = indexer->i->test_commit_any(indexer, hotdb, hotkey, xids);
} else {
result = toku_ydb_check_avail_fs_space(indexer->i->env);
if (result == 0)
toku_ft_send_commit_any(db_struct_i(hotdb)->ft_handle, hotkey, xids, TXNID_NONE, make_gc_info(true));
if (result == 0) {
FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle;
TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
txn_manager_state txn_state_for_gc(txn_manager);
TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
txn_gc_info gc_info(&txn_state_for_gc,
oldest_referenced_xid_estimate,
oldest_referenced_xid_estimate,
true);
toku_ft_send_commit_any(db_struct_i(hotdb)->ft_handle, hotkey, xids, &gc_info);
}
}
}
return result;
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*
COPYING CONDITIONS NOTICE:
This program is free software; you can redistribute it and/or modify
it under the terms of version 2 of the GNU General Public License as
published by the Free Software Foundation, and provided that the
following conditions are met:
* Redistributions of source code must retain this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below).
* Redistributions in binary form must reproduce this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below) in the documentation and/or other materials
provided with the distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2014 Tokutek, Inc.
DISCLAIMER:
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
UNIVERSITY PATENT NOTICE:
The technology is licensed by the Massachusetts Institute of
Technology, Rutgers State University of New Jersey, and the Research
Foundation of State University of New York at Stony Brook under
United States of America Serial No. 11/760379 and to the patents
and/or patent applications resulting from it.
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
This software is covered by US Patent No. 8,489,638.
PATENT RIGHTS GRANT:
"THIS IMPLEMENTATION" means the copyrightable works distributed by
Tokutek as part of the Fractal Tree project.
"PATENT CLAIMS" means the claims of patents that are owned or
licensable by Tokutek, both currently or in the future; and that in
the absence of this license would be infringed by THIS
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
"PATENT CHALLENGE" shall mean a challenge to the validity,
patentability, enforceability and/or non-infringement of any of the
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
Tokutek hereby grants to you, for the term and geographical scope of
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, transfer, and
otherwise run, modify, and propagate the contents of THIS
IMPLEMENTATION, where such license applies only to the PATENT
CLAIMS. This grant does not include claims that would be infringed
only as a consequence of further modifications of THIS
IMPLEMENTATION. If you or your agent or licensee institute or order
or agree to the institution of patent litigation against any entity
(including a cross-claim or counterclaim in a lawsuit) alleging that
THIS IMPLEMENTATION constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any rights
granted to you under this License shall terminate as of the date
such litigation is filed. If you or your agent or exclusive
licensee institute or order or agree to the institution of a PATENT
CHALLENGE, then Tokutek may terminate any rights granted to you
under this License.
*/
#ident "Copyright (c) 2014 Tokutek Inc. All rights reserved."
#include "test.h"
// Test the following scenario:
// Begin A
// A deletes key K
// A aborts
// Begin B
// B deletes key K-1
// B deletes key K
// B deletes key K+1
// B commits
// Begin C
// C queries K, should read K (not the delete!).
//
// An incorrect mvcc implementation would 'implicitly' promote
// A's delete to committed, based on the fact that the oldest
// referenced xid at the time of injection for key k-1 and k+1
// is greater than A's xid.
static void test_insert_bad_implicit_promotion(void) {
int r;
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_cachesize(env, 1, 0, 1); CKERR(r); // 1gb cache so this test fits in memory
r = env->open(env, TOKU_TEST_FILENAME, DB_CREATE+DB_PRIVATE+DB_INIT_MPOOL+DB_INIT_TXN, 0); CKERR(r);
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->set_pagesize(db, 4096); CKERR(r);
r = db->open(db, NULL, "db", NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
const int val_size = 512;
DBT key;
DBT val;
char *XMALLOC_N(val_size, val_buf);
memset(val_buf, 'x', val_size);
dbt_init(&val, val_buf, val_size);
// Insert rows [0, N]
const int N = 1000;
for (int i = 0; i < N; i++) {
int k = toku_htonl(i);
dbt_init(&key, &k, sizeof(k));
r = db->put(db, NULL, &key, &val, 0); CKERR(r);
}
int key_500 = toku_htonl(500);
int key_499 = toku_htonl(499);
int key_501 = toku_htonl(501);
// sanity check our keys
r = db->get(db, NULL, dbt_init(&key, &key_500, sizeof(key_500)), &val, 0); CKERR(r);
r = db->get(db, NULL, dbt_init(&key, &key_500, sizeof(key_499)), &val, 0); CKERR(r);
r = db->get(db, NULL, dbt_init(&key, &key_500, sizeof(key_501)), &val, 0); CKERR(r);
// Abort a delete for key 500
DB_TXN *txn_A;
r = env->txn_begin(env, NULL, &txn_A, DB_SERIALIZABLE); CKERR(r);
dbt_init(&key, &key_500, sizeof(key_500));
r = db->del(db, txn_A, &key, DB_DELETE_ANY); CKERR(r);
r = txn_A->abort(txn_A); CKERR(r);
// Commit two deletes on keys 499 and 501. This should inject
// at least one message in the same buffer that has the delete/abort
// messages for key 500.
DB_TXN *txn_B;
r = env->txn_begin(env, NULL, &txn_B, DB_SERIALIZABLE); CKERR(r);
dbt_init(&key, &key_499, sizeof(key_499));
r = db->del(db, txn_B, &key, DB_DELETE_ANY); CKERR(r);
dbt_init(&key, &key_501, sizeof(key_501));
r = db->del(db, txn_B, &key, DB_DELETE_ANY); CKERR(r);
r = txn_B->commit(txn_B, 0); CKERR(r);
// No transactions are live - so when we create txn C, the oldest
// referenced xid will be txn C. If our implicit promotion logic is
// wrong, we will use txn C's xid to promote the delete on key 500
// before the abort message hits it, and C's query will return nothing.
DB_TXN *txn_C;
dbt_init(&key, &key_500, sizeof(key_500));
r = env->txn_begin(env, NULL, &txn_C, DB_TXN_SNAPSHOT); CKERR(r);
r = db->get(db, txn_C, &key, &val, 0); CKERR(r);
r = txn_C->commit(txn_C, 0); CKERR(r);
toku_free(val_buf);
r = db->close(db, 0); CKERR(r);
r = env->close(env, 0); CKERR(r);
}
int
test_main(int argc, char *const argv[]) {
parse_args(argc, argv);
toku_os_recursive_delete(TOKU_TEST_FILENAME);
int r = toku_os_mkdir(TOKU_TEST_FILENAME, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
test_insert_bad_implicit_promotion();
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment