Commit 82c79717 authored by Yoni Fogel's avatar Yoni Fogel

[t:2449] Bugfix in cachetable pairs for rollback log nodes.

We were using TOKUTXNs as the extraarg, but the TOKUTXN struct could be freed while the pair is still in memory.
We only used the TOKUTXN to get the cachefile and header.  We already are given the cachefile, so we
made the extraargs be the header

git-svn-id: file:///svn/toku/tokudb@19238 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0a1e0ddb
...@@ -220,7 +220,7 @@ int toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, struct brt_header ...@@ -220,7 +220,7 @@ int toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, struct brt_header
int toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE log, int toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE log,
struct brt_header *h, int n_workitems, int n_threads, struct brt_header *h, int n_workitems, int n_threads,
BOOL for_checkpoint); BOOL for_checkpoint);
int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, ROLLBACK_LOG_NODE *logp, TOKUTXN txn, struct brt_header *h); int toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, ROLLBACK_LOG_NODE *logp, struct brt_header *h);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, struct brt_header *h); int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, struct brt_header *h);
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */ unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len); int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......
...@@ -1746,7 +1746,7 @@ toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE log ...@@ -1746,7 +1746,7 @@ toku_serialize_rollback_log_to (int fd, BLOCKNUM blocknum, ROLLBACK_LOG_NODE log
static int static int
deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, ROLLBACK_LOG_NODE *log_p, deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, ROLLBACK_LOG_NODE *log_p,
TOKUTXN txn, struct brt_header *h, struct rbuf *rb) { struct brt_header *h, struct rbuf *rb) {
TAGMALLOC(ROLLBACK_LOG_NODE, result); TAGMALLOC(ROLLBACK_LOG_NODE, result);
int r; int r;
if (result==NULL) { if (result==NULL) {
...@@ -1769,10 +1769,6 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, ROLLB ...@@ -1769,10 +1769,6 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, ROLLB
//TODO: This is hard.. everything is shared in a single dictionary. //TODO: This is hard.. everything is shared in a single dictionary.
rbuf_TXNID(rb, &result->txnid); rbuf_TXNID(rb, &result->txnid);
result->sequence = rbuf_ulonglong(rb); result->sequence = rbuf_ulonglong(rb);
if (result->txnid == txn->txnid64 && result->sequence > txn->num_rollback_nodes) {
r = toku_db_badformat();
goto died0;
}
result->thislogname = rbuf_blocknum(rb); result->thislogname = rbuf_blocknum(rb);
if (result->thislogname.b != blocknum.b) { if (result->thislogname.b != blocknum.b) {
r = toku_db_badformat(); r = toku_db_badformat();
...@@ -1827,7 +1823,7 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, ROLLB ...@@ -1827,7 +1823,7 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, ROLLB
static int static int
deserialize_rollback_log_from_rbuf_versioned (u_int32_t version, BLOCKNUM blocknum, u_int32_t fullhash, deserialize_rollback_log_from_rbuf_versioned (u_int32_t version, BLOCKNUM blocknum, u_int32_t fullhash,
ROLLBACK_LOG_NODE *log, ROLLBACK_LOG_NODE *log,
TOKUTXN txn, struct brt_header *h, struct rbuf *rb) { struct brt_header *h, struct rbuf *rb) {
int r = 0; int r = 0;
ROLLBACK_LOG_NODE rollback_log_node = NULL; ROLLBACK_LOG_NODE rollback_log_node = NULL;
...@@ -1835,7 +1831,7 @@ deserialize_rollback_log_from_rbuf_versioned (u_int32_t version, BLOCKNUM blockn ...@@ -1835,7 +1831,7 @@ deserialize_rollback_log_from_rbuf_versioned (u_int32_t version, BLOCKNUM blockn
switch (version) { switch (version) {
case BRT_LAYOUT_VERSION: case BRT_LAYOUT_VERSION:
if (!upgrade) if (!upgrade)
r = deserialize_rollback_log_from_rbuf(blocknum, fullhash, &rollback_log_node, txn, h, rb); r = deserialize_rollback_log_from_rbuf(blocknum, fullhash, &rollback_log_node, h, rb);
if (r==0) { if (r==0) {
assert(rollback_log_node); assert(rollback_log_node);
*log = rollback_log_node; *log = rollback_log_node;
...@@ -1851,7 +1847,7 @@ deserialize_rollback_log_from_rbuf_versioned (u_int32_t version, BLOCKNUM blockn ...@@ -1851,7 +1847,7 @@ deserialize_rollback_log_from_rbuf_versioned (u_int32_t version, BLOCKNUM blockn
// Read rollback log node from file into struct. Perform version upgrade if necessary. // Read rollback log node from file into struct. Perform version upgrade if necessary.
int int
toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash,
ROLLBACK_LOG_NODE *logp, TOKUTXN txn, struct brt_header *h) { ROLLBACK_LOG_NODE *logp, struct brt_header *h) {
toku_trace("deserial start"); toku_trace("deserial start");
int r; int r;
...@@ -1869,7 +1865,7 @@ toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, u_int32_t fullhas ...@@ -1869,7 +1865,7 @@ toku_deserialize_rollback_log_from (int fd, BLOCKNUM blocknum, u_int32_t fullhas
} }
} }
r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, fullhash, logp, txn, h, &rb); r = deserialize_rollback_log_from_rbuf_versioned(layout_version, blocknum, fullhash, logp, h, &rb);
toku_trace("deserial done"); toku_trace("deserial done");
......
...@@ -322,15 +322,13 @@ toku_rollback_log_free(ROLLBACK_LOG_NODE *log_p) { ...@@ -322,15 +322,13 @@ toku_rollback_log_free(ROLLBACK_LOG_NODE *log_p) {
static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
void *rollback_v, void *extraargs, long UU(size), void *rollback_v, void *extraargs, long UU(size),
BOOL write_me, BOOL keep_me, BOOL for_checkpoint) { BOOL write_me, BOOL keep_me, BOOL for_checkpoint) {
assert(extraargs);
int r; int r;
TOKUTXN txn = extraargs; ROLLBACK_LOG_NODE log = rollback_v;
ROLLBACK_LOG_NODE log = rollback_v; struct brt_header *h = extraargs;
CACHEFILE rollback_cachefile = txn->logger->rollback_cachefile;
struct brt_header *h = toku_cachefile_get_userdata(rollback_cachefile);
assert(h->cf == cachefile);
assert(log->thislogname.b==logname.b); assert(log->thislogname.b==logname.b);
assert(rollback_cachefile == cachefile);
if (write_me && !h->panic) { if (write_me && !h->panic) {
int n_workitems, n_threads; int n_workitems, n_threads;
toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads); toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads);
...@@ -354,15 +352,12 @@ static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM ...@@ -354,15 +352,12 @@ static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM
static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash, static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash,
void **rollback_pv, long *sizep, void *extraargs) { void **rollback_pv, long *sizep, void *extraargs) {
assert(extraargs);
int r; int r;
TOKUTXN txn = extraargs; struct brt_header *h = extraargs;
CACHEFILE rollback_cachefile = txn->logger->rollback_cachefile; assert(h->cf == cachefile);
struct brt_header *h = toku_cachefile_get_userdata(rollback_cachefile);
assert(rollback_cachefile == cachefile);
ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv; ROLLBACK_LOG_NODE *result = (ROLLBACK_LOG_NODE*)rollback_pv;
r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, txn, h); r = toku_deserialize_rollback_log_from(fd, logname, fullhash, result, h);
if (r==0) { if (r==0) {
*sizep = rollback_memory_size(*result); *sizep = rollback_memory_size(*result);
} }
...@@ -396,7 +391,7 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o ...@@ -396,7 +391,7 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o
r=toku_cachetable_put(cf, log->thislogname, log->thishash, r=toku_cachetable_put(cf, log->thislogname, log->thishash,
log, rollback_memory_size(log), log, rollback_memory_size(log),
toku_rollback_flush_callback, toku_rollback_fetch_callback, toku_rollback_flush_callback, toku_rollback_fetch_callback,
txn); h);
assert(r==0); assert(r==0);
txn->current_rollback = log->thislogname; txn->current_rollback = log->thislogname;
txn->current_rollback_hash = log->thishash; txn->current_rollback_hash = log->thishash;
...@@ -602,10 +597,11 @@ toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) { ...@@ -602,10 +597,11 @@ toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
if (name.b != ROLLBACK_NONE.b) { if (name.b != ROLLBACK_NONE.b) {
uint32_t hash = log->older_hash; uint32_t hash = log->older_hash;
CACHEFILE cf = txn->logger->rollback_cachefile; CACHEFILE cf = txn->logger->rollback_cachefile;
struct brt_header *h = toku_cachefile_get_userdata(cf);
r = toku_cachefile_prefetch(cf, name, hash, r = toku_cachefile_prefetch(cf, name, hash,
toku_rollback_flush_callback, toku_rollback_flush_callback,
toku_rollback_fetch_callback, toku_rollback_fetch_callback,
txn); h);
assert(r==0); assert(r==0);
} }
return r; return r;
...@@ -625,10 +621,11 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO ...@@ -625,10 +621,11 @@ int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLO
if (!log) { if (!log) {
CACHEFILE cf = txn->logger->rollback_cachefile; CACHEFILE cf = txn->logger->rollback_cachefile;
void * log_v; void * log_v;
struct brt_header *h = toku_cachefile_get_userdata(cf);
r = toku_cachetable_get_and_pin(cf, name, hash, r = toku_cachetable_get_and_pin(cf, name, hash,
&log_v, NULL, &log_v, NULL,
toku_rollback_flush_callback, toku_rollback_fetch_callback, toku_rollback_flush_callback, toku_rollback_fetch_callback,
txn); h);
assert(r==0); assert(r==0);
log = (ROLLBACK_LOG_NODE)log_v; log = (ROLLBACK_LOG_NODE)log_v;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment