Commit 3d6a00ea authored by Yoni Fogel's avatar Yoni Fogel Committed by Leif Walsh

Refs Tokutek/mongo#650 Remove fullhash from rollback_log_nodes; fixes issue...

Refs Tokutek/mongo#650 Remove fullhash from rollback_log_nodes; fixes issue with large (or childful) transactions during recovery due to hash_id and filenum being decoupled
parent b9452de1
...@@ -632,7 +632,7 @@ int toku_serialize_ftnode_to(int fd, BLOCKNUM, FTNODE node, FTNODE_DISK_DATA* nd ...@@ -632,7 +632,7 @@ int toku_serialize_ftnode_to(int fd, BLOCKNUM, FTNODE node, FTNODE_DISK_DATA* nd
int toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized_log, bool is_serialized, int toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized_log, bool is_serialized,
FT h, bool for_checkpoint); FT h, bool for_checkpoint);
void toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized); void toku_serialize_rollback_log_to_memory_uncompressed(ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBACK_LOG_NODE serialized);
int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash, ROLLBACK_LOG_NODE *logp, FT h); int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *logp, FT h);
int toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, struct ftnode_fetch_extra* bfe); int toku_deserialize_bp_from_disk(FTNODE node, FTNODE_DISK_DATA ndd, int childnum, int fd, struct ftnode_fetch_extra* bfe);
int toku_deserialize_bp_from_compressed(FTNODE node, int childnum, struct ftnode_fetch_extra *bfe); int toku_deserialize_bp_from_compressed(FTNODE node, int childnum, struct ftnode_fetch_extra *bfe);
int toku_deserialize_ftnode_from (int fd, BLOCKNUM off, uint32_t /*fullhash*/, FTNODE *ftnode, FTNODE_DISK_DATA* ndd, struct ftnode_fetch_extra* bfe); int toku_deserialize_ftnode_from (int fd, BLOCKNUM off, uint32_t /*fullhash*/, FTNODE *ftnode, FTNODE_DISK_DATA* ndd, struct ftnode_fetch_extra* bfe);
......
...@@ -118,6 +118,7 @@ enum ft_layout_version_e { ...@@ -118,6 +118,7 @@ enum ft_layout_version_e {
FT_LAYOUT_VERSION_22 = 22, // Ming: Add oldest known referenced xid to each ftnode, for better garbage collection FT_LAYOUT_VERSION_22 = 22, // Ming: Add oldest known referenced xid to each ftnode, for better garbage collection
FT_LAYOUT_VERSION_23 = 23, // Ming: Fix upgrade path #5902 FT_LAYOUT_VERSION_23 = 23, // Ming: Fix upgrade path #5902
FT_LAYOUT_VERSION_24 = 24, // Riddler: change logentries that log transactions to store TXNID_PAIRs instead of TXNIDs FT_LAYOUT_VERSION_24 = 24, // Riddler: change logentries that log transactions to store TXNID_PAIRs instead of TXNIDs
FT_LAYOUT_VERSION_25 = 25, // SecretSquirrel: Remove fullhashes from rollback_log_nodes
FT_NEXT_VERSION, // the version after the current version FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line. FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
......
...@@ -2763,8 +2763,7 @@ toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBA ...@@ -2763,8 +2763,7 @@ toku_serialize_rollback_log_to (int fd, ROLLBACK_LOG_NODE log, SERIALIZED_ROLLBA
} }
static int static int
deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, uint32_t fullhash, ROLLBACK_LOG_NODE *log_p, deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log_p, struct rbuf *rb) {
FT h, struct rbuf *rb) {
ROLLBACK_LOG_NODE MALLOC(result); ROLLBACK_LOG_NODE MALLOC(result);
int r; int r;
if (result==NULL) { if (result==NULL) {
...@@ -2793,13 +2792,7 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, uint32_t fullhash, ROLLBA ...@@ -2793,13 +2792,7 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, uint32_t fullhash, ROLLBA
r = toku_db_badformat(); r = toku_db_badformat();
goto died0; goto died0;
} }
result->hash = toku_cachetable_hash(h->cf, result->blocknum);
if (result->hash != fullhash) {
r = toku_db_badformat();
goto died0;
}
result->previous = rbuf_blocknum(rb); result->previous = rbuf_blocknum(rb);
result->previous_hash = toku_cachetable_hash(h->cf, result->previous);
result->rollentry_resident_bytecount = rbuf_ulonglong(rb); result->rollentry_resident_bytecount = rbuf_ulonglong(rb);
size_t arena_initial_size = rbuf_ulonglong(rb); size_t arena_initial_size = rbuf_ulonglong(rb);
...@@ -2840,13 +2833,13 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, uint32_t fullhash, ROLLBA ...@@ -2840,13 +2833,13 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, uint32_t fullhash, ROLLBA
} }
static int static int
deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknum, uint32_t fullhash, deserialize_rollback_log_from_rbuf_versioned (uint32_t version, BLOCKNUM blocknum,
ROLLBACK_LOG_NODE *log, ROLLBACK_LOG_NODE *log,
FT h, struct rbuf *rb) { struct rbuf *rb) {
int r = 0; int r = 0;
ROLLBACK_LOG_NODE rollback_log_node = NULL; ROLLBACK_LOG_NODE rollback_log_node = NULL;
invariant(version==FT_LAYOUT_VERSION); //Rollback log nodes do not survive version changes. invariant(version==FT_LAYOUT_VERSION); //Rollback log nodes do not survive version changes.
r = deserialize_rollback_log_from_rbuf(blocknum, fullhash, &rollback_log_node, h, rb); r = deserialize_rollback_log_from_rbuf(blocknum, &rollback_log_node, rb);
if (r==0) { if (r==0) {
*log = rollback_log_node; *log = rollback_log_node;
} }
...@@ -3022,8 +3015,7 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum, ...@@ -3022,8 +3015,7 @@ read_and_decompress_block_from_fd_into_rbuf(int fd, BLOCKNUM blocknum,
// Read rollback log node from file into struct. Perform version upgrade if necessary. // Read rollback log node from file into struct. Perform version upgrade if necessary.
int int
toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash, toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *logp, FT h) {
ROLLBACK_LOG_NODE *logp, FT h) {
int layout_version = 0; int layout_version = 0;
int r; int r;
struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0}; struct rbuf rb = {.buf = NULL, .size = 0, .ndone = 0};
...@@ -3037,7 +3029,6 @@ toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash ...@@ -3037,7 +3029,6 @@ toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash
ROLLBACK_LOG_NODE XMALLOC(log); ROLLBACK_LOG_NODE XMALLOC(log);
rollback_empty_log_init(log); rollback_empty_log_init(log);
log->blocknum.b = blocknum.b; log->blocknum.b = blocknum.b;
log->hash = fullhash;
r = 0; r = 0;
*logp = log; *logp = log;
goto cleanup; goto cleanup;
...@@ -3054,7 +3045,7 @@ toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash ...@@ -3054,7 +3045,7 @@ toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, uint32_t fullhash
} }
} }
r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, fullhash, logp, h, &rb); r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, logp, &rb);
cleanup: cleanup:
if (rb.buf) toku_free(rb.buf); if (rb.buf) toku_free(rb.buf);
......
...@@ -209,15 +209,12 @@ struct txn_roll_info { ...@@ -209,15 +209,12 @@ struct txn_roll_info {
// the spilled rollback head is the block number of the first rollback node // the spilled rollback head is the block number of the first rollback node
// that makes up the rollback log chain // that makes up the rollback log chain
BLOCKNUM spilled_rollback_head; BLOCKNUM spilled_rollback_head;
uint32_t spilled_rollback_head_hash;
// the spilled rollback is the block number of the last rollback node that // the spilled rollback is the block number of the last rollback node that
// makes up the rollback log chain. // makes up the rollback log chain.
BLOCKNUM spilled_rollback_tail; BLOCKNUM spilled_rollback_tail;
uint32_t spilled_rollback_tail_hash;
// the current rollback node block number we may use. if this is ROLLBACK_NONE, // the current rollback node block number we may use. if this is ROLLBACK_NONE,
// then we need to create one and set it here before using it. // then we need to create one and set it here before using it.
BLOCKNUM current_rollback; BLOCKNUM current_rollback;
uint32_t current_rollback_hash;
}; };
struct tokutxn { struct tokutxn {
......
...@@ -155,9 +155,7 @@ const struct logtype rollbacks[] = { ...@@ -155,9 +155,7 @@ const struct logtype rollbacks[] = {
{"rollinclude", 'r', FA{{"TXNID_PAIR", "xid", 0}, {"rollinclude", 'r', FA{{"TXNID_PAIR", "xid", 0},
{"uint64_t", "num_nodes", 0}, {"uint64_t", "num_nodes", 0},
{"BLOCKNUM", "spilled_head", 0}, {"BLOCKNUM", "spilled_head", 0},
{"uint32_t", "spilled_head_hash", 0},
{"BLOCKNUM", "spilled_tail", 0}, {"BLOCKNUM", "spilled_tail", 0},
{"uint32_t", "spilled_tail_hash", 0},
NULLFIELD}, LOG_BEGIN_ACTION_NA}, NULLFIELD}, LOG_BEGIN_ACTION_NA},
{"load", 'l', FA{{"FILENUM", "old_filenum", 0}, {"load", 'l', FA{{"FILENUM", "old_filenum", 0},
{"BYTESTRING", "new_iname", 0}, {"BYTESTRING", "new_iname", 0},
......
...@@ -365,9 +365,7 @@ static int ...@@ -365,9 +365,7 @@ static int
toku_apply_rollinclude (TXNID_PAIR xid, toku_apply_rollinclude (TXNID_PAIR xid,
uint64_t num_nodes, uint64_t num_nodes,
BLOCKNUM spilled_head, BLOCKNUM spilled_head,
uint32_t spilled_head_hash __attribute__((__unused__)),
BLOCKNUM spilled_tail, BLOCKNUM spilled_tail,
uint32_t spilled_tail_hash,
TOKUTXN txn, TOKUTXN txn,
LSN oplsn, LSN oplsn,
apply_rollback_item func) { apply_rollback_item func) {
...@@ -375,7 +373,6 @@ toku_apply_rollinclude (TXNID_PAIR xid, ...@@ -375,7 +373,6 @@ toku_apply_rollinclude (TXNID_PAIR xid,
struct roll_entry *item; struct roll_entry *item;
BLOCKNUM next_log = spilled_tail; BLOCKNUM next_log = spilled_tail;
uint32_t next_log_hash = spilled_tail_hash;
uint64_t last_sequence = num_nodes; uint64_t last_sequence = num_nodes;
bool found_head = false; bool found_head = false;
...@@ -383,7 +380,7 @@ toku_apply_rollinclude (TXNID_PAIR xid, ...@@ -383,7 +380,7 @@ toku_apply_rollinclude (TXNID_PAIR xid,
while (next_log.b != ROLLBACK_NONE.b) { while (next_log.b != ROLLBACK_NONE.b) {
//pin log //pin log
ROLLBACK_LOG_NODE log; ROLLBACK_LOG_NODE log;
toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log); toku_get_and_pin_rollback_log(txn, next_log, &log);
toku_rollback_verify_contents(log, xid, last_sequence - 1); toku_rollback_verify_contents(log, xid, last_sequence - 1);
last_sequence = log->sequence; last_sequence = log->sequence;
...@@ -400,16 +397,13 @@ toku_apply_rollinclude (TXNID_PAIR xid, ...@@ -400,16 +397,13 @@ toku_apply_rollinclude (TXNID_PAIR xid,
assert(log->sequence == 0); assert(log->sequence == 0);
} }
next_log = log->previous; next_log = log->previous;
next_log_hash = log->previous_hash;
{ {
//Clean up transaction structure to prevent //Clean up transaction structure to prevent
//toku_txn_close from double-freeing //toku_txn_close from double-freeing
spilled_tail = next_log; spilled_tail = next_log;
spilled_tail_hash = next_log_hash;
if (found_head) { if (found_head) {
assert(next_log.b == ROLLBACK_NONE.b); assert(next_log.b == ROLLBACK_NONE.b);
spilled_head = next_log; spilled_head = next_log;
spilled_head_hash = next_log_hash;
} }
} }
toku_rollback_log_unpin_and_remove(txn, log); toku_rollback_log_unpin_and_remove(txn, log);
...@@ -421,15 +415,13 @@ int ...@@ -421,15 +415,13 @@ int
toku_commit_rollinclude (TXNID_PAIR xid, toku_commit_rollinclude (TXNID_PAIR xid,
uint64_t num_nodes, uint64_t num_nodes,
BLOCKNUM spilled_head, BLOCKNUM spilled_head,
uint32_t spilled_head_hash,
BLOCKNUM spilled_tail, BLOCKNUM spilled_tail,
uint32_t spilled_tail_hash,
TOKUTXN txn, TOKUTXN txn,
LSN oplsn) { LSN oplsn) {
int r; int r;
r = toku_apply_rollinclude(xid, num_nodes, r = toku_apply_rollinclude(xid, num_nodes,
spilled_head, spilled_head_hash, spilled_head,
spilled_tail, spilled_tail_hash, spilled_tail,
txn, oplsn, txn, oplsn,
toku_commit_rollback_item); toku_commit_rollback_item);
return r; return r;
...@@ -439,15 +431,13 @@ int ...@@ -439,15 +431,13 @@ int
toku_rollback_rollinclude (TXNID_PAIR xid, toku_rollback_rollinclude (TXNID_PAIR xid,
uint64_t num_nodes, uint64_t num_nodes,
BLOCKNUM spilled_head, BLOCKNUM spilled_head,
uint32_t spilled_head_hash,
BLOCKNUM spilled_tail, BLOCKNUM spilled_tail,
uint32_t spilled_tail_hash,
TOKUTXN txn, TOKUTXN txn,
LSN oplsn) { LSN oplsn) {
int r; int r;
r = toku_apply_rollinclude(xid, num_nodes, r = toku_apply_rollinclude(xid, num_nodes,
spilled_head, spilled_head_hash, spilled_head,
spilled_tail, spilled_tail_hash, spilled_tail,
txn, oplsn, txn, oplsn,
toku_abort_rollback_item); toku_abort_rollback_item);
return r; return r;
......
...@@ -143,17 +143,14 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) { ...@@ -143,17 +143,14 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
//printf("%s:%d abort\n", __FILE__, __LINE__); //printf("%s:%d abort\n", __FILE__, __LINE__);
BLOCKNUM next_log = ROLLBACK_NONE; BLOCKNUM next_log = ROLLBACK_NONE;
uint32_t next_log_hash = 0;
bool is_current = false; bool is_current = false;
if (txn_has_current_rollback_log(txn)) { if (txn_has_current_rollback_log(txn)) {
next_log = txn->roll_info.current_rollback; next_log = txn->roll_info.current_rollback;
next_log_hash = txn->roll_info.current_rollback_hash;
is_current = true; is_current = true;
} }
else if (txn_has_spilled_rollback_logs(txn)) { else if (txn_has_spilled_rollback_logs(txn)) {
next_log = txn->roll_info.spilled_rollback_tail; next_log = txn->roll_info.spilled_rollback_tail;
next_log_hash = txn->roll_info.spilled_rollback_tail_hash;
} }
uint64_t last_sequence = txn->roll_info.num_rollback_nodes; uint64_t last_sequence = txn->roll_info.num_rollback_nodes;
...@@ -161,7 +158,7 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) { ...@@ -161,7 +158,7 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
while (next_log.b != ROLLBACK_NONE.b) { while (next_log.b != ROLLBACK_NONE.b) {
ROLLBACK_LOG_NODE log; ROLLBACK_LOG_NODE log;
//pin log //pin log
toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log); toku_get_and_pin_rollback_log(txn, next_log, &log);
toku_rollback_verify_contents(log, txn->txnid, last_sequence - 1); toku_rollback_verify_contents(log, txn->txnid, last_sequence - 1);
toku_maybe_prefetch_previous_rollback_log(txn, log); toku_maybe_prefetch_previous_rollback_log(txn, log);
...@@ -180,23 +177,19 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) { ...@@ -180,23 +177,19 @@ apply_txn(TOKUTXN txn, LSN lsn, apply_rollback_item func) {
assert(log->sequence == 0); assert(log->sequence == 0);
} }
next_log = log->previous; next_log = log->previous;
next_log_hash = log->previous_hash;
{ {
//Clean up transaction structure to prevent //Clean up transaction structure to prevent
//toku_txn_close from double-freeing //toku_txn_close from double-freeing
if (is_current) { if (is_current) {
txn->roll_info.current_rollback = ROLLBACK_NONE; txn->roll_info.current_rollback = ROLLBACK_NONE;
txn->roll_info.current_rollback_hash = 0;
is_current = false; is_current = false;
} }
else { else {
txn->roll_info.spilled_rollback_tail = next_log; txn->roll_info.spilled_rollback_tail = next_log;
txn->roll_info.spilled_rollback_tail_hash = next_log_hash;
} }
if (found_head) { if (found_head) {
assert(next_log.b == ROLLBACK_NONE.b); assert(next_log.b == ROLLBACK_NONE.b);
txn->roll_info.spilled_rollback_head = next_log; txn->roll_info.spilled_rollback_head = next_log;
txn->roll_info.spilled_rollback_head_hash = next_log_hash;
} }
} }
bool give_back = false; bool give_back = false;
...@@ -228,13 +221,11 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) { ...@@ -228,13 +221,11 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
num_nodes--; //Don't count the in-progress rollback log. num_nodes--; //Don't count the in-progress rollback log.
} }
toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid, num_nodes, toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid, num_nodes,
txn->roll_info.spilled_rollback_head, txn->roll_info.spilled_rollback_head_hash, txn->roll_info.spilled_rollback_head,
txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash); txn->roll_info.spilled_rollback_tail);
//Remove ownership from child. //Remove ownership from child.
txn->roll_info.spilled_rollback_head = ROLLBACK_NONE; txn->roll_info.spilled_rollback_head = ROLLBACK_NONE;
txn->roll_info.spilled_rollback_head_hash = 0;
txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE; txn->roll_info.spilled_rollback_tail = ROLLBACK_NONE;
txn->roll_info.spilled_rollback_tail_hash = 0;
} }
// if we're commiting a child rollback, put its entries into the parent // if we're commiting a child rollback, put its entries into the parent
// by pinning both child and parent and then linking the child log entry // by pinning both child and parent and then linking the child log entry
...@@ -247,8 +238,7 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) { ...@@ -247,8 +238,7 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
//Pin child log //Pin child log
ROLLBACK_LOG_NODE child_log; ROLLBACK_LOG_NODE child_log;
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &child_log);
txn->roll_info.current_rollback_hash, &child_log);
toku_rollback_verify_contents(child_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1); toku_rollback_verify_contents(child_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1);
// Append the list to the front of the parent. // Append the list to the front of the parent.
...@@ -284,7 +274,6 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) { ...@@ -284,7 +274,6 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
toku_rollback_log_unpin_and_remove(txn, child_log); toku_rollback_log_unpin_and_remove(txn, child_log);
} }
txn->roll_info.current_rollback = ROLLBACK_NONE; txn->roll_info.current_rollback = ROLLBACK_NONE;
txn->roll_info.current_rollback_hash = 0;
toku_maybe_spill_rollbacks(txn->parent, parent_log); toku_maybe_spill_rollbacks(txn->parent, parent_log);
toku_rollback_log_unpin(txn->parent, parent_log); toku_rollback_log_unpin(txn->parent, parent_log);
......
...@@ -219,13 +219,13 @@ void toku_rollback_flush_callback ( ...@@ -219,13 +219,13 @@ void toku_rollback_flush_callback (
} }
} }
int toku_rollback_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM logname, uint32_t fullhash, int toku_rollback_fetch_callback (CACHEFILE cachefile, PAIR p, int fd, BLOCKNUM logname, uint32_t fullhash UU(),
void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) { void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
int r; int r;
FT CAST_FROM_VOIDP(h, extraargs); FT CAST_FROM_VOIDP(h, extraargs);
assert(h->cf == cachefile); assert(h->cf == cachefile);
ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv; ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, h); r = toku_deserialize_rollback_log_from(fd, logname, result, h);
if (r==0) { if (r==0) {
(*result)->ct_pair = p; (*result)->ct_pair = p;
*sizep = rollback_memory_size(*result); *sizep = rollback_memory_size(*result);
......
...@@ -158,7 +158,7 @@ static void toku_rollback_node_save_ct_pair(CACHEKEY UU(key), void *value_data, ...@@ -158,7 +158,7 @@ static void toku_rollback_node_save_ct_pair(CACHEKEY UU(key), void *value_data,
// //
// initializes an empty rollback log node // initializes an empty rollback log node
// Does not touch the blocknum or hash, that is the // Does not touch the blocknum, that is the
// responsibility of the caller // responsibility of the caller
// //
void rollback_empty_log_init(ROLLBACK_LOG_NODE log) { void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
...@@ -173,7 +173,6 @@ void rollback_empty_log_init(ROLLBACK_LOG_NODE log) { ...@@ -173,7 +173,6 @@ void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
log->dirty = true; log->dirty = true;
log->sequence = 0; log->sequence = 0;
log->previous = make_blocknum(0); log->previous = make_blocknum(0);
log->previous_hash = 0;
log->oldest_logentry = NULL; log->oldest_logentry = NULL;
log->newest_logentry = NULL; log->newest_logentry = NULL;
log->rollentry_arena = NULL; log->rollentry_arena = NULL;
...@@ -185,14 +184,12 @@ void rollback_empty_log_init(ROLLBACK_LOG_NODE log) { ...@@ -185,14 +184,12 @@ void rollback_empty_log_init(ROLLBACK_LOG_NODE log) {
static void rollback_initialize_for_txn( static void rollback_initialize_for_txn(
ROLLBACK_LOG_NODE log, ROLLBACK_LOG_NODE log,
TOKUTXN txn, TOKUTXN txn,
BLOCKNUM previous, BLOCKNUM previous
uint32_t previous_hash
) )
{ {
log->txnid = txn->txnid; log->txnid = txn->txnid;
log->sequence = txn->roll_info.num_rollback_nodes++; log->sequence = txn->roll_info.num_rollback_nodes++;
log->previous = previous; log->previous = previous;
log->previous_hash = previous_hash;
log->oldest_logentry = NULL; log->oldest_logentry = NULL;
log->newest_logentry = NULL; log->newest_logentry = NULL;
log->rollentry_arena = memarena_create(); log->rollentry_arena = memarena_create();
...@@ -206,12 +203,11 @@ void make_rollback_log_empty(ROLLBACK_LOG_NODE log) { ...@@ -206,12 +203,11 @@ void make_rollback_log_empty(ROLLBACK_LOG_NODE log) {
} }
// create and pin a new rollback log node. chain it to the other rollback nodes // create and pin a new rollback log node. chain it to the other rollback nodes
// by providing a previous blocknum/ hash and assigning the new rollback log // by providing a previous blocknum and assigning the new rollback log
// node the next sequence number // node the next sequence number
static void rollback_log_create ( static void rollback_log_create (
TOKUTXN txn, TOKUTXN txn,
BLOCKNUM previous, BLOCKNUM previous,
uint32_t previous_hash,
ROLLBACK_LOG_NODE *result ROLLBACK_LOG_NODE *result
) )
{ {
...@@ -220,16 +216,15 @@ static void rollback_log_create ( ...@@ -220,16 +216,15 @@ static void rollback_log_create (
CACHEFILE cf = txn->logger->rollback_cachefile; CACHEFILE cf = txn->logger->rollback_cachefile;
FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf)); FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));
rollback_initialize_for_txn(log, txn, previous, previous_hash); rollback_initialize_for_txn(log, txn, previous);
toku_allocate_blocknum(ft->blocktable, &log->blocknum, ft); toku_allocate_blocknum(ft->blocktable, &log->blocknum, ft);
log->hash = toku_cachetable_hash(ft->cf, log->blocknum); const uint32_t hash = toku_cachetable_hash(ft->cf, log->blocknum);
*result = log; *result = log;
toku_cachetable_put(cf, log->blocknum, log->hash, toku_cachetable_put(cf, log->blocknum, hash,
log, rollback_memory_size(log), log, rollback_memory_size(log),
get_write_callbacks_for_rollback_log(ft), get_write_callbacks_for_rollback_log(ft),
toku_rollback_node_save_ct_pair); toku_rollback_node_save_ct_pair);
txn->roll_info.current_rollback = log->blocknum; txn->roll_info.current_rollback = log->blocknum;
txn->roll_info.current_rollback_hash = log->hash;
} }
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) { void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
...@@ -255,14 +250,11 @@ void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) { ...@@ -255,14 +250,11 @@ void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
if (!txn_has_spilled_rollback_logs(txn)) { if (!txn_has_spilled_rollback_logs(txn)) {
//First spilled. Copy to head. //First spilled. Copy to head.
txn->roll_info.spilled_rollback_head = txn->roll_info.current_rollback; txn->roll_info.spilled_rollback_head = txn->roll_info.current_rollback;
txn->roll_info.spilled_rollback_head_hash = txn->roll_info.current_rollback_hash;
} }
//Unconditionally copy to tail. Old tail does not need to be cached anymore. //Unconditionally copy to tail. Old tail does not need to be cached anymore.
txn->roll_info.spilled_rollback_tail = txn->roll_info.current_rollback; txn->roll_info.spilled_rollback_tail = txn->roll_info.current_rollback;
txn->roll_info.spilled_rollback_tail_hash = txn->roll_info.current_rollback_hash;
txn->roll_info.current_rollback = ROLLBACK_NONE; txn->roll_info.current_rollback = ROLLBACK_NONE;
txn->roll_info.current_rollback_hash = 0;
} }
} }
...@@ -311,8 +303,8 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo ...@@ -311,8 +303,8 @@ void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE lo
BLOCKNUM name = log->previous; BLOCKNUM name = log->previous;
int r = 0; int r = 0;
if (name.b != ROLLBACK_NONE.b) { if (name.b != ROLLBACK_NONE.b) {
uint32_t hash = log->previous_hash;
CACHEFILE cf = txn->logger->rollback_cachefile; CACHEFILE cf = txn->logger->rollback_cachefile;
uint32_t hash = toku_cachetable_hash(cf, name);
FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf)); FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
bool doing_prefetch = false; bool doing_prefetch = false;
r = toku_cachefile_prefetch(cf, name, hash, r = toku_cachefile_prefetch(cf, name, hash,
...@@ -334,10 +326,11 @@ void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log, ...@@ -334,10 +326,11 @@ void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log,
assert(log->sequence == sequence); assert(log->sequence == sequence);
} }
void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash, ROLLBACK_LOG_NODE *log) { void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log) {
void * value; void * value;
CACHEFILE cf = txn->logger->rollback_cachefile; CACHEFILE cf = txn->logger->rollback_cachefile;
FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf)); FT CAST_FROM_VOIDP(h, toku_cachefile_get_userdata(cf));
uint32_t hash = toku_cachetable_hash(cf, blocknum);
int r = toku_cachetable_get_and_pin_with_dep_pairs(cf, blocknum, hash, int r = toku_cachetable_get_and_pin_with_dep_pairs(cf, blocknum, hash,
&value, NULL, &value, NULL,
get_write_callbacks_for_rollback_log(h), get_write_callbacks_for_rollback_log(h),
...@@ -351,7 +344,6 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash ...@@ -351,7 +344,6 @@ void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash
assert(r == 0); assert(r == 0);
ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value); ROLLBACK_LOG_NODE CAST_FROM_VOIDP(pinned_log, value);
assert(pinned_log->blocknum.b == blocknum.b); assert(pinned_log->blocknum.b == blocknum.b);
assert(pinned_log->hash == hash);
*log = pinned_log; *log = pinned_log;
} }
...@@ -359,7 +351,7 @@ void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE ...@@ -359,7 +351,7 @@ void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE
ROLLBACK_LOG_NODE pinned_log = NULL; ROLLBACK_LOG_NODE pinned_log = NULL;
invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions invariant(txn->state == TOKUTXN_LIVE || txn->state == TOKUTXN_PREPARING); // hot indexing may call this function for prepared transactions
if (txn_has_current_rollback_log(txn)) { if (txn_has_current_rollback_log(txn)) {
toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, txn->roll_info.current_rollback_hash, &pinned_log); toku_get_and_pin_rollback_log(txn, txn->roll_info.current_rollback, &pinned_log);
toku_rollback_verify_contents(pinned_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1); toku_rollback_verify_contents(pinned_log, txn->txnid, txn->roll_info.num_rollback_nodes - 1);
} else { } else {
// For each transaction, we try to acquire the first rollback log // For each transaction, we try to acquire the first rollback log
...@@ -378,15 +370,13 @@ void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE ...@@ -378,15 +370,13 @@ void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE
rollback_initialize_for_txn( rollback_initialize_for_txn(
pinned_log, pinned_log,
txn, txn,
txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail
txn->roll_info.spilled_rollback_tail_hash
); );
txn->roll_info.current_rollback = pinned_log->blocknum; txn->roll_info.current_rollback = pinned_log->blocknum;
txn->roll_info.current_rollback_hash = pinned_log->hash;
} }
} }
if (pinned_log == NULL) { if (pinned_log == NULL) {
rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, txn->roll_info.spilled_rollback_tail_hash, &pinned_log); rollback_log_create(txn, txn->roll_info.spilled_rollback_tail, &pinned_log);
} }
} }
assert(pinned_log->txnid.parent_id64 == txn->txnid.parent_id64); assert(pinned_log->txnid.parent_id64 == txn->txnid.parent_id64);
......
...@@ -103,8 +103,8 @@ void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t sta ...@@ -103,8 +103,8 @@ void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t sta
// is a current rollback node to use, pin it, otherwise create one. // is a current rollback node to use, pin it, otherwise create one.
void toku_get_and_pin_rollback_log_for_new_entry(TOKUTXN txn, ROLLBACK_LOG_NODE *log); void toku_get_and_pin_rollback_log_for_new_entry(TOKUTXN txn, ROLLBACK_LOG_NODE *log);
// get a specific rollback by blocknum and hash // get a specific rollback by blocknum
void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash, ROLLBACK_LOG_NODE *log); void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, ROLLBACK_LOG_NODE *log);
// unpin a rollback node from the cachetable // unpin a rollback node from the cachetable
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log); void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log);
...@@ -155,11 +155,9 @@ struct rollback_log_node { ...@@ -155,11 +155,9 @@ struct rollback_log_node {
// the sequence is between 0 and totalnodes-1 // the sequence is between 0 and totalnodes-1
uint64_t sequence; uint64_t sequence;
BLOCKNUM blocknum; // on which block does this node live? BLOCKNUM blocknum; // on which block does this node live?
uint32_t hash;
// which block number is the previous in the chain of rollback nodes // which block number is the previous in the chain of rollback nodes
// that make up this rollback log? // that make up this rollback log?
BLOCKNUM previous; BLOCKNUM previous;
uint32_t previous_hash;
struct roll_entry *oldest_logentry; struct roll_entry *oldest_logentry;
struct roll_entry *newest_logentry; struct roll_entry *newest_logentry;
MEMARENA rollentry_arena; MEMARENA rollentry_arena;
......
...@@ -96,7 +96,6 @@ PATENT RIGHTS GRANT: ...@@ -96,7 +96,6 @@ PATENT RIGHTS GRANT:
void rollback_log_node_cache::init (uint32_t max_num_avail_nodes) { void rollback_log_node_cache::init (uint32_t max_num_avail_nodes) {
XMALLOC_N(max_num_avail_nodes, m_avail_blocknums); XMALLOC_N(max_num_avail_nodes, m_avail_blocknums);
XMALLOC_N(max_num_avail_nodes, m_hashes);
m_max_num_avail = max_num_avail_nodes; m_max_num_avail = max_num_avail_nodes;
m_first = 0; m_first = 0;
m_num_avail = 0; m_num_avail = 0;
...@@ -110,7 +109,6 @@ void rollback_log_node_cache::init (uint32_t max_num_avail_nodes) { ...@@ -110,7 +109,6 @@ void rollback_log_node_cache::init (uint32_t max_num_avail_nodes) {
void rollback_log_node_cache::destroy() { void rollback_log_node_cache::destroy() {
toku_mutex_destroy(&m_mutex); toku_mutex_destroy(&m_mutex);
toku_free(m_avail_blocknums); toku_free(m_avail_blocknums);
toku_free(m_hashes);
} }
// returns true if rollback log node was successfully added, // returns true if rollback log node was successfully added,
...@@ -125,7 +123,6 @@ bool rollback_log_node_cache::give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_N ...@@ -125,7 +123,6 @@ bool rollback_log_node_cache::give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_N
index -= m_max_num_avail; index -= m_max_num_avail;
} }
m_avail_blocknums[index].b = log->blocknum.b; m_avail_blocknums[index].b = log->blocknum.b;
m_hashes[index] = log->hash;
m_num_avail++; m_num_avail++;
} }
toku_mutex_unlock(&m_mutex); toku_mutex_unlock(&m_mutex);
...@@ -144,11 +141,9 @@ bool rollback_log_node_cache::give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_N ...@@ -144,11 +141,9 @@ bool rollback_log_node_cache::give_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_N
// for getting a rollback log node // for getting a rollback log node
void rollback_log_node_cache::get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log){ void rollback_log_node_cache::get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NODE* log){
BLOCKNUM b = ROLLBACK_NONE; BLOCKNUM b = ROLLBACK_NONE;
uint32_t hash;
toku_mutex_lock(&m_mutex); toku_mutex_lock(&m_mutex);
if (m_num_avail > 0) { if (m_num_avail > 0) {
b.b = m_avail_blocknums[m_first].b; b.b = m_avail_blocknums[m_first].b;
hash = m_hashes[m_first];
m_num_avail--; m_num_avail--;
if (++m_first >= m_max_num_avail) { if (++m_first >= m_max_num_avail) {
m_first = 0; m_first = 0;
...@@ -156,7 +151,7 @@ void rollback_log_node_cache::get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NO ...@@ -156,7 +151,7 @@ void rollback_log_node_cache::get_rollback_log_node(TOKUTXN txn, ROLLBACK_LOG_NO
} }
toku_mutex_unlock(&m_mutex); toku_mutex_unlock(&m_mutex);
if (b.b != ROLLBACK_NONE.b) { if (b.b != ROLLBACK_NONE.b) {
toku_get_and_pin_rollback_log(txn, b, hash, log); toku_get_and_pin_rollback_log(txn, b, log);
invariant(rollback_log_is_unused(*log)); invariant(rollback_log_is_unused(*log));
} else { } else {
*log = NULL; *log = NULL;
......
...@@ -108,7 +108,6 @@ class rollback_log_node_cache { ...@@ -108,7 +108,6 @@ class rollback_log_node_cache {
private: private:
BLOCKNUM* m_avail_blocknums; BLOCKNUM* m_avail_blocknums;
uint32_t* m_hashes;
uint32_t m_first; uint32_t m_first;
uint32_t m_num_avail; uint32_t m_num_avail;
uint32_t m_max_num_avail; uint32_t m_max_num_avail;
......
...@@ -308,11 +308,8 @@ static void toku_txn_create_txn ( ...@@ -308,11 +308,8 @@ static void toku_txn_create_txn (
.num_rollentries_processed = 0, .num_rollentries_processed = 0,
.rollentry_raw_count = 0, .rollentry_raw_count = 0,
.spilled_rollback_head = ROLLBACK_NONE, .spilled_rollback_head = ROLLBACK_NONE,
.spilled_rollback_head_hash = 0,
.spilled_rollback_tail = ROLLBACK_NONE, .spilled_rollback_tail = ROLLBACK_NONE,
.spilled_rollback_tail_hash = 0,
.current_rollback = ROLLBACK_NONE, .current_rollback = ROLLBACK_NONE,
.current_rollback_hash = 0,
}; };
static txn_child_manager tcm; static txn_child_manager tcm;
...@@ -405,17 +402,9 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) { ...@@ -405,17 +402,9 @@ toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info) {
txn->roll_info.num_rollback_nodes = info->num_rollback_nodes; txn->roll_info.num_rollback_nodes = info->num_rollback_nodes;
txn->roll_info.num_rollentries = info->num_rollentries; txn->roll_info.num_rollentries = info->num_rollentries;
CACHEFILE rollback_cachefile = txn->logger->rollback_cachefile;
txn->roll_info.spilled_rollback_head = info->spilled_rollback_head; txn->roll_info.spilled_rollback_head = info->spilled_rollback_head;
txn->roll_info.spilled_rollback_head_hash = toku_cachetable_hash(rollback_cachefile,
txn->roll_info.spilled_rollback_head);
txn->roll_info.spilled_rollback_tail = info->spilled_rollback_tail; txn->roll_info.spilled_rollback_tail = info->spilled_rollback_tail;
txn->roll_info.spilled_rollback_tail_hash = toku_cachetable_hash(rollback_cachefile,
txn->roll_info.spilled_rollback_tail);
txn->roll_info.current_rollback = info->current_rollback; txn->roll_info.current_rollback = info->current_rollback;
txn->roll_info.current_rollback_hash = toku_cachetable_hash(rollback_cachefile,
txn->roll_info.current_rollback);
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment