Commit f5ca05e2 authored by Leif Walsh's avatar Leif Walsh

Merge pull request #100 from Tokutek/mongo/650

remove fullhash from rollback_log_nodes

fixes Tokutek/mongo#650
parents b5d8ed60 a3dec7ad
......@@ -632,7 +632,7 @@ int toku_serialize_ftnode_to(int fd, BLOCKNUM, FTNODE node, FTNODE_DISK_DATA* nd
int toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized_log, bool is_serialized,
FT h, bool for_checkpoint);
void toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized);
int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash, ROLLBACK_LOG_NODE *logp, FT h);
int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *logp, FT h);
int toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, struct ftnode_fetch_extra* bfe);
int toku_deserialize_bp_from_compressed(FTNODE node, int childnum, struct ftnode_fetch_extra *bfe);
int toku_deserialize_ftnode_from (int fd, BLOCKNUM off, uint32_t /*fullhash*/, FTNODE *ftnode, FTNODE_DISK_DATA* ndd, struct ftnode_fetch_extra* bfe);
......
......@@ -461,6 +461,7 @@ serialize_ft_min_size (uint32_t version) {
size_t size = 0;
switch(version) {
case FT_LAYOUT_VERSION_25:
case FT_LAYOUT_VERSION_24:
case FT_LAYOUT_VERSION_23:
case FT_LAYOUT_VERSION_22:
......
......@@ -118,6 +118,7 @@ enum ft_layout_version_e {
FT_LAYOUT_VERSION_22 = 22, // Ming: Add oldest known referenced xid to each ftnode, for better garbage collection
FT_LAYOUT_VERSION_23 = 23, // Ming: Fix upgrade path #5902
FT_LAYOUT_VERSION_24 = 24, // Riddler: change logentries that log transactions to store TXNID_PAIRs instead of TXNIDs
FT_LAYOUT_VERSION_25 = 25, // SecretSquirrel: ROLLBACK_LOG_NODES (on disk and in memory) now just use blocknum (instead of blocknum + hash) to point to other log nodes. same for xstillopen log entry
FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
......
......@@ -2763,8 +2763,7 @@ toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBA
}
static int
deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, uint32_t fullhash, ROLLBACK_LOG_NODE *log_p,
FT h, struct rbuf *rb) {
deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log_p, struct rbuf *rb) {
ROLLBACK_LOG_NODE MALLOC(result);
int r;
if (result==NULL) {
......@@ -2793,13 +2792,7 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, uint32_t fullhash, ROLLBA
r = toku_db_badformat();
goto died0;
}
result->hash = toku_cachetable_hash(h->cf, result->blocknum);
if (result->hash != fullhash) {
r = toku_db_badformat();
goto died0;
}
result->previous = rbuf_blocknum(rb);
result->previous_hash = toku_cachetable_hash(h->cf, result->previous);
result->rollentry_resident_bytecount = rbuf_ulonglong(rb);
size_t arena_initial_size = rbuf_ulonglong(rb);
......@@ -2840,13 +2833,13 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, uint32_t fullhash, ROLLBA
}
static int
deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknum, uint32_t fullhash,
deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknum,
ROLLBACK_LOG_NODE *log,
FT h, struct rbuf *rb) {
struct rbuf *rb) {
int r = 0;
ROLLBACK_LOG_NODE rollback_log_node = NULL;
invariant(version==FT_LAYOUT_VERSION); //Rollback log nodes do not survive version changes.
r = deserialize_rollback_log_from_rbuf(blocknum, fullhash, &rollback_log_node, h, rb);
r = deserialize_rollback_log_from_rbuf(blocknum, &rollback_log_node, rb);
if (r==0) {
*log = rollback_log_node;
}
......@@ -3022,8 +3015,7 @@ cleanup:
// Read rollback log node from file into struct. Perform version upgrade if necessary.
int
toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash,
ROLLBACK_LOG_NODE *logp, FT h) {
toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *logp, FT h) {
int layout_version = 0;
int r;
struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0};
......@@ -3037,7 +3029,6 @@ toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash
ROLLBACK_LOG_NODE XMALLOC(log);
rollback_empty_log_init(log);
log->blocknum.b = blocknum.b;
log->hash = fullhash;
r = 0;
*logp = log;
goto cleanup;
......@@ -3054,7 +3045,7 @@ toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash
}
}
r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, fullhash, logp, h, &rb);
r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, logp, &rb);
cleanup:
if (rb.buf) toku_free(rb.buf);
......
......@@ -209,15 +209,12 @@ struct txn_roll_info {
// the spilled rollback head is the block number of the first rollback node
// that makes up the rollback log chain
BLOCKNUM spilled_rollback_head;
uint32_t spilled_rollback_head_hash;
// the spilled rollback is the block number of the last rollback node that
// makes up the rollback log chain.
BLOCKNUM spilled_rollback_tail;
uint32_t spilled_rollback_tail_hash;
// the current rollback node block number we may use. if this is ROLLBACK_NONE,
// then we need to create one and set it here before using it.
BLOCKNUM current_rollback;
uint32_t current_rollback_hash;
};
struct tokutxn {
......
......@@ -155,9 +155,7 @@ const struct logtype rollbacks[] = {
{"rollinclude", 'r', FA{{"TXNID_PAIR", "xid", 0},
{"uint64_t", "num_nodes", 0},
{"BLOCKNUM", "spilled_head", 0},
{"uint32_t", "spilled_head_hash", 0},
{"BLOCKNUM", "spilled_tail", 0},
{"uint32_t", "spilled_tail_hash", 0},
NULLFIELD}, LOG_BEGIN_ACTION_NA},
{"load", 'l', FA{{"FILENUM", "old_filenum", 0},
{"BYTESTRING", "new_iname", 0},
......
......@@ -365,9 +365,7 @@ static int
toku_apply_rollinclude (TXNID_PAIR xid,
uint64_t num_nodes,
BLOCKNUM spilled_head,
uint32_t spilled_head_hash __attribute__((__unused__)),
BLOCKNUM spilled_tail,
uint32_t spilled_tail_hash,
TOKUTXN txn,
LSN oplsn,
apply_rollback_item func) {
......@@ -375,7 +373,6 @@ toku_apply_rollinclude (TXNID_PAIR xid,
struct roll_entry *item;
BLOCKNUM next_log = spilled_tail;
uint32_t next_log_hash = spilled_tail_hash;
uint64_t last_sequence = num_nodes;
bool found_head = false;
......@@ -383,7 +380,7 @@ toku_apply_rollinclude (TXNID_PAIR xid,
while (next_log.b != ROLLBACK_NONE.b) {
//pin log
ROLLBACK_LOG_NODE log;
toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
toku_get_and_pin_rollback_log(txn, next_log, &log);
toku_rollback_verify_contents(log, xid, last_sequence - 1);
last_sequence = log->sequence;
......@@ -400,16 +397,13 @@ toku_apply_rollinclude (TXNID_PAIR xid,
assert(log->sequence == 0);
}
next_log = log->previous;
next_log_hash = log->previous_hash;
{
//Clean up transaction structure to prevent
//toku_txn_close from double-freeing
spilled_tail = next_log;
spilled_tail_hash = next_log_hash;
if (found_head) {
assert(next_log.b == ROLLBACK_NONE.b);
spilled_head = next_log;
spilled_head_hash = next_log_hash;
}
}
toku_rollback_log_unpin_and_remove(txn, log);
......@@ -421,15 +415,13 @@ int
toku_commit_rollinclude (TXNID_PAIR xid,
uint64_t num_nodes,
BLOCKNUM spilled_head,
uint32_t spilled_head_hash,
BLOCKNUM spilled_tail,
uint32_t spilled_tail_hash,
TOKUTXN txn,
LSN oplsn) {
int r;
r = toku_apply_rollinclude(xid, num_nodes,
spilled_head, spilled_head_hash,
spilled_tail, spilled_tail_hash,
spilled_head,
spilled_tail,
txn, oplsn,
toku_commit_rollback_item);
return r;
......@@ -439,15 +431,13 @@ int
toku_rollback_rollinclude (TXNID_PAIR xid,
uint64_t num_nodes,
BLOCKNUM spilled_head,
uint32_t spilled_head_hash,
BLOCKNUM spilled_tail,
uint32_t spilled_tail_hash,
TOKUTXN txn,
LSN oplsn) {
int r;
r = toku_apply_rollinclude(xid, num_nodes,
spilled_head, spilled_head_hash,
spilled_tail, spilled_tail_hash,
spilled_head,
spilled_tail,
txn, oplsn,
toku_abort_rollback_item);
return r;
......
......@@ -143,17 +143,14 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
//printf("%s:%d abort\n", __FILE__, __LINE__);
BLOCKNUM next_log = ROLLBACK_NONE;
uint32_t next_log_hash = 0;
bool is_current = false;
if (txn_has_current_rollback_log(txn)) {
next_log = txn->roll_info.current_rollback;
next_log_hash = txn->roll_info.current_rollback_hash;
is_current = true;
}
else if (txn_has_spilled_rollback_logs(txn)) {
next_log = txn->roll_info.spilled_rollback_tail;
next_log_hash = txn->roll_info.spilled_rollback_tail_hash;
}
uint64_t last_sequence = txn->roll_info.num_rollback_nodes;
......@@ -161,7 +158,7 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
while (next_log.b != ROLLBACK_NONE.b) {
ROLLBACK_LOG_NODE log;
//pin log
toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
toku_get_and_pin_rollback_log(txn, next_log, &log);
toku_rollback_verify_contents(log, txn->txnid, last_sequence - 1);
toku_maybe_prefetch_previous_rollback_log(txn, log);
......@@ -180,23 +177,19 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
assert(log->sequence == 0);
}
next_log = log->previous;
next_log_hash = log->previous_hash;
{
//Clean up transaction structure to prevent
//toku_txn_close from double-freeing
if (is_current) {
txn->roll_info.current_rollback = ROLLBACK_NONE;
txn->roll_info.current_rollback_hash = 0;
is_current = false;
}
else {
txn->roll_info.spilled_rollback_tail = next_log;
txn->roll_info.spilled_rollback_tail_hash = next_log_hash;
}
if (found_head) {
assert(next_log.b == ROLLBACK_NONE.b);
txn->roll_info.spilled_rollback_head = next_log;
txn->roll_info.spilled_rollback_head_hash = next_log_hash;
}
}
bool give_back = false;
......@@ -228,13 +221,11 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
num_nodes--; //Don't count the in-progress rollback log.
}
toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid, num_nodes,
txn->roll_info.spilled_rollback_head, txn->roll_info.spilled_rollback_head_hash,
txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash);
txn->roll_info.spilled_rollback_head,
txn->roll_info.spilled_rollback_tail);
//Remove ownership from child.
txn->roll_info.spilled_rollback_head = ROLLBACK_NONE;
txn->roll_info.spilled_rollback_head_hash = 0;
txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE;
txn->roll_info.spilled_rollback_tail_hash = 0;
}
// if we're commiting a child rollback, put its entries into the parent
// by pinning both child and parent and then linking the child log entry
......@@ -247,8 +238,7 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
//Pin child log
ROLLBACK_LOG_NODE child_log;
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback,
txn->roll_info.current_rollback_hash, &child_log);
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &child_log);
toku_rollback_verify_contents(child_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1);
// Append the list to the front of the parent.
......@@ -284,7 +274,6 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
toku_rollback_log_unpin_and_remove(txn, child_log);
}
txn->roll_info.current_rollback = ROLLBACK_NONE;
txn->roll_info.current_rollback_hash = 0;
toku_maybe_spill_rollbacks(txn->parent, parent_log);
toku_rollback_log_unpin(txn->parent, parent_log);
......
......@@ -219,13 +219,13 @@ void toku_rollback_flush_callback (
}
}
int toku_rollback_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM logname, uint32_t fullhash,
int toku_rollback_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM logname, uint32_t fullhash UU(),
void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
int r;
FT CAST_FROM_VOIDP(h, extraargs);
assert(h->cf == cachefile);
ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, h);
r = toku_deserialize_rollback_log_from(fd, logname, result, h);
if (r==0) {
(*result)->ct_pair = p;
*sizep = rollback_memory_size(*result);
......
......@@ -158,7 +158,7 @@ static void toku_rollback_node_save_ct_pair(CACHEKEY UU(key), void *value_data,
//
// initializes an empty rollback log node
// Does not touch the blocknum or hash, that is the
// Does not touch the blocknum, that is the
// responsibility of the caller
//
void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
......@@ -173,7 +173,6 @@ void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
log->dirty = true;
log->sequence = 0;
log->previous = make_blocknum(0);
log->previous_hash = 0;
log->oldest_logentry = NULL;
log->newest_logentry = NULL;
log->rollentry_arena = NULL;
......@@ -185,14 +184,12 @@ void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
static void rollback_initialize_for_txn(
ROLLBACK_LOG_NODE log,
TOKUTXN txn,
BLOCKNUM previous,
uint32_t previous_hash
BLOCKNUM previous
)
{
log->txnid = txn->txnid;
log->sequence = txn->roll_info.num_rollback_nodes++;
log->previous = previous;
log->previous_hash = previous_hash;
log->oldest_logentry = NULL;
log->newest_logentry = NULL;
log->rollentry_arena = memarena_create();
......@@ -206,12 +203,11 @@ void make_rollback_log_empty(ROLLBACK_LOG_NODE log) {
}
// create and pin a new rollback log node. chain it to the other rollback nodes
// by providing a previous blocknum/ hash and assigning the new rollback log
// by providing a previous blocknum and assigning the new rollback log
// node the next sequence number
static void rollback_log_create (
TOKUTXN txn,
BLOCKNUM previous,
uint32_t previous_hash,
ROLLBACK_LOG_NODE *result
)
{
......@@ -220,16 +216,15 @@ static void rollback_log_create (
CACHEFILE cf = txn->logger->rollback_cachefile;
FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
rollback_initialize_for_txn(log, txn, previous, previous_hash);
rollback_initialize_for_txn(log, txn, previous);
toku_allocate_blocknum(ft->blocktable, &log->blocknum, ft);
log->hash = toku_cachetable_hash(ft->cf, log->blocknum);
const uint32_t hash = toku_cachetable_hash(ft->cf, log->blocknum);
*result = log;
toku_cachetable_put(cf, log->blocknum, log->hash,
toku_cachetable_put(cf, log->blocknum, hash,
log, rollback_memory_size(log),
get_write_callbacks_for_rollback_log(ft),
toku_rollback_node_save_ct_pair);
txn->roll_info.current_rollback = log->blocknum;
txn->roll_info.current_rollback_hash = log->hash;
}
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
......@@ -255,14 +250,11 @@ void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
if (!txn_has_spilled_rollback_logs(txn)) {
//First spilled. Copy to head.
txn->roll_info.spilled_rollback_head = txn->roll_info.current_rollback;
txn->roll_info.spilled_rollback_head_hash = txn->roll_info.current_rollback_hash;
}
//Unconditionally copy to tail. Old tail does not need to be cached anymore.
txn->roll_info.spilled_rollback_tail = txn->roll_info.current_rollback;
txn->roll_info.spilled_rollback_tail_hash = txn->roll_info.current_rollback_hash;
txn->roll_info.current_rollback = ROLLBACK_NONE;
txn->roll_info.current_rollback_hash = 0;
}
}
......@@ -311,8 +303,8 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo
BLOCKNUM name = log->previous;
int r = 0;
if (name.b != ROLLBACK_NONE.b) {
uint32_t hash = log->previous_hash;
CACHEFILE cf = txn->logger->rollback_cachefile;
uint32_t hash = toku_cachetable_hash(cf, name);
FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
bool doing_prefetch = false;
r = toku_cachefile_prefetch(cf, name, hash,
......@@ -334,10 +326,11 @@ void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log,
assert(log->sequence == sequence);
}
void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash, ROLLBACK_LOG_NODE *log) {
void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log) {
void * value;
CACHEFILE cf = txn->logger->rollback_cachefile;
FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
uint32_t hash = toku_cachetable_hash(cf, blocknum);
int r = toku_cachetable_get_and_pin_with_dep_pairs(cf, blocknum, hash,
&value, NULL,
get_write_callbacks_for_rollback_log(h),
......@@ -351,7 +344,6 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash
assert(r == 0);
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
assert(pinned_log->blocknum.b == blocknum.b);
assert(pinned_log->hash == hash);
*log = pinned_log;
}
......@@ -359,7 +351,7 @@ void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE
ROLLBACK_LOG_NODE pinned_log = NULL;
invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions
if (txn_has_current_rollback_log(txn)) {
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, txn->roll_info.current_rollback_hash, &pinned_log);
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &pinned_log);
toku_rollback_verify_contents(pinned_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1);
} else {
// For each transaction, we try to acquire the first rollback log
......@@ -378,15 +370,13 @@ void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE
rollback_initialize_for_txn(
pinned_log,
txn,
txn->roll_info.spilled_rollback_tail,
txn->roll_info.spilled_rollback_tail_hash
txn->roll_info.spilled_rollback_tail
);
txn->roll_info.current_rollback = pinned_log->blocknum;
txn->roll_info.current_rollback_hash = pinned_log->hash;
}
}
if (pinned_log == NULL) {
rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash, &pinned_log);
rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, &pinned_log);
}
}
assert(pinned_log->txnid.parent_id64 == txn->txnid.parent_id64);
......
......@@ -103,8 +103,8 @@ void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t sta
// is a current rollback node to use, pin it, otherwise create one.
void toku_get_and_pin_rollback_log_for_new_entry(TOKUTXN txn, ROLLBACK_LOG_NODE *log);
// get a specific rollback by blocknum and hash
void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash, ROLLBACK_LOG_NODE *log);
// get a specific rollback by blocknum
void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log);
// unpin a rollback node from the cachetable
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log);
......@@ -155,11 +155,9 @@ struct rollback_log_node {
// the sequence is between 0 and totalnodes-1
uint64_t sequence;
BLOCKNUM blocknum; // on which block does this node live?
uint32_t hash;
// which block number is the previous in the chain of rollback nodes
// that make up this rollback log?
BLOCKNUM previous;
uint32_t previous_hash;
struct roll_entry *oldest_logentry;
struct roll_entry *newest_logentry;
MEMARENA rollentry_arena;
......
......@@ -96,7 +96,6 @@ PATENT RIGHTS GRANT:
void rollback_log_node_cache::init (uint32_t max_num_avail_nodes) {
XMALLOC_N(max_num_avail_nodes, m_avail_blocknums);
XMALLOC_N(max_num_avail_nodes, m_hashes);
m_max_num_avail = max_num_avail_nodes;
m_first = 0;
m_num_avail = 0;
......@@ -110,7 +109,6 @@ void rollback_log_node_cache::init (uint32_t max_num_avail_nodes) {
void rollback_log_node_cache::destroy() {
toku_mutex_destroy(&m_mutex);
toku_free(m_avail_blocknums);
toku_free(m_hashes);
}
// returns true if rollback log node was successfully added,
......@@ -125,7 +123,6 @@ bool rollback_log_node_cache::give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_N
index -= m_max_num_avail;
}
m_avail_blocknums[index].b = log->blocknum.b;
m_hashes[index] = log->hash;
m_num_avail++;
}
toku_mutex_unlock(&m_mutex);
......@@ -144,11 +141,9 @@ bool rollback_log_node_cache::give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_N
// for getting a rollback log node
void rollback_log_node_cache::get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log){
BLOCKNUM b = ROLLBACK_NONE;
uint32_t hash;
toku_mutex_lock(&m_mutex);
if (m_num_avail > 0) {
b.b = m_avail_blocknums[m_first].b;
hash = m_hashes[m_first];
m_num_avail--;
if (++m_first >= m_max_num_avail) {
m_first = 0;
......@@ -156,7 +151,7 @@ void rollback_log_node_cache::get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NO
}
toku_mutex_unlock(&m_mutex);
if (b.b != ROLLBACK_NONE.b) {
toku_get_and_pin_rollback_log(txn, b, hash, log);
toku_get_and_pin_rollback_log(txn, b, log);
invariant(rollback_log_is_unused(*log));
} else {
*log = NULL;
......
......@@ -108,7 +108,6 @@ public:
private:
BLOCKNUM* m_avail_blocknums;
uint32_t* m_hashes;
uint32_t m_first;
uint32_t m_num_avail;
uint32_t m_max_num_avail;
......
......@@ -308,11 +308,8 @@ static void toku_txn_create_txn (
.num_rollentries_processed = 0,
.rollentry_raw_count = 0,
.spilled_rollback_head = ROLLBACK_NONE,
.spilled_rollback_head_hash = 0,
.spilled_rollback_tail = ROLLBACK_NONE,
.spilled_rollback_tail_hash = 0,
.current_rollback = ROLLBACK_NONE,
.current_rollback_hash = 0,
};
static txn_child_manager tcm;
......@@ -405,17 +402,9 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
txn->roll_info.num_rollback_nodes = info->num_rollback_nodes;
txn->roll_info.num_rollentries = info->num_rollentries;
CACHEFILE rollback_cachefile = txn->logger->rollback_cachefile;
txn->roll_info.spilled_rollback_head = info->spilled_rollback_head;
txn->roll_info.spilled_rollback_head_hash = toku_cachetable_hash(rollback_cachefile,
txn->roll_info.spilled_rollback_head);
txn->roll_info.spilled_rollback_tail = info->spilled_rollback_tail;
txn->roll_info.spilled_rollback_tail_hash = toku_cachetable_hash(rollback_cachefile,
txn->roll_info.spilled_rollback_tail);
txn->roll_info.current_rollback = info->current_rollback;
txn->roll_info.current_rollback_hash = toku_cachetable_hash(rollback_cachefile,
txn->roll_info.current_rollback);
return 0;
}
......
......@@ -739,6 +739,7 @@ if __name__ == '__main__':
default_recover_testnames = ['recover-test_stress1.tdb',
'recover-test_stress2.tdb',
'recover-test_stress3.tdb',
'recover-child-rollback.tdb',
'recover-test_stress_openclose.tdb']
build_group = OptionGroup(parser, 'Build Options', 'Control how the fractal tree and tests get built.')
build_group.add_option('--skip_build', action='store_false', dest='build', default=True,
......
......@@ -146,6 +146,7 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
recover-2483
recover-3113
recover-5146
recover-child-rollback
recover-compare-db
recover-compare-db-descriptor
recover-del-multiple
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
/*
COPYING CONDITIONS NOTICE:
This program is free software; you can redistribute it and/or modify
it under the terms of version 2 of the GNU General Public License as
published by the Free Software Foundation, and provided that the
following conditions are met:
* Redistributions of source code must retain this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below).
* Redistributions in binary form must reproduce this COPYING
CONDITIONS NOTICE, the COPYRIGHT NOTICE (below), the
DISCLAIMER (below), the UNIVERSITY PATENT NOTICE (below), the
PATENT MARKING NOTICE (below), and the PATENT RIGHTS
GRANT (below) in the documentation and/or other materials
provided with the distribution.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
COPYRIGHT NOTICE:
TokuDB, Tokutek Fractal Tree Indexing Library.
Copyright (C) 2007-2013 Tokutek, Inc.
DISCLAIMER:
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
UNIVERSITY PATENT NOTICE:
The technology is licensed by the Massachusetts Institute of
Technology, Rutgers State University of New Jersey, and the Research
Foundation of State University of New York at Stony Brook under
United States of America Serial No. 11/760379 and to the patents
and/or patent applications resulting from it.
PATENT MARKING NOTICE:
This software is covered by US Patent No. 8,185,551.
This software is covered by US Patent No. 8,489,638.
PATENT RIGHTS GRANT:
"THIS IMPLEMENTATION" means the copyrightable works distributed by
Tokutek as part of the Fractal Tree project.
"PATENT CLAIMS" means the claims of patents that are owned or
licensable by Tokutek, both currently or in the future; and that in
the absence of this license would be infringed by THIS
IMPLEMENTATION or by using or running THIS IMPLEMENTATION.
"PATENT CHALLENGE" shall mean a challenge to the validity,
patentability, enforceability and/or non-infringement of any of the
PATENT CLAIMS or otherwise opposing any of the PATENT CLAIMS.
Tokutek hereby grants to you, for the term and geographical scope of
the PATENT CLAIMS, a non-exclusive, no-charge, royalty-free,
irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, transfer, and
otherwise run, modify, and propagate the contents of THIS
IMPLEMENTATION, where such license applies only to the PATENT
CLAIMS. This grant does not include claims that would be infringed
only as a consequence of further modifications of THIS
IMPLEMENTATION. If you or your agent or licensee institute or order
or agree to the institution of patent litigation against any entity
(including a cross-claim or counterclaim in a lawsuit) alleging that
THIS IMPLEMENTATION constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any rights
granted to you under this License shall terminate as of the date
such litigation is filed. If you or your agent or exclusive
licensee institute or order or agree to the institution of a PATENT
CHALLENGE, then Tokutek may terminate any rights granted to you
under this License.
*/
#ident "Copyright (c) 2007-2013 Tokutek Inc. All rights reserved."
#ident "$Id$"
#include "test.h"
#include <stdio.h>
#include <stdlib.h>
#include <toku_pthread.h>
#include <unistd.h>
#include <memory.h>
#include <sys/stat.h>
#include <db.h>
#include "threaded_stress_test_helpers.h"
//
// This test is a form of stress that does operations on a single dictionary:
// We create a dictionary bigger than the cachetable (around 4x greater).
// Then, we spawn a bunch of pthreads that do the following:
// - scan dictionary forward with bulk fetch
// - scan dictionary forward slowly
// - scan dictionary backward with bulk fetch
// - scan dictionary backward slowly
// - Grow the dictionary with insertions
// - do random point queries into the dictionary
// With the small cachetable, this should produce quite a bit of churn in reading in and evicting nodes.
// If the test runs to completion without crashing, we consider it a success. It also tests that snapshots
// work correctly by verifying that table scans sum their vals to 0.
//
// This does NOT test:
// - splits and merges
// - multiple DBs
//
// Variables that are interesting to tweak and run:
// - small cachetable
// - number of elements
//
static void
stress_table(DB_ENV *env, DB **dbp, struct cli_args *cli_args) {
//
// the threads that we want:
// - one (or more) thread(s) constantly updating random values, wrapped in a persistent parent transaction.
if (verbose) printf("starting creation of pthreads\n");
const int num_threads = cli_args->num_update_threads;
struct arg myargs[num_threads];
for (int i = 0; i < num_threads; i++) {
arg_init(&myargs[i], dbp, env, cli_args);
}
struct update_op_args uoe = get_update_op_args(cli_args, NULL);
// make the guy that updates the db
for (int i = 0; i < cli_args->num_update_threads; ++i) {
myargs[i].operation_extra = &uoe;
myargs[i].operation = update_op;
myargs[i].do_prepare = true;
myargs[i].wrap_in_parent = true;
}
run_workers(myargs, num_threads, cli_args->num_seconds, true, cli_args);
}
int
test_main(int argc, char *const argv[]) {
struct cli_args args = get_default_args();
args.num_seconds = 5;
//args.txn_size = 64; // 100 * 256 is more than enough to spill (4096) byte rollback nodes for parent and child.
//args.val_size = 512; // Large values to overflow a rollback log node fast.
//args.env_args.node_size = 4*1024*1024; // Large nodes to prevent spending much time
//args.env_args.basement_node_size = 128*1024; // Large nodes to prevent spending much time
args.env_args.checkpointing_period = 1;
parse_stress_test_args(argc, argv, &args);
if (args.do_test_and_crash) {
stress_test_main(&args);
}
if (args.do_recover) {
stress_recover(&args);
}
return 0;
}
......@@ -232,6 +232,7 @@ struct arg {
bool do_prepare;
bool prelock_updates;
bool track_thread_performance;
bool wrap_in_parent;
};
static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cli_args) {
......@@ -246,6 +247,7 @@ static void arg_init(struct arg *arg, DB **dbp, DB_ENV *env, struct cli_args *cl
arg->do_prepare = false;
arg->prelock_updates = false;
arg->track_thread_performance = true;
arg->wrap_in_parent = false;
}
enum operation_type {
......@@ -568,6 +570,7 @@ static void *worker(void *arg_v) {
arg->random_data = &random_data;
DB_ENV *env = arg->env;
DB_TXN *txn = nullptr;
DB_TXN *ptxn = nullptr;
if (verbose) {
toku_pthread_t self = toku_pthread_self();
uintptr_t intself = (uintptr_t) self;
......@@ -575,11 +578,13 @@ static void *worker(void *arg_v) {
}
if (arg->cli->single_txn) {
r = env->txn_begin(env, 0, &txn, arg->txn_flags); CKERR(r);
} else if (arg->wrap_in_parent) {
r = env->txn_begin(env, 0, &ptxn, arg->txn_flags); CKERR(r);
}
while (run_test) {
lock_worker_op(we);
if (!arg->cli->single_txn) {
r = env->txn_begin(env, 0, &txn, arg->txn_flags); CKERR(r);
r = env->txn_begin(env, ptxn, &txn, arg->txn_flags); CKERR(r);
}
r = arg->operation(txn, arg, arg->operation_extra, we->counters);
if (r==0 && !arg->cli->single_txn && arg->do_prepare) {
......@@ -616,6 +621,9 @@ static void *worker(void *arg_v) {
if (arg->cli->single_txn) {
int flags = get_commit_flags(arg->cli);
int chk_r = txn->commit(txn, flags); CKERR(chk_r);
} else if (arg->wrap_in_parent) {
int flags = get_commit_flags(arg->cli);
int chk_r = ptxn->commit(ptxn, flags); CKERR(chk_r);
}
if (verbose) {
toku_pthread_t self = toku_pthread_self();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment